You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/06/26 19:26:08 UTC
[1/4] TEZ-1218. Make TaskScheduler an Abstract class instead of an
Interface. Contributed by Jeff Zhang.
Repository: incubator-tez
Updated Branches:
refs/heads/master af538639c -> 4dfd8341d
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 5a0856b..b0ea644 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import com.google.common.base.Preconditions;
@@ -133,20 +133,20 @@ class TestTaskSchedulerHelpers {
}
@Override
- public TaskSchedulerInterface createTaskScheduler(String host, int port,
+ public TaskSchedulerService createTaskScheduler(String host, int port,
String trackingUrl, AppContext appContext) {
return new TaskSchedulerWithDrainableAppCallback(this,
containerSignatureMatcher, host, port, trackingUrl, amrmClientAsync,
appContext);
}
- public TaskSchedulerInterface getSpyTaskScheduler() {
+ public TaskSchedulerService getSpyTaskScheduler() {
return this.taskScheduler;
}
@Override
public void serviceStart() {
- TaskSchedulerInterface taskSchedulerReal = createTaskScheduler("host", 0, "",
+ TaskSchedulerService taskSchedulerReal = createTaskScheduler("host", 0, "",
appContext);
// Init the service so that reuse configuration is picked up.
((AbstractService)taskSchedulerReal).init(getConfig());
@@ -190,7 +190,7 @@ class TestTaskSchedulerHelpers {
}
}
- static class TaskSchedulerWithDrainableAppCallback extends TaskScheduler {
+ static class TaskSchedulerWithDrainableAppCallback extends YarnTaskSchedulerService {
private TaskSchedulerAppCallbackDrainable drainableAppCallback;
[4/4] git commit: TEZ-1218. Make TaskScheduler an Abstract class
instead of an Interface. Contributed by Jeff Zhang.
Posted by ss...@apache.org.
TEZ-1218. Make TaskScheduler an Abstract class instead of an Interface.
Contributed by Jeff Zhang.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/4dfd8341
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/4dfd8341
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/4dfd8341
Branch: refs/heads/master
Commit: 4dfd8341d8642dea418195beb5d3778d3bcf35a9
Parents: af53863
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 26 10:25:27 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 26 10:25:27 2014 -0700
----------------------------------------------------------------------
.../tez/dag/app/rm/LocalTaskScheduler.java | 337 ---
.../dag/app/rm/LocalTaskSchedulerService.java | 335 +++
.../apache/tez/dag/app/rm/TaskScheduler.java | 1985 ------------------
.../app/rm/TaskSchedulerAppCallbackWrapper.java | 2 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 14 +-
.../tez/dag/app/rm/TaskSchedulerInterface.java | 56 -
.../tez/dag/app/rm/TaskSchedulerService.java | 106 +
.../dag/app/rm/YarnTaskSchedulerService.java | 1951 +++++++++++++++++
.../tez/dag/app/rm/TestContainerReuse.java | 6 +-
.../tez/dag/app/rm/TestLocalTaskScheduler.java | 8 +-
.../tez/dag/app/rm/TestTaskScheduler.java | 14 +-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 12 +-
12 files changed, 2420 insertions(+), 2406 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
deleted file mode 100644
index 87bfccd..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskScheduler.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-public class LocalTaskScheduler extends AbstractService implements TaskSchedulerInterface {
-
- private static final Log LOG = LogFactory.getLog(LocalTaskScheduler.class);
-
- final TaskSchedulerAppCallback realAppClient;
- final TaskSchedulerAppCallback appClientDelegate;
- final ContainerSignatureMatcher containerSignatureMatcher;
- final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
- AsyncDelegateRequestHandler taskRequestHandler;
- Thread asyncDelegateRequestThread;
- final ExecutorService appCallbackExecutor;
-
- final HashMap<Object, Container> taskAllocations;
- final String appHostName;
- final int appHostPort;
- final String appTrackingUrl;
- final AppContext appContext;
-
- public LocalTaskScheduler(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
- int appHostPort, String appTrackingUrl, AppContext appContext) {
- super(LocalTaskScheduler.class.getName());
- this.realAppClient = appClient;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.appClientDelegate = createAppCallbackDelegate(appClient);
- this.appHostName = appHostName;
- this.appHostPort = appHostPort;
- this.appTrackingUrl = appTrackingUrl;
- this.appContext = appContext;
- taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
- taskAllocations = new LinkedHashMap<Object, Container>();
- }
-
- private ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
- }
-
- private TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- return new TaskSchedulerAppCallbackWrapper(realAppClient,
- appCallbackExecutor);
- }
-
- @Override
- public Resource getAvailableResources() {
- Resource freeResources = Resource.newInstance(
- (int)Runtime.getRuntime().freeMemory()/(1024*1024),
- Runtime.getRuntime().availableProcessors());
- return freeResources;
- }
-
- @Override
- public int getClusterNodeCount() {
- return 1;
- }
-
- @Override
- public void resetMatchLocalityForAllHeldContainers() {
- }
-
- @Override
- public Resource getTotalResources() {
- Resource totalResources = Resource.newInstance(
- (int)Runtime.getRuntime().maxMemory()/(1024*1024),
- Runtime.getRuntime().availableProcessors());
- return totalResources;
- }
-
- @Override
- public void blacklistNode(NodeId nodeId) {
- }
-
- @Override
- public void unblacklistNode(NodeId nodeId) {
- }
-
- @Override
- public void allocateTask(Object task, Resource capability, String[] hosts,
- String[] racks, Priority priority, Object containerSignature,
- Object clientCookie) {
- taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
- }
-
- @Override
- public synchronized void allocateTask(Object task, Resource capability,
- ContainerId containerId, Priority priority, Object containerSignature,
- Object clientCookie) {
- // in local mode every task is already container level local
- taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
- }
-
- @Override
- public boolean deallocateTask(Object task, boolean taskSucceeded) {
- return taskRequestHandler.addDeallocateTaskRequest(task);
- }
-
- @Override
- public Object deallocateContainer(ContainerId containerId) {
- return null;
- }
-
- @Override
- public void serviceInit(Configuration conf) {
- taskRequestHandler = new AsyncDelegateRequestHandler(taskRequestQueue,
- new LocalContainerFactory(appContext),
- taskAllocations,
- appClientDelegate,
- conf);
- asyncDelegateRequestThread = new Thread(taskRequestHandler);
- }
-
- @Override
- public void serviceStart() {
- asyncDelegateRequestThread.start();
- }
-
- @Override
- public void serviceStop() throws InterruptedException {
- if (asyncDelegateRequestThread != null) {
- asyncDelegateRequestThread.interrupt();
- }
- appCallbackExecutor.shutdownNow();
- appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void setShouldUnregister() {
- }
-
- static class LocalContainerFactory {
- final AppContext appContext;
- AtomicInteger nextId;
-
- public LocalContainerFactory(AppContext appContext) {
- this.appContext = appContext;
- this.nextId = new AtomicInteger(1);
- }
-
- public Container createContainer(Resource capability, Priority priority) {
- ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
- ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
- NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
- String nodeHttpAddress = "127.0.0.1:0";
-
- Container container = Container.newInstance(containerId,
- nodeId,
- nodeHttpAddress,
- capability,
- priority,
- null);
-
- return container;
- }
- }
-
- static class TaskRequest implements Comparable<TaskRequest> {
- // Higher prority than Priority.UNDEFINED
- static final int HIGHEST_PRIORITY = -2;
- Object task;
- Priority priority;
-
- public TaskRequest(Object task, Priority priority) {
- this.task = task;
- this.priority = priority;
- }
-
- @Override
- public int compareTo(TaskRequest request) {
- return request.priority.compareTo(this.priority);
- }
- }
-
- static class AllocateTaskRequest extends TaskRequest {
- Resource capability;
- Object clientCookie;
-
- public AllocateTaskRequest(Object task, Resource capability, Priority priority,
- Object clientCookie) {
- super(task, priority);
- this.capability = capability;
- this.clientCookie = clientCookie;
- }
- }
-
- static class DeallocateTaskRequest extends TaskRequest {
- static final Priority DEALLOCATE_PRIORITY = Priority.newInstance(HIGHEST_PRIORITY);
-
- public DeallocateTaskRequest(Object task) {
- super(task, DEALLOCATE_PRIORITY);
- }
- }
-
- static class AsyncDelegateRequestHandler implements Runnable {
- final BlockingQueue<TaskRequest> taskRequestQueue;
- final LocalContainerFactory localContainerFactory;
- final HashMap<Object, Container> taskAllocations;
- final TaskSchedulerAppCallback appClientDelegate;
- final int MAX_TASKS;
-
- AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
- LocalContainerFactory localContainerFactory,
- HashMap<Object, Container> taskAllocations,
- TaskSchedulerAppCallback appClientDelegate,
- Configuration conf) {
- this.taskRequestQueue = taskRequestQueue;
- this.localContainerFactory = localContainerFactory;
- this.taskAllocations = taskAllocations;
- this.appClientDelegate = appClientDelegate;
- this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
- TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
- }
-
- public void addAllocateTaskRequest(Object task, Resource capability, Priority priority,
- Object clientCookie) {
- try {
- taskRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- public boolean addDeallocateTaskRequest(Object task) {
- try {
- taskRequestQueue.put(new DeallocateTaskRequest(task));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- synchronized(taskRequestQueue) {
- taskRequestQueue.notify();
- }
- return true;
- }
-
- boolean shouldWait() {
- return taskAllocations.size() >= MAX_TASKS;
- }
-
- @Override
- public void run() {
- while(true) {
- synchronized(taskRequestQueue) {
- try {
- if (shouldWait()) {
- taskRequestQueue.wait();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- processRequest();
- }
- }
-
- void processRequest() {
- try {
- TaskRequest request = taskRequestQueue.take();
- if (request instanceof AllocateTaskRequest) {
- allocateTask((AllocateTaskRequest)request);
- }
- else if (request instanceof DeallocateTaskRequest) {
- deallocateTask((DeallocateTaskRequest)request);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (NullPointerException e) {
- LOG.warn("Task request was badly constructed");
- }
- }
-
- void allocateTask(AllocateTaskRequest request) {
- Container container = localContainerFactory.createContainer(request.capability,
- request.priority);
- taskAllocations.put(request.task, container);
- appClientDelegate.taskAllocated(request.task, request.clientCookie, container);
- }
-
- void deallocateTask(DeallocateTaskRequest request) {
- Container container = taskAllocations.remove(request.task);
- if (container != null) {
- appClientDelegate.containerBeingReleased(container.getId());
- }
- else {
- LOG.warn("Unable to find and remove task " + request.task + " from task allocations");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
new file mode 100644
index 0000000..22f8557
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -0,0 +1,335 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class LocalTaskSchedulerService extends TaskSchedulerService {
+
+ private static final Log LOG = LogFactory.getLog(LocalTaskSchedulerService.class);
+
+ final TaskSchedulerAppCallback realAppClient;
+ final TaskSchedulerAppCallback appClientDelegate;
+ final ContainerSignatureMatcher containerSignatureMatcher;
+ final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
+ AsyncDelegateRequestHandler taskRequestHandler;
+ Thread asyncDelegateRequestThread;
+ final ExecutorService appCallbackExecutor;
+
+ final HashMap<Object, Container> taskAllocations;
+ final String appHostName;
+ final int appHostPort;
+ final String appTrackingUrl;
+ final AppContext appContext;
+
+ public LocalTaskSchedulerService(TaskSchedulerAppCallback appClient,
+ ContainerSignatureMatcher containerSignatureMatcher, String appHostName,
+ int appHostPort, String appTrackingUrl, AppContext appContext) {
+ super(LocalTaskSchedulerService.class.getName());
+ this.realAppClient = appClient;
+ this.appCallbackExecutor = createAppCallbackExecutorService();
+ this.containerSignatureMatcher = containerSignatureMatcher;
+ this.appClientDelegate = createAppCallbackDelegate(appClient);
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
+ this.appContext = appContext;
+ taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
+ taskAllocations = new LinkedHashMap<Object, Container>();
+ }
+
+ private ExecutorService createAppCallbackExecutorService() {
+ return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
+ }
+
+ private TaskSchedulerAppCallback createAppCallbackDelegate(
+ TaskSchedulerAppCallback realAppClient) {
+ return new TaskSchedulerAppCallbackWrapper(realAppClient,
+ appCallbackExecutor);
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+   // Divide in long arithmetic, then clamp: (int) on the raw byte count overflows at 2 GiB.
+   long freeMb = Runtime.getRuntime().freeMemory() / (1024 * 1024);
+   return Resource.newInstance((int) Math.min(freeMb, Integer.MAX_VALUE),
+       Runtime.getRuntime().availableProcessors());
+ }
+
+ @Override
+ public int getClusterNodeCount() {
+ return 1;
+ }
+
+ @Override
+ public void resetMatchLocalityForAllHeldContainers() {
+ }
+
+ @Override
+ public Resource getTotalResources() {
+   // Divide as long, then clamp: maxMemory() can be Long.MAX_VALUE; (int) on raw bytes overflows.
+   long maxMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
+   return Resource.newInstance((int) Math.min(maxMb, Integer.MAX_VALUE),
+       Runtime.getRuntime().availableProcessors());
+ }
+
+ @Override
+ public void blacklistNode(NodeId nodeId) {
+ }
+
+ @Override
+ public void unblacklistNode(NodeId nodeId) {
+ }
+
+ @Override
+ public void allocateTask(Object task, Resource capability, String[] hosts,
+ String[] racks, Priority priority, Object containerSignature,
+ Object clientCookie) {
+ taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
+ }
+
+ @Override
+ public synchronized void allocateTask(Object task, Resource capability,
+ ContainerId containerId, Priority priority, Object containerSignature,
+ Object clientCookie) {
+ // in local mode every task is already container level local
+ taskRequestHandler.addAllocateTaskRequest(task, capability, priority, clientCookie);
+ }
+
+ @Override
+ public boolean deallocateTask(Object task, boolean taskSucceeded) {
+ return taskRequestHandler.addDeallocateTaskRequest(task);
+ }
+
+ @Override
+ public Object deallocateContainer(ContainerId containerId) {
+ return null;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ taskRequestHandler = new AsyncDelegateRequestHandler(taskRequestQueue,
+ new LocalContainerFactory(appContext),
+ taskAllocations,
+ appClientDelegate,
+ conf);
+ asyncDelegateRequestThread = new Thread(taskRequestHandler);
+ }
+
+ @Override
+ public void serviceStart() {
+ asyncDelegateRequestThread.start();
+ }
+
+ @Override
+ public void serviceStop() throws InterruptedException {
+ if (asyncDelegateRequestThread != null) {
+ asyncDelegateRequestThread.interrupt();
+ }
+ appCallbackExecutor.shutdownNow();
+ appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void setShouldUnregister() {
+ }
+
+ static class LocalContainerFactory {
+ final AppContext appContext;
+ AtomicInteger nextId;
+
+ public LocalContainerFactory(AppContext appContext) {
+ this.appContext = appContext;
+ this.nextId = new AtomicInteger(1);
+ }
+
+ public Container createContainer(Resource capability, Priority priority) {
+ ApplicationAttemptId appAttemptId = appContext.getApplicationAttemptId();
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, nextId.getAndIncrement());
+ NodeId nodeId = NodeId.newInstance("127.0.0.1", 0);
+ String nodeHttpAddress = "127.0.0.1:0";
+
+ Container container = Container.newInstance(containerId,
+ nodeId,
+ nodeHttpAddress,
+ capability,
+ priority,
+ null);
+
+ return container;
+ }
+ }
+
+ static class TaskRequest implements Comparable<TaskRequest> {
+ // Higher priority than Priority.UNDEFINED
+ static final int HIGHEST_PRIORITY = -2;
+ Object task;
+ Priority priority;
+
+ public TaskRequest(Object task, Priority priority) {
+ this.task = task;
+ this.priority = priority;
+ }
+
+ @Override
+ public int compareTo(TaskRequest request) {
+ return request.priority.compareTo(this.priority);
+ }
+ }
+
+ static class AllocateTaskRequest extends TaskRequest {
+ Resource capability;
+ Object clientCookie;
+
+ public AllocateTaskRequest(Object task, Resource capability, Priority priority,
+ Object clientCookie) {
+ super(task, priority);
+ this.capability = capability;
+ this.clientCookie = clientCookie;
+ }
+ }
+
+ static class DeallocateTaskRequest extends TaskRequest {
+ static final Priority DEALLOCATE_PRIORITY = Priority.newInstance(HIGHEST_PRIORITY);
+
+ public DeallocateTaskRequest(Object task) {
+ super(task, DEALLOCATE_PRIORITY);
+ }
+ }
+
+ static class AsyncDelegateRequestHandler implements Runnable {
+ final BlockingQueue<TaskRequest> taskRequestQueue;
+ final LocalContainerFactory localContainerFactory;
+ final HashMap<Object, Container> taskAllocations;
+ final TaskSchedulerAppCallback appClientDelegate;
+ final int MAX_TASKS;
+
+ AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
+ LocalContainerFactory localContainerFactory,
+ HashMap<Object, Container> taskAllocations,
+ TaskSchedulerAppCallback appClientDelegate,
+ Configuration conf) {
+ this.taskRequestQueue = taskRequestQueue;
+ this.localContainerFactory = localContainerFactory;
+ this.taskAllocations = taskAllocations;
+ this.appClientDelegate = appClientDelegate;
+ this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+ TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
+ }
+
+ public void addAllocateTaskRequest(Object task, Resource capability, Priority priority,
+ Object clientCookie) {
+ try {
+ taskRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public boolean addDeallocateTaskRequest(Object task) {
+ try {
+ taskRequestQueue.put(new DeallocateTaskRequest(task));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ synchronized(taskRequestQueue) {
+ taskRequestQueue.notify();
+ }
+ return true;
+ }
+
+ boolean shouldWait() {
+ return taskAllocations.size() >= MAX_TASKS;
+ }
+
+ @Override
+ public void run() {
+   // Exit once interrupted (see serviceStop()); otherwise wait()/take() rethrow at once and we spin.
+   while (!Thread.currentThread().isInterrupted()) {
+     synchronized(taskRequestQueue) {
+       try {
+         // 'if', not 'while': slots are freed only by this thread, when it handles the queued deallocate.
+         if (shouldWait()) { taskRequestQueue.wait(); }
+       } catch (InterruptedException e) {
+         Thread.currentThread().interrupt();
+       }
+     }
+     processRequest();
+   }
+ }
+
+ void processRequest() {
+ try {
+ TaskRequest request = taskRequestQueue.take();
+ if (request instanceof AllocateTaskRequest) {
+ allocateTask((AllocateTaskRequest)request);
+ }
+ else if (request instanceof DeallocateTaskRequest) {
+ deallocateTask((DeallocateTaskRequest)request);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (NullPointerException e) {
+ LOG.warn("Task request was badly constructed");
+ }
+ }
+
+ void allocateTask(AllocateTaskRequest request) {
+ Container container = localContainerFactory.createContainer(request.capability,
+ request.priority);
+ taskAllocations.put(request.task, container);
+ appClientDelegate.taskAllocated(request.task, request.clientCookie, container);
+ }
+
+ void deallocateTask(DeallocateTaskRequest request) {
+ Container container = taskAllocations.remove(request.task);
+ if (container != null) {
+ appClientDelegate.containerBeingReleased(container.getId());
+ }
+ else {
+ LOG.warn("Unable to find and remove task " + request.task + " from task allocations");
+ }
+ }
+ }
+}
[2/4] TEZ-1218. Make TaskScheduler an Abstract class instead of an
Interface. Contributed by Jeff Zhang.
Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
new file mode 100644
index 0000000..823ce47
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerService.java
@@ -0,0 +1,106 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public abstract class TaskSchedulerService extends AbstractService{
+
+ public TaskSchedulerService(String name) {
+ super(name);
+ }
+
+ public abstract Resource getAvailableResources();
+
+ public abstract int getClusterNodeCount();
+
+ public abstract void resetMatchLocalityForAllHeldContainers();
+
+ public abstract Resource getTotalResources();
+
+ public abstract void blacklistNode(NodeId nodeId);
+
+ public abstract void unblacklistNode(NodeId nodeId);
+
+ public abstract void allocateTask(Object task, Resource capability,
+ String[] hosts, String[] racks, Priority priority,
+ Object containerSignature, Object clientCookie);
+
+ /**
+ * Allocate affinitized to a specific container
+ */
+ public abstract void allocateTask(Object task, Resource capability,
+ ContainerId containerId, Priority priority, Object containerSignature,
+ Object clientCookie);
+
+ public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
+
+ public abstract Object deallocateContainer(ContainerId containerId);
+
+ public abstract void setShouldUnregister();
+
+ public interface TaskSchedulerAppCallback {
+ public class AppFinalStatus {
+ public final FinalApplicationStatus exitStatus;
+ public final String exitMessage;
+ public final String postCompletionTrackingUrl;
+ public AppFinalStatus(FinalApplicationStatus exitStatus,
+ String exitMessage,
+ String posCompletionTrackingUrl) {
+ this.exitStatus = exitStatus;
+ this.exitMessage = exitMessage;
+ this.postCompletionTrackingUrl = posCompletionTrackingUrl;
+ }
+ }
+ // upcall to app must be outside locks
+ public void taskAllocated(Object task,
+ Object appCookie,
+ Container container);
+ // this may end up being called for a task+container pair that the app
+ // has not heard about. this can happen because of a race between
+ // taskAllocated() upcall and deallocateTask() downcall
+ public void containerCompleted(Object taskLastAllocated,
+ ContainerStatus containerStatus);
+ public void containerBeingReleased(ContainerId containerId);
+ public void nodesUpdated(List<NodeReport> updatedNodes);
+ public void appShutdownRequested();
+ public void setApplicationRegistrationData(
+ Resource maxContainerCapability,
+ Map<ApplicationAccessType, String> appAcls,
+ ByteBuffer clientAMSecretKey
+ );
+ public void onError(Throwable t);
+ public float getProgress();
+ public void preemptContainer(ContainerId containerId);
+ public AppFinalStatus getFinalAppStatus();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
new file mode 100644
index 0000000..597b24f
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -0,0 +1,1951 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.rm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMasterState;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/* TODO not yet updating cluster nodes on every allocate response
+ * from RMContainerRequestor
+ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
+ if (clusterNmCount != lastClusterNmCount) {
+ LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+ + clusterNmCount);
+ eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
+ }
+ */
+public class YarnTaskSchedulerService extends TaskSchedulerService
+ implements AMRMClientAsync.CallbackHandler {
+ private static final Log LOG = LogFactory.getLog(YarnTaskSchedulerService.class);
+
+
+
+ // Client used for all communication with the RM (register, unregister,
+ // blacklisting, resource queries).
+ final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
+ // The callback handed to the constructor.
+ final TaskSchedulerAppCallback realAppClient;
+ // Callback actually used for upcalls; built by createAppCallbackDelegate()
+ // from realAppClient and appCallbackExecutor.
+ final TaskSchedulerAppCallback appClientDelegate;
+ final ContainerSignatureMatcher containerSignatureMatcher;
+ ExecutorService appCallbackExecutor;
+
+ // Container Re-Use configuration
+ private boolean shouldReuseContainers;
+ private boolean reuseRackLocal;
+ private boolean reuseNonLocal;
+
+ // Requests keyed by task; a task found here has not been allocated yet
+ // (see deallocateTask()).
+ Map<Object, CookieContainerRequest> taskRequests =
+ new HashMap<Object, CookieContainerRequest>();
+ // LinkedHashMap is needed in getProgress()
+ LinkedHashMap<Object, Container> taskAllocations =
+ new LinkedHashMap<Object, Container>();
+ /**
+ * Tracks last task assigned to a known container.
+ */
+ Map<ContainerId, Object> containerAssignments =
+ new HashMap<ContainerId, Object>();
+ // Remove inUse depending on resolution of TEZ-1129
+ Set<ContainerId> inUseContainers = Sets.newHashSet();
+ // Containers that were released, mapped to a task object that is reported
+ // to the app once the RM confirms completion (see onContainersCompleted()).
+ HashMap<ContainerId, Object> releasedContainers =
+ new HashMap<ContainerId, Object>();
+ /**
+ * Map of containers currently being held by the TaskScheduler.
+ */
+ Map<ContainerId, HeldContainer> heldContainers =
+ new HashMap<ContainerId, HeldContainer>();
+
+ // Priorities for which a container-affinity request has been made
+ // (populated by the ContainerId variant of allocateTask()).
+ Set<Priority> priorityHasAffinity = Sets.newHashSet();
+
+ Set<NodeId> blacklistedNodes = Collections
+ .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
+
+ // totalResources is initialised lazily in getProgress() from the RM's
+ // available resources.
+ Resource totalResources = Resource.newInstance(0, 0);
+ Resource allocatedResources = Resource.newInstance(0, 0);
+
+ final String appHostName;
+ final int appHostPort;
+ final String appTrackingUrl;
+ final AppContext appContext;
+
+ // Set in serviceStop(); RM callbacks become no-ops once this is true.
+ AtomicBoolean isStopped = new AtomicBoolean(false);
+
+ private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
+ private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
+ private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
+
+ DelayedContainerManager delayedContainerManager;
+ // Both delays are read from the configuration in serviceInit().
+ // sessionDelay may be -1; localitySchedulingDelay must be >= 0.
+ long localitySchedulingDelay;
+ long sessionDelay;
+
+ // When true, serviceStop() unregisters the application from the RM.
+ @VisibleForTesting
+ protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
+
+ /**
+  * Cookie attached to a container request. It bundles the task, the
+  * application's own cookie and the container signature of the request.
+  */
+ class CRCookie {
+   // Always read these through the getters below. Touching the fields directly
+   // can cause mocked unit tests to fail.
+   private Object task;
+   private Object appCookie;
+   private Object containerSignature;
+
+   CRCookie(Object requestedTask, Object cookieFromApp, Object signature) {
+     task = requestedTask;
+     appCookie = cookieFromApp;
+     containerSignature = signature;
+   }
+
+   /** @return the task this cookie was created for */
+   Object getTask() {
+     return this.task;
+   }
+
+   /** @return the cookie supplied by the application */
+   Object getAppCookie() {
+     return this.appCookie;
+   }
+
+   /** @return the container signature of the request */
+   Object getContainerSignature() {
+     return this.containerSignature;
+   }
+ }
+
+ /**
+  * A YARN container request that additionally carries a {@link CRCookie} and,
+  * optionally, the id of a container the request has affinity to.
+  */
+ class CookieContainerRequest extends ContainerRequest {
+   CRCookie cookie;
+   ContainerId affinitizedContainerId;
+
+   /** Creates a request without container affinity. */
+   public CookieContainerRequest(
+       Resource capability,
+       String[] hosts,
+       String[] racks,
+       Priority priority,
+       CRCookie cookie) {
+     this(capability, null, hosts, racks, priority, cookie);
+   }
+
+   /** Creates a request with affinity to the given container. */
+   public CookieContainerRequest(
+       Resource capability,
+       ContainerId containerId,
+       String[] hosts,
+       String[] racks,
+       Priority priority,
+       CRCookie cookie) {
+     super(capability, hosts, racks, priority);
+     this.cookie = cookie;
+     this.affinitizedContainerId = containerId;
+   }
+
+   /** @return the cookie attached to this request */
+   CRCookie getCookie() {
+     return this.cookie;
+   }
+
+   /** @return the container this request has affinity to, or null if none */
+   ContainerId getAffinitizedContainer() {
+     return this.affinitizedContainerId;
+   }
+ }
+
+ /**
+ * Creates the scheduler together with its own RM client, which is built via
+ * TezAMRMClientAsync.createAMRMClientAsync(1000, this) with this object as
+ * the callback handler.
+ */
+ public YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
+ ContainerSignatureMatcher containerSignatureMatcher,
+ String appHostName,
+ int appHostPort,
+ String appTrackingUrl,
+ AppContext appContext) {
+ super(YarnTaskSchedulerService.class.getName());
+ this.realAppClient = appClient;
+ // The executor must be created before the delegate: createAppCallbackDelegate()
+ // reads appCallbackExecutor.
+ this.appCallbackExecutor = createAppCallbackExecutorService();
+ this.containerSignatureMatcher = containerSignatureMatcher;
+ this.appClientDelegate = createAppCallbackDelegate(appClient);
+ this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
+ this.appContext = appContext;
+ }
+
+ /**
+ * Test-only variant of the constructor that uses the supplied RM client
+ * instead of creating one.
+ */
+ @Private
+ @VisibleForTesting
+ YarnTaskSchedulerService(TaskSchedulerAppCallback appClient,
+ ContainerSignatureMatcher containerSignatureMatcher,
+ String appHostName,
+ int appHostPort,
+ String appTrackingUrl,
+ TezAMRMClientAsync<CookieContainerRequest> client,
+ AppContext appContext) {
+ super(YarnTaskSchedulerService.class.getName());
+ this.realAppClient = appClient;
+ // The executor must be created before the delegate: createAppCallbackDelegate()
+ // reads appCallbackExecutor.
+ this.appCallbackExecutor = createAppCallbackExecutorService();
+ this.containerSignatureMatcher = containerSignatureMatcher;
+ this.appClientDelegate = createAppCallbackDelegate(appClient);
+ this.amRmClient = client;
+ this.appHostName = appHostName;
+ this.appHostPort = appHostPort;
+ this.appTrackingUrl = appTrackingUrl;
+ this.appContext = appContext;
+ }
+
+ /**
+  * Builds the single-threaded executor used for upcalls to the app. Its
+  * threads are daemon threads named "TaskSchedulerAppCaller #N".
+  */
+ @VisibleForTesting
+ ExecutorService createAppCallbackExecutorService() {
+   ThreadFactoryBuilder callerThreads = new ThreadFactoryBuilder();
+   callerThreads.setNameFormat("TaskSchedulerAppCaller #%d");
+   callerThreads.setDaemon(true);
+   return Executors.newSingleThreadExecutor(callerThreads.build());
+ }
+
+ @Override
+ public Resource getAvailableResources() {
+ return amRmClient.getAvailableResources();
+ }
+
+ @Override
+ public int getClusterNodeCount() {
+ // this can potentially be cheaper after YARN-1722
+ return amRmClient.getClusterNodeCount();
+ }
+
+ /**
+  * Wraps the given app callback in a TaskSchedulerAppCallbackWrapper that is
+  * handed the app callback executor.
+  */
+ TaskSchedulerAppCallback createAppCallbackDelegate(
+     TaskSchedulerAppCallback realAppClient) {
+   TaskSchedulerAppCallback wrapped =
+       new TaskSchedulerAppCallbackWrapper(realAppClient, appCallbackExecutor);
+   return wrapped;
+ }
+
+ /**
+  * Flags that the application should be unregistered from the RM when this
+  * service is stopped.
+  */
+ @Override
+ public void setShouldUnregister() {
+   shouldUnregister.set(true);
+ }
+
+ // AbstractService methods
+ @Override
+ public synchronized void serviceInit(Configuration conf) {
+
+ amRmClient.init(conf);
+ int heartbeatIntervalMax = conf.getInt(
+ TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
+ TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
+ amRmClient.setHeartbeatInterval(heartbeatIntervalMax);
+
+ shouldReuseContainers = conf.getBoolean(
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
+ reuseRackLocal = conf.getBoolean(
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED,
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT);
+ reuseNonLocal = conf
+ .getBoolean(
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
+ Preconditions.checkArgument(
+ ((!reuseRackLocal && !reuseNonLocal) || (reuseRackLocal)),
+ "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
+ + " enabled");
+
+ localitySchedulingDelay = conf.getLong(
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
+ TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
+ Preconditions.checkArgument(localitySchedulingDelay >= 0,
+ "Locality Scheduling delay should be >=0");
+
+ sessionDelay = conf.getLong(
+ TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS,
+ TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS_DEFAULT);
+ Preconditions.checkArgument(sessionDelay >= 0 || sessionDelay == -1,
+ "Session delay should be either -1 or >=0");
+
+ delayedContainerManager = new DelayedContainerManager();
+ LOG.info("TaskScheduler initialized with configuration: " +
+ "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
+ ", containerReuseEnabled: " + shouldReuseContainers +
+ ", reuseRackLocal: " + reuseRackLocal +
+ ", reuseNonLocal: " + reuseNonLocal +
+ ", localitySchedulingDelay: " + localitySchedulingDelay +
+ ", sessionDelay=" + sessionDelay);
+ }
+
+ @Override
+ public void serviceStart() {
+ try {
+ RegisterApplicationMasterResponse response;
+ synchronized (this) {
+ amRmClient.start();
+ response = amRmClient.registerApplicationMaster(appHostName,
+ appHostPort,
+ appTrackingUrl);
+ }
+ // upcall to app outside locks
+ appClientDelegate.setApplicationRegistrationData(
+ response.getMaximumResourceCapability(),
+ response.getApplicationACLs(),
+ response.getClientToAMTokenMasterKey());
+
+ delayedContainerManager.start();
+ } catch (YarnException e) {
+ LOG.error("Yarn Exception while registering", e);
+ throw new TezUncheckedException(e);
+ } catch (IOException e) {
+ LOG.error("IO Exception while registering", e);
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ @Override
+ public void serviceStop() throws InterruptedException {
+ // upcall to app outside of locks
+ try {
+ delayedContainerManager.shutdown();
+ // Wait for contianers to be released.
+ delayedContainerManager.join(2000l);
+ synchronized (this) {
+ isStopped.set(true);
+ if (shouldUnregister.get()) {
+ AppFinalStatus status = appClientDelegate.getFinalAppStatus();
+ LOG.info("Unregistering application from RM"
+ + ", exitStatus=" + status.exitStatus
+ + ", exitMessage=" + status.exitMessage
+ + ", trackingURL=" + status.postCompletionTrackingUrl);
+ amRmClient.unregisterApplicationMaster(status.exitStatus,
+ status.exitMessage,
+ status.postCompletionTrackingUrl);
+ }
+ }
+
+ // call client.stop() without lock client will attempt to stop the callback
+ // operation and at the same time the callback operation might be trying
+ // to get our lock.
+ amRmClient.stop();
+ appCallbackExecutor.shutdown();
+ appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
+ } catch (YarnException e) {
+ LOG.error("Yarn Exception while unregistering ", e);
+ throw new TezUncheckedException(e);
+ } catch (IOException e) {
+ LOG.error("IOException while unregistering ", e);
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ // AMRMClientAsync interface methods
+ /**
+  * RM callback invoked when containers have completed. Does nothing once the
+  * scheduler has been stopped.
+  *
+  * <p>For every completed container the task it was last associated with is
+  * looked up, first among the containers this scheduler released and then
+  * among the currently assigned ones, and the app is notified about each
+  * task/status pair after the lock has been dropped. Containers that are
+  * neither released nor assigned are only logged.
+  *
+  * @param statuses completion statuses reported by the RM
+  */
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+   if (isStopped.get()) {
+     return;
+   }
+   Map<Object, ContainerStatus> appContainerStatus =
+       new HashMap<Object, ContainerStatus>(statuses.size());
+   synchronized (this) {
+     for (ContainerStatus containerStatus : statuses) {
+       ContainerId completedId = containerStatus.getContainerId();
+       HeldContainer delayedContainer = heldContainers.get(completedId);
+
+       Object task = releasedContainers.remove(completedId);
+       if (task != null) {
+         if (delayedContainer != null) {
+           LOG.warn("Held container should be null since releasedContainer is not");
+         }
+         // TODO later we may want to check if exit code matched expectation
+         // e.g. successful container should not come back fail exit code after
+         // being released
+         // completion of a container we had released earlier
+         // an allocated container completed. notify app
+         LOG.info("Released container completed:" + completedId +
+             " last allocated to task: " + task);
+         appContainerStatus.put(task, containerStatus);
+         continue;
+       }
+
+       // not found in released containers. check currently allocated containers
+       // no need to release this container as the RM has already completed it
+       task = unAssignContainer(completedId, false);
+       if (delayedContainer != null) {
+         heldContainers.remove(completedId);
+         // subtractFrom() updates allocatedResources in place. The previously
+         // used subtract() returns a new Resource, so its discarded result left
+         // allocatedResources unchanged.
+         Resources.subtractFrom(allocatedResources,
+             delayedContainer.getContainer().getResource());
+       } else {
+         LOG.warn("Held container expected to be not null for a non-AM-released container");
+       }
+       if (task != null) {
+         // completion of a container we have allocated currently
+         // an allocated container completed. notify app
+         LOG.info("Allocated container completed:" + completedId +
+             " last allocated to task: " + task);
+         appContainerStatus.put(task, containerStatus);
+         continue;
+       }
+
+       // container neither allocated nor released
+       LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
+     }
+   }
+
+   // upcall to app must be outside locks
+   for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
+     appClientDelegate.containerCompleted(entry.getKey(), entry.getValue());
+   }
+ }
+
+ /**
+  * RM callback invoked when new containers have been allocated. Does nothing
+  * once the scheduler has been stopped.
+  *
+  * <p>With container re-use enabled the containers are handed to the delayed
+  * container handling. Otherwise they are matched against pending requests
+  * right away and the app is told about the resulting assignments.
+  *
+  * @param containers containers newly allocated by the RM
+  */
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+   if (isStopped.get()) {
+     return;
+   }
+
+   if (LOG.isDebugEnabled()) {
+     StringBuilder containerIds = new StringBuilder();
+     for (Container allocated : containers) {
+       containerIds.append(allocated.getId()).append(", ");
+     }
+     LOG.debug("Assigned New Containers: " + containerIds.toString());
+   }
+
+   final Map<CookieContainerRequest, Container> assignments;
+   synchronized (this) {
+     if (shouldReuseContainers) {
+       // unify allocations
+       pushNewContainerToDelayed(containers);
+       return;
+     }
+     // work on a modifiable copy, the assignment step may edit the list
+     assignments = assignNewlyAllocatedContainers(Lists.newLinkedList(containers));
+   }
+
+   // the upcall to the app is made after the lock has been released
+   informAppAboutAssignments(assignments);
+ }
+
+ /**
+  * Attempts to assign the given newly allocated containers, trying node-local,
+  * then rack-local, then non-local matching. Whatever the RM handed out but
+  * could not be assigned is released afterwards.
+  *
+  * @param containers
+  *          The containers to be assigned. The collection *may* be modified
+  *          in place based on allocations and releases.
+  * @return Assignments.
+  */
+ private synchronized Map<CookieContainerRequest, Container>
+     assignNewlyAllocatedContainers(Iterable<Container> containers) {
+
+   Map<CookieContainerRequest, Container> assignments =
+       new HashMap<CookieContainerRequest, Container>();
+
+   // strictest locality first
+   ContainerAssigner[] assignersByLocality = {
+       NODE_LOCAL_ASSIGNER, RACK_LOCAL_ASSIGNER, NON_LOCAL_ASSIGNER };
+   for (ContainerAssigner assigner : assignersByLocality) {
+     assignNewContainersWithLocation(containers, assigner, assignments);
+   }
+
+   // Release any unassigned containers given by the RM
+   releaseUnassignedContainers(containers);
+
+   return assignments;
+ }
+
+ /**
+  * Attempts to assign the given re-used containers, trying node-local, then
+  * rack-local, then non-local matching.
+  *
+  * @param containers the re-used containers to match against requests
+  * @return Assignments.
+  */
+ private synchronized Map<CookieContainerRequest, Container>
+     tryAssignReUsedContainers(Iterable<Container> containers) {
+
+   Map<CookieContainerRequest, Container> assignments =
+       new HashMap<CookieContainerRequest, Container>();
+
+   // Honor locality and match as many as possible
+   ContainerAssigner[] assignersByLocality = {
+       NODE_LOCAL_ASSIGNER, RACK_LOCAL_ASSIGNER, NON_LOCAL_ASSIGNER };
+   for (ContainerAssigner assigner : assignersByLocality) {
+     assignReUsedContainersWithLocation(containers, assigner, assignments, true);
+   }
+
+   return assignments;
+ }
+
+ /**
+  * Try to assign a re-used (held) container to a pending task request.
+  *
+  * <p>If the AM is IDLE or there are no pending requests, the container is
+  * either released or, in session mode, put back into the delayed container
+  * queue. If the AM is RUNNING, node-local, rack-local and non-local matching
+  * are attempted in that order, subject to the container's locality match
+  * level and the re-use configuration; a container that could not be matched
+  * is released or re-queued. In any other AM state the container is released.
+  *
+  * @param heldContainer Container to be used to assign to tasks
+  * @return the assignments made while the AM is RUNNING (possibly empty);
+  *         {@code null} in every other case
+  */
+ private synchronized Map<CookieContainerRequest, Container>
+     assignDelayedContainer(HeldContainer heldContainer) {
+
+   DAGAppMasterState state = appContext.getAMState();
+   boolean isNew = heldContainer.isNew();
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Trying to assign a delayed container"
+       + ", containerId=" + heldContainer.getContainer().getId()
+       + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
+       + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
+       + ", AMState=" + state
+       + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
+       + ", taskRequestsCount=" + taskRequests.size()
+       + ", heldContainers=" + heldContainers.size()
+       + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+       + ", isNew=" + isNew);
+   }
+
+   if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
+     // reset locality level on held container
+     // if sessionDelay defined, push back into delayed queue if not already
+     // done so
+
+     heldContainer.resetLocalityMatchLevel();
+     long currentTime = System.currentTimeMillis();
+     if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
+         && sessionDelay != -1)) {
+       LOG.info("No taskRequests. Container's session delay expired or is new. " +
+         "Releasing container"
+         + ", containerId=" + heldContainer.container.getId()
+         + ", containerExpiryTime="
+         + heldContainer.getContainerExpiryTime()
+         + ", sessionDelay=" + sessionDelay
+         + ", taskRequestsCount=" + taskRequests.size()
+         + ", heldContainers=" + heldContainers.size()
+         + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+         + ", isNew=" + isNew);
+       releaseUnassignedContainers(
+           Lists.newArrayList(heldContainer.container));
+     } else {
+       if (!appContext.isSession()) {
+         releaseUnassignedContainers(
+             Lists.newArrayList(heldContainer.container));
+       } else {
+         // only put back in queue if this is a session
+         heldContainer.resetLocalityMatchLevel();
+         delayedContainerManager.addDelayedContainer(
+             heldContainer.getContainer(),
+             currentTime + localitySchedulingDelay);
+       }
+     }
+   } else if (state.equals(DAGAppMasterState.RUNNING)) {
+     HeldContainer.LocalityMatchLevel localityMatchLevel =
+         heldContainer.getLocalityMatchLevel();
+     Map<CookieContainerRequest, Container> assignedContainers =
+         new HashMap<CookieContainerRequest, Container>();
+
+     Container containerToAssign = heldContainer.container;
+
+     heldContainer.incrementAssignmentAttempts();
+     // Each time a container is seen, we try node, rack and non-local in that
+     // order depending on matching level allowed
+
+     // if match level is NEW or NODE, match only at node-local
+     // always try node local matches for other levels
+     if (isNew
+         || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
+         || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE)
+         || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
+         || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) {
+       assignReUsedContainerWithLocation(containerToAssign,
+           NODE_LOCAL_ASSIGNER, assignedContainers, true);
+       // The message is guarded by isDebugEnabled(), so log it at debug level.
+       if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
+         LOG.debug("Failed to assign tasks to delayed container using node"
+           + ", containerId=" + heldContainer.getContainer().getId());
+       }
+     }
+
+     // if re-use allowed at rack
+     // match against rack if match level is RACK or NON-LOCAL
+     // if scheduling delay is 0, match at RACK allowed without a sleep
+     if (assignedContainers.isEmpty()) {
+       if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 ||
+           (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
+             || localityMatchLevel.equals(
+                 HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
+         assignReUsedContainerWithLocation(containerToAssign,
+             RACK_LOCAL_ASSIGNER, assignedContainers, false);
+         if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
+           LOG.debug("Failed to assign tasks to delayed container using rack"
+             + ", containerId=" + heldContainer.getContainer().getId());
+         }
+       }
+     }
+
+     // if re-use allowed at non-local
+     // match against non-local if match level is NON-LOCAL
+     // if scheduling delay is 0, match at NON-LOCAL allowed without a sleep
+     if (assignedContainers.isEmpty()) {
+       if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0
+           || localityMatchLevel.equals(
+               HeldContainer.LocalityMatchLevel.NON_LOCAL))) {
+         assignReUsedContainerWithLocation(containerToAssign,
+             NON_LOCAL_ASSIGNER, assignedContainers, false);
+         if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
+           LOG.debug("Failed to assign tasks to delayed container using non-local"
+             + ", containerId=" + heldContainer.getContainer().getId());
+         }
+       }
+     }
+
+     if (assignedContainers.isEmpty()) {
+
+       long currentTime = System.currentTimeMillis();
+
+       // Release container if final expiry time is reached
+       // Dont release a new container. The RM may not give us new ones
+       if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
+           && sessionDelay != -1) {
+         LOG.info("Container's session delay expired. Releasing container"
+           + ", containerId=" + heldContainer.container.getId()
+           + ", containerExpiryTime="
+           + heldContainer.getContainerExpiryTime()
+           + ", sessionDelay=" + sessionDelay);
+         releaseUnassignedContainers(
+             Lists.newArrayList(heldContainer.container));
+       } else {
+
+         // Let's decide if this container has hit the end of the road
+
+         // EOL true if container's match level is NON-LOCAL
+         boolean hitFinalMatchLevel = localityMatchLevel.equals(
+             HeldContainer.LocalityMatchLevel.NON_LOCAL);
+         if (!hitFinalMatchLevel) {
+           // EOL also true if locality delay is 0
+           // or rack-local or non-local is disabled
+           heldContainer.incrementLocalityMatchLevel();
+           if (localitySchedulingDelay == 0 ||
+               (!reuseRackLocal
+                 || (!reuseNonLocal &&
+                     heldContainer.getLocalityMatchLevel().equals(
+                         HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
+             hitFinalMatchLevel = true;
+           }
+           // the above if-stmt does not apply to new containers since they will
+           // be matched at all locality levels. So there finalMatchLevel cannot
+           // be short-circuited
+           if (localitySchedulingDelay > 0 && isNew) {
+             hitFinalMatchLevel = false;
+           }
+         }
+
+         if (hitFinalMatchLevel) {
+           boolean safeToRelease = true;
+           Priority topPendingPriority = amRmClient.getTopPriority();
+           Priority containerPriority = heldContainer.container.getPriority();
+           if (isNew && topPendingPriority != null &&
+               containerPriority.compareTo(topPendingPriority) < 0) {
+             // this container is of lower priority and given to us by the RM for
+             // a task that will be matched after the current top priority. Keep
+             // this container for those pending tasks since the RM is not going
+             // to give this container to us again
+             safeToRelease = false;
+           }
+
+           // Are there any pending requests at any priority?
+           // release if there are tasks or this is not a session
+           if (safeToRelease &&
+               (!taskRequests.isEmpty() || !appContext.isSession())) {
+             LOG.info("Releasing held container as either there are pending but "
+               + " unmatched requests or this is not a session"
+               + ", containerId=" + heldContainer.container.getId()
+               + ", pendingTasks=" + !taskRequests.isEmpty()
+               + ", isSession=" + appContext.isSession()
+               + ". isNew=" + isNew);
+             releaseUnassignedContainers(
+                 Lists.newArrayList(heldContainer.container));
+           } else {
+             // if no tasks, treat this like an idle session
+             heldContainer.resetLocalityMatchLevel();
+             delayedContainerManager.addDelayedContainer(
+                 heldContainer.getContainer(),
+                 currentTime + localitySchedulingDelay);
+           }
+         } else {
+           // Schedule delay container to match at a later try
+           delayedContainerManager.addDelayedContainer(
+               heldContainer.getContainer(),
+               currentTime + localitySchedulingDelay);
+         }
+       }
+     } else if (LOG.isDebugEnabled()) {
+       LOG.debug("Delayed container assignment successful"
+         + ", containerId=" + heldContainer.getContainer().getId());
+     }
+
+     return assignedContainers;
+   } else {
+     // ignore all other cases?
+     LOG.warn("Received a request to assign re-used containers when AM was "
+       + " in state: " + state + ". Ignoring request and releasing container"
+       + ": " + heldContainer.getContainer().getId());
+     releaseUnassignedContainers(Lists.newArrayList(heldContainer.container));
+   }
+
+   return null;
+ }
+
+ /**
+  * Resets the locality match level of every held container and then notifies
+  * the delayed container manager.
+  */
+ @Override
+ public synchronized void resetMatchLocalityForAllHeldContainers() {
+   Iterator<HeldContainer> held = heldContainers.values().iterator();
+   while (held.hasNext()) {
+     held.next().resetLocalityMatchLevel();
+   }
+   // notify() has to be called while holding the manager's monitor
+   synchronized (delayedContainerManager) {
+     delayedContainerManager.notify();
+   }
+ }
+
+ /**
+  * RM callback asking the application to shut down. The request is passed on
+  * to the app unless the scheduler has already been stopped.
+  */
+ @Override
+ public void onShutdownRequest() {
+   if (!isStopped.get()) {
+     // upcall to app must be outside locks
+     appClientDelegate.appShutdownRequested();
+   }
+ }
+
+ /**
+  * RM callback reporting updated nodes. The report is passed on to the app
+  * unless the scheduler has already been stopped.
+  *
+  * @param updatedNodes node reports received from the RM
+  */
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {
+   if (!isStopped.get()) {
+     // ignore bad nodes for now
+     // upcall to app must be outside locks
+     appClientDelegate.nodesUpdated(updatedNodes);
+   }
+ }
+
+ @Override
+ public float getProgress() {
+ if (isStopped.get()) {
+ return 1;
+ }
+
+ if(totalResources.getMemory() == 0) {
+ // assume this is the first allocate callback. nothing is allocated.
+ // available resource = totalResource
+ // TODO this will not handle dynamic changes in resources
+ totalResources = Resources.clone(getAvailableResources());
+ LOG.info("App total resource memory: " + totalResources.getMemory() +
+ " cpu: " + totalResources.getVirtualCores() +
+ " taskAllocations: " + taskAllocations.size());
+ }
+
+ preemptIfNeeded();
+
+ return appClientDelegate.getProgress();
+ }
+
+ /**
+  * RM callback reporting an error. The error is passed on to the app unless
+  * the scheduler has already been stopped.
+  *
+  * @param t the error that was reported
+  */
+ @Override
+ public void onError(Throwable t) {
+   if (!isStopped.get()) {
+     appClientDelegate.onError(t);
+   }
+ }
+
+ /** @return the total resources currently recorded by this scheduler */
+ @Override
+ public Resource getTotalResources() {
+   return this.totalResources;
+ }
+
+ @Override
+ public synchronized void blacklistNode(NodeId nodeId) {
+ LOG.info("Blacklisting node: " + nodeId);
+ amRmClient.addNodeToBlacklist(nodeId);
+ blacklistedNodes.add(nodeId);
+ }
+
+ @Override
+ public synchronized void unblacklistNode(NodeId nodeId) {
+ if (blacklistedNodes.remove(nodeId)) {
+ LOG.info("UnBlacklisting node: " + nodeId);
+ amRmClient.removeNodeFromBlacklist(nodeId);
+ }
+ }
+
+ /**
+  * Requests a container for a task, with optional host and rack preferences.
+  *
+  * @param task the task that needs a container
+  * @param capability resources required by the task
+  * @param hosts preferred hosts, may be null
+  * @param racks preferred racks, may be null
+  * @param priority priority of the request
+  * @param containerSignature signature stored in the request's cookie
+  * @param clientCookie cookie of the app, stored in the request's cookie
+  */
+ @Override
+ public synchronized void allocateTask(
+     Object task,
+     Resource capability,
+     String[] hosts,
+     String[] racks,
+     Priority priority,
+     Object containerSignature,
+     Object clientCookie) {
+
+   // XXX Have ContainerContext implement an interface defined by TaskScheduler.
+   // TODO check for nulls etc
+   // TODO extra memory allocation
+   CookieContainerRequest containerRequest = new CookieContainerRequest(
+       capability, hosts, racks, priority,
+       new CRCookie(task, clientCookie, containerSignature));
+
+   addRequestAndTrigger(task, containerRequest, hosts, racks);
+ }
+
+ /**
+  * Requests a container for a task with affinity to an existing container.
+  *
+  * <p>If the container is currently held and the requested capability fits
+  * into it, the request is targeted at that container's host and the priority
+  * is recorded as having affinity. Otherwise a warning is logged and the
+  * request is made without any host preference.
+  *
+  * @param task the task that needs a container
+  * @param capability resources required by the task
+  * @param containerId the container the task should preferably be matched to
+  * @param priority priority of the request
+  * @param containerSignature signature stored in the request's cookie
+  * @param clientCookie cookie of the app, stored in the request's cookie
+  */
+ @Override
+ public synchronized void allocateTask(
+     Object task,
+     Resource capability,
+     ContainerId containerId,
+     Priority priority,
+     Object containerSignature,
+     Object clientCookie) {
+
+   String[] hosts = null;
+   String[] racks = null;
+
+   HeldContainer affinityTarget = heldContainers.get(containerId);
+   if (affinityTarget == null) {
+     LOG.warn("Matching requested to unknown container: " + containerId);
+   } else {
+     Container target = affinityTarget.getContainer();
+     if (!canFit(capability, target.getResource())) {
+       LOG.warn("Matching requested to container: " + containerId +
+           " but requested capability: " + capability +
+           " does not fit in container resource: " + target.getResource());
+     } else {
+       // just specify node and use YARN's soft locality constraint for the rest
+       hosts = new String[] { target.getNodeId().getHost() };
+       priorityHasAffinity.add(priority);
+     }
+   }
+
+   CookieContainerRequest containerRequest = new CookieContainerRequest(
+       capability, containerId, hosts, racks, priority,
+       new CRCookie(task, clientCookie, containerSignature));
+
+   addRequestAndTrigger(task, containerRequest, hosts, racks);
+ }
+
+ /**
+  * Records the request for the task, triggers the delayed container manager
+  * so that held containers are considered for it, and logs the request.
+  */
+ private void addRequestAndTrigger(Object task, CookieContainerRequest request,
+     String[] hosts, String[] racks) {
+   addTaskRequest(task, request);
+   // See if any of the delayedContainers can be used for this task.
+   delayedContainerManager.triggerScheduling(true);
+
+   // only the first host/rack is logged; "null" stands for none given
+   String firstHost = "null";
+   if (hosts != null && hosts.length > 0) {
+     firstHost = hosts[0];
+   }
+   String firstRack = "null";
+   if (racks != null && racks.length > 0) {
+     firstRack = racks[0];
+   }
+   LOG.info("Allocation request for task: " + task
+       + " with request: " + request
+       + " host: " + firstHost
+       + " rack: " + firstRack);
+ }
+
+ /**
+  * De-allocates a task. A task that is still waiting for a container simply
+  * has its request removed. For a task that holds a container, the container
+  * is released when the task failed or re-use is disabled; otherwise an
+  * attempt is made to re-use the container for another pending task.
+  *
+  * @param task
+  *          the task to de-allocate.
+  * @param taskSucceeded
+  *          specify whether the task succeeded or failed.
+  * @return true if a container is assigned to this task.
+  */
+ @Override
+ public boolean deallocateTask(Object task, boolean taskSucceeded) {
+   Map<CookieContainerRequest, Container> reassignments = null;
+
+   synchronized (this) {
+     if (removeTaskRequest(task) != null) {
+       // task not allocated yet
+       LOG.info("Deallocating task: " + task + " before allocation");
+       return false;
+     }
+
+     // task request not present. Look in allocations
+     Container container = doBookKeepingForTaskDeallocate(task);
+     if (container == null) {
+       // task neither requested nor allocated.
+       LOG.info("Ignoring removal of unknown task: " + task);
+       return false;
+     }
+
+     LOG.info("Deallocated task: " + task + " from container: "
+         + container.getId());
+
+     boolean tryReuse = taskSucceeded && shouldReuseContainers;
+     if (!tryReuse) {
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Releasing container, containerId=" + container.getId()
+             + ", taskSucceeded=" + taskSucceeded
+             + ", reuseContainersFlag=" + shouldReuseContainers);
+       }
+       releaseContainer(container.getId());
+     } else {
+       // Don't attempt to delay containers if delay is 0.
+       HeldContainer held = heldContainers.get(container.getId());
+       if (held == null) {
+         LOG.info("Skipping container after task deallocate as container is"
+             + " no longer running, containerId=" + container.getId());
+       } else {
+         held.resetLocalityMatchLevel();
+         long now = System.currentTimeMillis();
+         if (sessionDelay > 0) {
+           held.setContainerExpiryTime(now + sessionDelay);
+         }
+         reassignments = assignDelayedContainer(held);
+       }
+     }
+   }
+
+   // up call outside of the lock.
+   boolean singleReassignment = reassignments != null && reassignments.size() == 1;
+   if (singleReassignment) {
+     informAppAboutAssignments(reassignments);
+   }
+   return true;
+ }
+
+ /**
+  * Un-assigns and releases the given container.
+  *
+  * @param containerId the container to de-allocate
+  * @return the task that was last assigned to the container, or null if the
+  *         container is not known to have had a task
+  */
+ @Override
+ public synchronized Object deallocateContainer(ContainerId containerId) {
+   Object task = unAssignContainer(containerId, true);
+   if (task == null) {
+     LOG.info("Ignoring dealloction of unknown container: " + containerId);
+     return null;
+   }
+
+   LOG.info("Deallocated container: " + containerId +
+       " from task: " + task);
+   return task;
+ }
+
+ /**
+  * Checks whether {@code arg0} fits within {@code arg1} on both memory and
+  * virtual cores.
+  *
+  * @param arg0 the resource that needs to fit
+  * @param arg1 the resource to fit into
+  * @return true if neither dimension of arg0 exceeds that of arg1
+  */
+ boolean canFit(Resource arg0, Resource arg1) {
+   boolean memoryFits = arg0.getMemory() <= arg1.getMemory();
+   boolean coresFit = arg0.getVirtualCores() <= arg1.getVirtualCores();
+   return memoryFits && coresFit;
+ }
+
+ /**
+ * Frees up resources when the highest priority pending request does not fit
+ * (per fitsIn) into totalResources minus allocatedResources.
+ * Under the scheduler lock it proceeds in this order:
+ * 1. if a previously used (non-new) delayed container exists, or a new one
+ * has had fewer than 3 assignment attempts, return and let the assignment
+ * loop deal with it;
+ * 2. otherwise release the lowest priority new delayed container and re-issue
+ * one pending request at that container's priority;
+ * 3. otherwise pick a running task of lower priority than the request, whose
+ * container signature is not an exact match, for preemption.
+ * The preemptContainer upcall is made after the lock is released.
+ */
+ void preemptIfNeeded() {
+ ContainerId preemptedContainer = null;
+ synchronized (this) {
+ Resource freeResources = Resources.subtract(totalResources,
+ allocatedResources);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
+ " cpu:" + allocatedResources.getVirtualCores() +
+ " delayedContainers: " + delayedContainerManager.delayedContainers.size());
+ }
+ assert freeResources.getMemory() >= 0;
+
+ // find the pending request with the highest priority
+ CookieContainerRequest highestPriRequest = null;
+ for(CookieContainerRequest request : taskRequests.values()) {
+ if(highestPriRequest == null) {
+ highestPriRequest = request;
+ } else if(isHigherPriority(request.getPriority(),
+ highestPriRequest.getPriority())){
+ highestPriRequest = request;
+ }
+ }
+ if(highestPriRequest != null &&
+ !fitsIn(highestPriRequest.getCapability(), freeResources)) {
+ // highest priority request will not fit in existing free resources
+ // free up some more
+ // TODO this is subject to error wrt RM resource normalization
+
+ // This request must have been considered for matching with all existing
+ // containers when request was made.
+ Container lowestPriNewContainer = null;
+ // Before preempting running tasks, check if we can release unused
+ // (delayed) containers
+ for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
+ if (!heldContainer.isNew()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reused container exists. Wait for assignment loop to release it. "
+ + heldContainer.getContainer().getId());
+ }
+ return;
+ }
+ if (heldContainer.geNumAssignmentAttempts() < 3) {
+ // we haven't tried to assign this container at node/rack/ANY
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Brand new container. Wait for assignment loop to match it. "
+ + heldContainer.getContainer().getId());
+ }
+ return;
+ }
+ Container container = heldContainer.getContainer();
+ if (lowestPriNewContainer == null ||
+ isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
+ // there is a lower priority new container
+ lowestPriNewContainer = container;
+ }
+ }
+
+ if (lowestPriNewContainer != null) {
+ LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
+ " with priority: " + lowestPriNewContainer.getPriority() +
+ " to free resource for request: " + highestPriRequest +
+ " . Current free resources: " + freeResources);
+ releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
+ // We are returning an unused resource back the RM. The RM thinks it
+ // has serviced our initial request and will not re-allocate this back
+ // to us anymore. So we need to ask for this again. If there is no
+ // outstanding request at that priority then its fine to not ask again.
+ // See TEZ-915 for more details
+ for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
+ Object task = entry.getKey();
+ CookieContainerRequest request = entry.getValue();
+ if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
+ LOG.info("Resending request for task again: " + task);
+ // deallocateTask/allocateTask modify taskRequests while it is being
+ // iterated; the break below ends the iteration right after that
+ deallocateTask(task, true);
+ allocateTask(task, request.getCapability(),
+ (request.getNodes() == null ? null :
+ request.getNodes().toArray(new String[request.getNodes().size()])),
+ (request.getRacks() == null ? null :
+ request.getRacks().toArray(new String[request.getRacks().size()])),
+ request.getPriority(),
+ request.getCookie().getContainerSignature(),
+ request.getCookie().getAppCookie());
+ break;
+ }
+ }
+
+ return;
+ }
+
+ // this assert will be a no-op in production but can help identify
+ // invalid assumptions during testing
+ assert delayedContainerManager.delayedContainers.isEmpty();
+
+ // there are no reused or new containers to release
+ // try to preempt running containers
+ Map.Entry<Object, Container> preemptedEntry = null;
+ for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+ // NOTE(review): assumes every allocated container is still present in
+ // heldContainers; a missing entry would cause a NullPointerException
+ // on the next line - confirm
+ HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+ CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+ Priority taskPriority = lastTaskInfo.getPriority();
+ Object signature = lastTaskInfo.getCookie().getContainerSignature();
+ if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
+ // higher or same priority
+ continue;
+ }
+ if (containerSignatureMatcher.isExactMatch(
+ highestPriRequest.getCookie().getContainerSignature(),
+ signature)) {
+ // exact match with different priorities
+ continue;
+ }
+ // NOTE(review): the current candidate is compared using its container's
+ // priority while this entry uses its last task's priority - confirm
+ // that the two are meant to be interchangeable
+ if(preemptedEntry == null ||
+ !isHigherPriority(taskPriority,
+ preemptedEntry.getValue().getPriority())) {
+ // keep the lower priority or the one added later
+ preemptedEntry = entry;
+ }
+ }
+ if(preemptedEntry != null) {
+ // found something to preempt
+ LOG.info("Preempting task: " + preemptedEntry.getKey() +
+ " to free resource for request: " + highestPriRequest +
+ " . Current free resources: " + freeResources);
+ preemptedContainer = preemptedEntry.getValue().getId();
+ // app client will be notified after the container is killed
+ // and we get its completed container status
+ }
+ }
+ }
+
+ // upcall outside locks
+ if (preemptedContainer != null) {
+ appClientDelegate.preemptContainer(preemptedContainer);
+ }
+ }
+
+ /**
+  * Memory-only fit check.
+  *
+  * @param toFit the resource that is needed
+  * @param resource the resource that is available
+  * @return true if the available memory covers the needed memory
+  */
+ private boolean fitsIn(Resource toFit, Resource resource) {
+   // YARN-893 prevents using correct library code
+   //return Resources.fitsIn(toFit, resource);
+   int availableMemory = resource.getMemory();
+   int neededMemory = toFit.getMemory();
+   return availableMemory >= neededMemory;
+ }
+
+ /**
+  * Looks up the requests registered with the AMRM client for the container's
+  * own priority and resource at the given location, and returns the first
+  * one that the container can be assigned to.
+  *
+  * @param container the container to find a request for
+  * @param location the location (host, rack or ANY) to match against
+  * @return the first assignable request, or null if there is none
+  */
+ private CookieContainerRequest getMatchingRequestWithPriority(
+     Container container,
+     String location) {
+   List<? extends Collection<CookieContainerRequest>> candidateGroups =
+       amRmClient.getMatchingRequests(container.getPriority(), location,
+           container.getResource());
+
+   // First assignable request wins; an empty list falls through to null.
+   for (Collection<CookieContainerRequest> group : candidateGroups) {
+     for (CookieContainerRequest candidate : group) {
+       if (canAssignTaskToContainer(candidate, container)) {
+         return candidate;
+       }
+     }
+   }
+   return null;
+ }
+
+ /**
+ * Finds a request for the container among the requests the AMRM client
+ * returns for the current top priority at the given location, ignoring the
+ * container's own priority.
+ *
+ * When container affinity is not considered, the first request that
+ * canAssignTaskToContainer accepts is returned. When it is considered, a
+ * request whose affinitized container is still held and not in use is only
+ * returned if that container is this one; otherwise it is skipped. Requests
+ * without usable affinity are remembered as a fallback and returned if no
+ * container level match is found.
+ *
+ * @param container the container to find a request for
+ * @param location the location (host, rack or ANY) to match against
+ * @param considerContainerAffinity whether affinity should be honored; it is
+ * ignored when the top priority is not in priorityHasAffinity
+ * @return the matched request or null if nothing matched
+ */
+ private CookieContainerRequest getMatchingRequestWithoutPriority(
+ Container container,
+ String location,
+ boolean considerContainerAffinity) {
+ Resource capability = container.getResource();
+ List<? extends Collection<CookieContainerRequest>> pRequestsList =
+ amRmClient.getMatchingRequestsForTopPriority(location, capability);
+ if (considerContainerAffinity &&
+ !priorityHasAffinity.contains(amRmClient.getTopPriority())) {
+ // no request at the top priority asked for affinity
+ considerContainerAffinity = false;
+ }
+ if (pRequestsList == null || pRequestsList.isEmpty()) {
+ return null;
+ }
+ CookieContainerRequest firstMatch = null;
+ for (Collection<CookieContainerRequest> requests : pRequestsList) {
+ for (CookieContainerRequest cookieContainerRequest : requests) {
+ if (firstMatch == null || // we dont have a match. So look for one
+ // we have a match but are looking for a better container level match.
+ // skip the expensive canAssignTaskToContainer() if the request is
+ // not affinitized to the container
+ container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
+ ) {
+ if (canAssignTaskToContainer(cookieContainerRequest, container)) {
+ // request matched to container
+ if (!considerContainerAffinity) {
+ return cookieContainerRequest;
+ }
+ ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
+ boolean canMatchTaskWithAffinity = true;
+ if (affCId == null ||
+ !heldContainers.containsKey(affCId) ||
+ inUseContainers.contains(affCId)) {
+ // affinity not specified
+ // affinitized container is no longer held
+ // affinitized container is in use
+ canMatchTaskWithAffinity = false;
+ }
+ if (canMatchTaskWithAffinity) {
+ if (container.getId().equals(
+ cookieContainerRequest.getAffinitizedContainer())) {
+ // container level match
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Matching with affinity for request: "
+ + cookieContainerRequest + " container: " + affCId);
+ }
+ return cookieContainerRequest;
+ }
+ // the request is waiting for a different, still available container
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping request for container " + container.getId()
+ + " due to affinity. Request: " + cookieContainerRequest
+ + " affContainer: " + affCId);
+ }
+ } else {
+ // affinity cannot be satisfied; remember as the fallback match
+ firstMatch = cookieContainerRequest;
+ }
+ }
+ }
+ }
+ }
+
+ return firstMatch;
+ }
+
+ /**
+  * Decides whether the request's task may run in the given container. A
+  * container that is not held, or is held but still new, accepts any task.
+  * Otherwise the signature recorded for the container's first task must be a
+  * superset of the request's signature.
+  *
+  * @param cookieContainerRequest the request whose task is being placed
+  * @param container the candidate container
+  * @return true if the task can be assigned to the container
+  */
+ private boolean canAssignTaskToContainer(
+     CookieContainerRequest cookieContainerRequest, Container container) {
+   HeldContainer held = heldContainers.get(container.getId());
+   if (held == null || held.isNew()) {
+     // New container.
+     return true;
+   }
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Trying to match task to a held container, "
+         + " containerId=" + held.container.getId());
+   }
+   boolean signatureCovered = containerSignatureMatcher.isSuperSet(
+       held.getFirstContainerSignature(),
+       cookieContainerRequest.getCookie().getContainerSignature());
+   if (signatureCovered) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("Matched delayed container to task"
+           + " containerId=" + held.container.getId());
+     }
+     return true;
+   }
+
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Failed to match delayed container to task"
+         + " containerId=" + held.container.getId());
+   }
+   return false;
+ }
+
+ private Object getTask(CookieContainerRequest request) {
+ return request.getCookie().getTask();
+ }
+
+ /**
+ * Drops the scheduler's bookkeeping for a container and, when applicable,
+ * releases it through the AMRM client.
+ *
+ * If a task was ever assigned to the container the app is told the container
+ * is being released, and the container/task pair is recorded in
+ * releasedContainers.
+ *
+ * @param containerId the container to release
+ */
+ private void releaseContainer(ContainerId containerId) {
+ Object assignedTask = containerAssignments.remove(containerId);
+ if (assignedTask != null) {
+ // A task was assigned to this container at some point. Inform the app.
+ appClientDelegate.containerBeingReleased(containerId);
+ }
+ HeldContainer delayedContainer = heldContainers.remove(containerId);
+ if (delayedContainer != null) {
+ // the container was still held; stop counting its resources as allocated
+ Resources.subtractFrom(allocatedResources,
+ delayedContainer.getContainer().getResource());
+ }
+ // only release if the container was still held or if reuse is disabled
+ if (delayedContainer != null || !shouldReuseContainers) {
+ amRmClient.releaseAssignedContainer(containerId);
+ }
+ if (assignedTask != null) {
+ // A task was assigned at some point. Add to release list since we are
+ // releasing the container.
+ releasedContainers.put(containerId, assignedTask);
+ }
+ }
+
+ /**
+ * Performs the bookkeeping for assigning a container to a task: removes the
+ * task's pending request, records the allocation in taskAllocations,
+ * inUseContainers and containerAssignments, and updates the HeldContainer
+ * entry for the container.
+ *
+ * @param task the task being assigned
+ * @param container the container the task is assigned to
+ * @param assigned the request that was matched to the container
+ */
+ private void assignContainer(Object task,
+ Container container,
+ CookieContainerRequest assigned) {
+ CookieContainerRequest request = removeTaskRequest(task);
+ assert request != null;
+ //assert assigned.equals(request);
+
+ Container result = taskAllocations.put(task, container);
+ assert result == null;
+ inUseContainers.add(container.getId());
+ containerAssignments.put(container.getId(), task);
+ HeldContainer heldContainer = heldContainers.get(container.getId());
+ if (!shouldReuseContainers && heldContainer == null) {
+ // reuse is disabled and the container is not tracked yet; start holding it
+ // and account for its resources
+ heldContainers.put(container.getId(), new HeldContainer(container,
+ -1, -1, assigned));
+ Resources.addTo(allocatedResources, container.getResource());
+ } else {
+ // NOTE(review): assumes heldContainer is non-null whenever reuse is
+ // enabled; otherwise the next line throws a NullPointerException - confirm
+ if (heldContainer.isNew()) {
+ // check for existence before adding since the first container potentially
+ // has the broadest signature as subsequent uses dont expand any dimension.
+ // This will need to be enhanced to track other signatures too when we
+ // think about preferring within vertex matching etc.
+ heldContainers.put(container.getId(),
+ new HeldContainer(container, heldContainer.getNextScheduleTime(),
+ heldContainer.getContainerExpiryTime(), assigned));
+ }
+ // NOTE(review): when the entry was replaced just above, this updates the
+ // old HeldContainer object, not the one now stored in heldContainers
+ heldContainer.setLastTaskInfo(assigned);
+ }
+ }
+
+ /**
+  * Starts holding freshly received containers and queues them with the
+  * delayed container manager for assignment.
+  *
+  * @param containers the new containers
+  * @throws TezUncheckedException if one of the containers is already held
+  */
+ private void pushNewContainerToDelayed(List<Container> containers){
+   // -1 means no expiry; otherwise the container expires after the session delay.
+   long expireTime = -1;
+   if (sessionDelay > 0) {
+     expireTime = System.currentTimeMillis() + sessionDelay;
+   }
+
+   synchronized (delayedContainerManager) {
+     for (Container newContainer : containers) {
+       HeldContainer previous = heldContainers.put(newContainer.getId(),
+           new HeldContainer(newContainer, -1, expireTime, null));
+       if (previous != null) {
+         throw new TezUncheckedException("New container " + newContainer.getId()
+             + " is already held.");
+       }
+       // Queue after the latest schedule time seen so far, or now if none.
+       long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen;
+       if (nextScheduleTime == -1) {
+         nextScheduleTime = System.currentTimeMillis();
+       }
+       Resources.addTo(allocatedResources, newContainer.getResource());
+       delayedContainerManager.addDelayedContainer(newContainer,
+           nextScheduleTime + 1);
+     }
+   }
+   delayedContainerManager.triggerScheduling(false);
+ }
+
+ /**
+  * Removes the pending request of a task, both locally and from the AMRM
+  * client.
+  *
+  * @param task the task whose request should be removed
+  * @return the removed request, or null if the task had no pending request
+  */
+ private CookieContainerRequest removeTaskRequest(Object task) {
+   CookieContainerRequest removed = taskRequests.remove(task);
+   if (removed == null) {
+     return null;
+   }
+   // remove all references of the request from AMRMClient
+   amRmClient.removeContainerRequest(removed);
+   return removed;
+ }
+
+ /**
+  * Registers a request for a task, replacing any request the task already
+  * had, and forwards it to the AMRM client.
+  *
+  * @param task the task the request is for
+  * @param request the new container request
+  */
+ private void addTaskRequest(Object task,
+     CookieContainerRequest request) {
+   CookieContainerRequest replaced = taskRequests.put(task, request);
+   boolean hadEarlierRequest = replaced != null;
+   if (hadEarlierRequest) {
+     // remove all references of the earlier request from AMRMClient
+     amRmClient.removeContainerRequest(replaced);
+   }
+   amRmClient.addContainerRequest(request);
+ }
+
+ /**
+  * Removes the allocation of a task and marks its container as not in use.
+  *
+  * @param task the task being de-allocated
+  * @return the container the task was allocated to, or null if it had none
+  */
+ private Container doBookKeepingForTaskDeallocate(Object task) {
+   Container freed = taskAllocations.remove(task);
+   if (freed != null) {
+     inUseContainers.remove(freed.getId());
+   }
+   return freed;
+ }
+
+ /**
+  * Detaches the task last recorded for a container from that container.
+  *
+  * @param containerId the container to un-assign
+  * @param releaseIfFound whether to also release the container when a task
+  *          is recorded for it
+  * @return the task recorded for the container, or null if there is none
+  */
+ private Object unAssignContainer(ContainerId containerId,
+     boolean releaseIfFound) {
+   // Looked up, not removed: containerAssignments tracks the last task run
+   // on a container.
+   Object task = containerAssignments.get(containerId);
+   if (task != null) {
+     Container container = taskAllocations.remove(task);
+     assert container != null;
+     inUseContainers.remove(containerId);
+     if (releaseIfFound) {
+       releaseContainer(containerId);
+     }
+   }
+   return task;
+ }
+
+ /**
+  * Compares two priorities; a numerically smaller value is the higher
+  * priority.
+  *
+  * @param lhs the priority being tested
+  * @param rhs the priority to compare against
+  * @return true if lhs has a strictly smaller priority value than rhs
+  */
+ private boolean isHigherPriority(Priority lhs, Priority rhs) {
+   int left = lhs.getPriority();
+   int right = rhs.getPriority();
+   return left < right;
+ }
+
+ /**
+  * Offers each new container to the assigner. Containers that get matched
+  * are recorded in {@code assignedContainers} and removed from
+  * {@code containers} through its iterator.
+  *
+  * @param containers the candidate containers; matched ones are removed
+  * @param assigner the locality specific assigner to use
+  * @param assignedContainers receives the request to container matches
+  */
+ private synchronized void assignNewContainersWithLocation(
+     Iterable<Container> containers,
+     ContainerAssigner assigner,
+     Map<CookieContainerRequest, Container> assignedContainers) {
+
+   for (Iterator<Container> it = containers.iterator(); it.hasNext();) {
+     Container candidate = it.next();
+     CookieContainerRequest matched = assigner.assignNewContainer(candidate);
+     if (matched == null) {
+       continue;
+     }
+     assignedContainers.put(matched, candidate);
+     it.remove();
+   }
+ }
+
+ /**
+  * Offers each held container to the assigner for reuse. Containers that
+  * get matched are removed from {@code containers} through its iterator.
+  *
+  * @param containers the candidate containers; matched ones are removed
+  * @param assigner the locality specific assigner to use
+  * @param assignedContainers receives the request to container matches
+  * @param honorLocality passed through to the assigner
+  */
+ private synchronized void assignReUsedContainersWithLocation(
+     Iterable<Container> containers,
+     ContainerAssigner assigner,
+     Map<CookieContainerRequest, Container> assignedContainers,
+     boolean honorLocality) {
+
+   for (Iterator<Container> it = containers.iterator(); it.hasNext();) {
+     Container candidate = it.next();
+     boolean matched = assignReUsedContainerWithLocation(candidate, assigner,
+         assignedContainers, honorLocality);
+     if (matched) {
+       it.remove();
+     }
+   }
+ }
+
+ /**
+  * Tries to reuse a single container for the top priority pending request.
+  *
+  * @param container the container to reuse
+  * @param assigner the locality specific assigner to use
+  * @param assignedContainers receives the match, if any
+  * @param honorLocality passed through to the assigner
+  * @return true if the container was assigned to a request
+  */
+ private synchronized boolean assignReUsedContainerWithLocation(
+     Container container,
+     ContainerAssigner assigner,
+     Map<CookieContainerRequest, Container> assignedContainers,
+     boolean honorLocality) {
+
+   Priority containerPriority = container.getPriority();
+   Priority topPendingTaskPriority = amRmClient.getTopPriority();
+   if (topPendingTaskPriority == null) {
+     // nothing left to assign
+     return false;
+   }
+
+   // Equal priority: first use, or reuse within the same priority - safe.
+   // Task lower than container: the higher priority tasks this container
+   // was meant for already have their resources - safe.
+   // Task higher than container: the RM may have handed us this container
+   // for a lower priority task that is still pending. Using it now could
+   // leave that task without a container, and since the RM has already
+   // given us everything we asked for we would get stuck for resources.
+   boolean taskOutranksContainer =
+       topPendingTaskPriority.compareTo(containerPriority) > 0;
+   if (taskOutranksContainer) {
+     return false;
+   }
+
+   CookieContainerRequest matched =
+       assigner.assignReUsedContainer(container, honorLocality);
+   if (matched == null) {
+     return false;
+   }
+   assignedContainers.put(matched, container);
+   return true;
+ }
+
+ /**
+  * Releases every container in the given collection, logging each one.
+  *
+  * @param containers the containers to release
+  */
+ private void releaseUnassignedContainers(Iterable<Container> containers) {
+   for (Container unused : containers) {
+     ContainerId unusedId = unused.getId();
+     LOG.info("Releasing unused container: "
+         + unusedId);
+     releaseContainer(unused.getId());
+   }
+ }
+
+ /**
+  * Tells the app that a container has been allocated to the request's task.
+  *
+  * @param assigned the request that was satisfied
+  * @param container the container allocated to it
+  */
+ private void informAppAboutAssignment(CookieContainerRequest assigned,
+     Container container) {
+   Object task = getTask(assigned);
+   Object appCookie = assigned.getCookie().getAppCookie();
+   appClientDelegate.taskAllocated(task, appCookie, container);
+ }
+
+ /**
+  * Reports a batch of assignments to the app. An assignment whose container
+  * sits on a blacklisted node is undone instead: the container is
+  * de-allocated and the same request is submitted again.
+  *
+  * @param assignedContainers request to container matches; may be null
+  */
+ private void informAppAboutAssignments(
+     Map<CookieContainerRequest, Container> assignedContainers) {
+   if (assignedContainers == null || assignedContainers.isEmpty()) {
+     return;
+   }
+   for (Entry<CookieContainerRequest, Container> assignment : assignedContainers
+       .entrySet()) {
+     CookieContainerRequest request = assignment.getKey();
+     Container container = assignment.getValue();
+     // check for blacklisted nodes. There may be race conditions between
+     // setting blacklist and receiving allocations
+     if (!blacklistedNodes.contains(container.getNodeId())) {
+       informAppAboutAssignment(request, container);
+       continue;
+     }
+
+     Object task = getTask(request);
+     LOG.info("Container: " + container.getId() +
+         " allocated on blacklisted node: " + container.getNodeId() +
+         " for task: " + task);
+     Object deAllocTask = deallocateContainer(container.getId());
+     assert deAllocTask.equals(task);
+     // Submitting the same request again is fine: the RM will not hand out
+     // the bad/unhealthy nodes again, and since nodes may become healthy or
+     // un-blacklisted it is better to give the RM the full information.
+     allocateTask(task, request.getCapability(),
+         (request.getNodes() == null ? null :
+             request.getNodes().toArray(new String[request.getNodes().size()])),
+         (request.getRacks() == null ? null :
+             request.getRacks().toArray(new String[request.getRacks().size()])),
+         request.getPriority(),
+         request.getCookie().getContainerSignature(),
+         request.getCookie().getAppCookie());
+   }
+ }
+
+ private abstract class ContainerAssigner {
+
+ protected final String locality;
+
+ protected ContainerAssigner(String locality) {
+ this.locality = locality;
+ }
+
+ public abstract CookieContainerRequest assignNewContainer(
+ Container container);
+
+ public abstract CookieContainerRequest assignReUsedContainer(
+ Container container, boolean honorLocality);
+
+ public void doBookKeepingForAssignedContainer(
+ CookieContainerRequest assigned, Container container,
+ String matchedLocation, boolean honorLocalityFlags) {
+ if (assigned == null) {
+ return;
+ }
+ Object task = getTask(assigned);
+ assert task != null;
+
+ LOG.info("Assigning container to task"
+ + ", container=" + container
+ + ", task=" + task
+ + ", containerHost=" + container.getNodeId().getHost()
+ + ", localityMatchType=" + locality
+ + ", matchedLocation=" + matchedLocation
+ + ", honorLocalityFlags=" + honorLocalityFlags
+ + ", reusedContainer="
+ + containerAssignments.containsKey(container.getId())
+ + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
+ + ", containerResourceMemory=" + container.getResource().getMemory()
+ + ", containerResourceVCores="
+ + container.getResource().getVirtualCores());
+
+ assignContainer(task, container, assigned);
+ }
+ }
+
+ /** Matches containers against requests for the container's own host. */
+ private class NodeLocalContainerAssigner extends ContainerAssigner {
+
+   NodeLocalContainerAssigner() {
+     super("NodeLocal");
+   }
+
+   @Override
+   public CookieContainerRequest assignNewContainer(Container container) {
+     String host = container.getNodeId().getHost();
+     CookieContainerRequest match =
+         getMatchingRequestWithPriority(container, host);
+     doBookKeepingForAssignedContainer(match, container, host, false);
+     return match;
+   }
+
+   /**
+    * Node level reuse always considers container affinity and always reports
+    * locality as honored; the {@code honorLocality} argument is not consulted.
+    */
+   @Override
+   public CookieContainerRequest assignReUsedContainer(Container container,
+       boolean honorLocality) {
+     String host = container.getNodeId().getHost();
+     CookieContainerRequest match =
+         getMatchingRequestWithoutPriority(container, host, true);
+     doBookKeepingForAssignedContainer(match, container, host, true);
+     return match;
+   }
+ }
+
+ /** Matches containers against requests for the rack of the container's host. */
+ private class RackLocalContainerAssigner extends ContainerAssigner {
+
+   RackLocalContainerAssigner() {
+     super("RackLocal");
+   }
+
+   @Override
+   public CookieContainerRequest assignNewContainer(Container container) {
+     String rack = RackResolver.resolve(container.getNodeId().getHost())
+         .getNetworkLocation();
+     CookieContainerRequest match =
+         getMatchingRequestWithPriority(container, rack);
+     doBookKeepingForAssignedContainer(match, container, rack, false);
+     return match;
+   }
+
+   @Override
+   public CookieContainerRequest assignReUsedContainer(
+       Container container, boolean honorLocality) {
+     // TEZ-586 this does not match an actual rackLocal request unless
+     // honorLocality is false. This method is useless if honorLocality=true
+     if (honorLocality) {
+       return null;
+     }
+     String rack = RackResolver.resolve(container.getNodeId().getHost())
+         .getNetworkLocation();
+     CookieContainerRequest match =
+         getMatchingRequestWithoutPriority(container, rack, false);
+     doBookKeepingForAssignedContainer(match, container, rack,
+         honorLocality);
+     return match;
+   }
+ }
+
+ /** Matches containers against requests for any location. */
+ private class NonLocalContainerAssigner extends ContainerAssigner {
+
+   NonLocalContainerAssigner() {
+     super("NonLocal");
+   }
+
+   @Override
+   public CookieContainerRequest assignNewContainer(Container container) {
+     String anywhere = ResourceRequest.ANY;
+     CookieContainerRequest match =
+         getMatchingRequestWithPriority(container, anywhere);
+     doBookKeepingForAssignedContainer(match, container, anywhere, false);
+     return match;
+   }
+
+   /** Only matches when locality does not have to be honored. */
+   @Override
+   public CookieContainerRequest assignReUsedContainer(Container container,
+       boolean honorLocality) {
+     if (honorLocality) {
+       return null;
+     }
+     String anywhere = ResourceRequest.ANY;
+     CookieContainerRequest match =
+         getMatchingRequestWithoutPriority(container, anywhere, false);
+     doBookKeepingForAssignedContainer(match, container, anywhere,
+         honorLocality);
+     return match;
+   }
+
+ }
+
+
+ @VisibleForTesting
+ class DelayedContainerManager extends Thread {
+
+ class HeldContainerTimerComparator implements Comparator<HeldContainer> {
+
+ @Override
+ public int compare(HeldContainer c1,
+ HeldContainer c2) {
+ return (int) (c1.getNextScheduleTime() - c2.getNextScheduleTime());
+ }
+ }
+
+ // Held containers waiting to be (re)assigned, ordered by
+ // HeldContainerTimerComparator, i.e. by their next schedule time.
+ PriorityBlockingQueue<HeldContainer> delayedContainers =
+ new PriorityBlockingQueue<HeldContainer>(20,
+ new HeldContainerTimerComparator());
+
+ // Set via triggerScheduling(); makes run() attempt doAssignAll().
+ private volatile boolean tryAssigningAll = false;
+ // Cleared by shutdown() to end the run() loop.
+ private volatile boolean running = true;
+ // Largest nextScheduleTime passed to addDelayedContainer(); -1 until the
+ // first container is added.
+ private long maxScheduleTimeSeen = -1;
+
+ // used for testing only
+ @VisibleForTesting
+ volatile AtomicBoolean drainedDelayedContainersForTest = null;
+
+ DelayedContainerManager() {
+ super.setName("DelayedContainerManager");
+ }
+
+ /**
+ * Main loop of the delayed container thread. While running it:
+ * - runs doAssignAll() when an assign-all has been requested;
+ * - waits to be notified when the delayed queue is empty;
+ * - otherwise takes the head of the queue once its next schedule time has
+ * been reached and tries to assign it, or waits until that time.
+ * When the loop ends, all containers still queued are released.
+ */
+ @Override
+ public void run() {
+ while(running) {
+ // Try assigning all containers if there's a request to do so.
+ if (tryAssigningAll) {
+ doAssignAll();
+ // NOTE(review): a triggerScheduling(true) that arrives while
+ // doAssignAll() is running appears to be cleared by this reset - confirm
+ tryAssigningAll = false;
+ }
+
+ // Try allocating containers which have timed out.
+ // Required since these containers may get assigned without
+ // locality at this point.
+ if (delayedContainers.peek() == null) {
+ try {
+ // test only signaling to make TestTaskScheduler work
+ if (drainedDelayedContainersForTest != null) {
+ drainedDelayedContainersForTest.set(true);
+ synchronized (drainedDelayedContainersForTest) {
+ drainedDelayedContainersForTest.notifyAll();
+ }
+ }
+ // nothing queued; sleep until notified
+ synchronized(this) {
+ this.wait();
+ }
+ // Re-loop to see if tryAssignAll is set.
+ continue;
+ } catch (InterruptedException e) {
+ // the loop condition re-checks the running flag after an interrupt
+ LOG.info("AllocatedContainerManager Thread interrupted");
+ }
+ } else {
+ // test only sleep to prevent tight loop cycling that makes tests stall
+ if (drainedDelayedContainersForTest != null) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ HeldContainer delayedContainer = delayedContainers.peek();
+ if (delayedContainer == null) {
+ continue;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Considering HeldContainer: "
+ + delayedContainer + " for assignment");
+ }
+ long currentTs = System.currentTimeMillis();
+ long nextScheduleTs = delayedContainer.getNextScheduleTime();
+ if (currentTs >= nextScheduleTs) {
+ // Remove the container and try scheduling it.
+ // TEZ-587 what if container is released by RM after this
+ // in onContainerCompleted()
+ delayedContainer = delayedContainers.poll();
+ if (delayedContainer == null) {
+ continue;
+ }
+ Map<CookieContainerRequest, Container> assignedContainers = null;
+ synchronized(YarnTaskSchedulerService.this) {
+ // only containers that are still held can be assigned
+ if (null !=
+ heldContainers.get(delayedContainer.getContainer().getId())) {
+ assignedContainers = assignDelayedContainer(delayedContainer);
+ } else {
+ LOG.info("Skipping delayed container as container is no longer"
+ + " running, containerId="
+ + delayedContainer.getContainer().getId());
+ }
+ }
+ // Inform App should be done outside of the lock
+ informAppAboutAssignments(assignedContainers);
+ } else {
+ synchronized(this) {
+ try {
+ // Wait for the next container to be assignable
+ delayedContainer = delayedContainers.peek();
+ // fall back to the locality delay if the queue emptied meanwhile
+ long diff = localitySchedulingDelay;
+ if (delayedContainer != null) {
+ diff = delayedContainer.getNextScheduleTime() - currentTs;
+ }
+ if (diff > 0) {
+ this.wait(diff);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("AllocatedContainerManager Thread interrupted");
+ }
+ }
+ }
+ }
+ }
+ // the loop has ended; release whatever is still queued
+ releasePendingContainers();
+ }
+
+ /**
+ * Tries to match every queued delayed container to a pending request.
+ * Queue entries whose container is no longer in heldContainers are dropped
+ * first. Matching runs under the scheduler lock; the app is informed of the
+ * resulting assignments after the lock is released.
+ */
+ private void doAssignAll() {
+ // The delayedContainers queue should not be modified in the middle of an
+ // iteration over it. Synchronizing here on YarnTaskSchedulerService.this
+ // to prevent this from happening.
+ // The assignment call made from within this method should NOT add any
+ // elements back to the delayedContainers queue. Since they're all
+ // delayed elements, de-allocation should not happen either - leaving the
+ // queue of delayed containers intact, except for the containers which end
+ // up getting assigned.
+ if (delayedContainers.isEmpty()) {
+ return;
+ }
+
+ Map<CookieContainerRequest, Container> assignedContainers;
+ synchronized(YarnTaskSchedulerService.this) {
+ // honor reuse-locality flags (container not timed out yet), Don't queue
+ // (already in queue), don't release (release happens when containers
+ // time-out)
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to assign all delayed containers to newly received"
+ + " tasks");
+ }
+ Iterator<HeldContainer> iter = delayedContainers.iterator();
+ while(iter.hasNext()) {
+ HeldContainer delayedContainer = iter.next();
+ if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
+ // this container is no longer held by us
+ LOG.info("AssignAll - Skipping delayed container as container is no longer"
+ + " running, containerId="
+ + delayedContainer.getContainer().getId());
+ iter.remove();
+ }
+ }
+ assignedContainers = tryAssignReUsedContainers(
+ new ContainerIterable(delayedContainers));
+ }
+ // Inform app
+ informAppAboutAssignments(assignedContainers);
+ }
+
+ /**
+ * Wakes up the delayed container thread.
+ * Intended to be used in cases where new Container requests come in.
+ *
+ * @param scheduleAll if true, indicates that an attempt should be made to
+ * allocate all available containers. NOTE(review): the flag is
+ * overwritten rather than OR-ed, so passing false looks like it can
+ * cancel an assign-all that was requested but not yet picked up -
+ * confirm this is intended.
+ */
+ public void triggerScheduling(boolean scheduleAll) {
+ this.tryAssigningAll = scheduleAll;
+ synchronized(this) {
+ this.notify();
+ }
+ }
+
+ /**
+  * Asks the thread to stop: clears the running flag, then interrupts the
+  * thread so that a pending wait or sleep returns.
+  */
+ public void shutdown() {
+   running = false;
+   interrupt();
+ }
+
+ /** Empties the delayed queue and releases every container that was in it. */
+ private void releasePendingContainers() {
+   int expected = delayedContainers.size();
+   List<HeldContainer> drained = Lists.newArrayListWithCapacity(expected);
+   delayedContainers.drainTo(drained);
+   releaseUnassignedContainers(new ContainerIterable(drained));
+ }
+
+ /**
+  * Queues a held container for (re)assignment at the given time and wakes
+  * up the thread. A container that is not currently held is ignored with a
+  * warning.
+  *
+  * @param container the container to queue
+  * @param nextScheduleTime when the container should next be considered
+  */
+ private void addDelayedContainer(Container container,
+     long nextScheduleTime) {
+   HeldContainer held = heldContainers.get(container.getId());
+   if (held == null) {
+     LOG.warn("Attempting to add a non-running container to the"
+         + " delayed container list, containerId=" + container.getId());
+     return;
+   }
+
+   held.setNextScheduleTime(nextScheduleTime);
+   // Keep track of the furthest schedule time handed out so far.
+   if (nextScheduleTime > maxScheduleTimeSeen) {
+     maxScheduleTimeSeen = nextScheduleTime;
+   }
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Adding container to delayed queue"
+         + ", containerId=" + held.getContainer().getId()
+         + ", nextScheduleTime=" + held.getNextScheduleTime()
+         + ", containerExpiry=" + held.getContainerExpiryTime());
+   }
+
+   boolean queued = delayedContainers.offer(held);
+   synchronized (this) {
+     notify();
+   }
+   if (!queued) {
+     // Could not be queued, so give the container back.
+     releaseUnassignedContainers(Lists.newArrayList(container));
+   }
+ }
+
+ }
+
+ private class ContainerIterable implements Iterable<Container> {
+
+ private final Iterable<HeldContainer> delayedContainers;
+
+ ContainerIterable(Iterable<HeldContainer> delayedContainers) {
+ this.delayedContainers = delayedContainers;
+ }
+
+ @Override
+ public Iterator<Container> iterator() {
+
+ final Iterator<HeldContainer> delayedContainerIterator = delayedContainers
+ .iterator();
+
+ return new Iterator<Container>() {
+
+ @Override
+ public boolean hasNext() {
+ return delayedContainerIterator.hasNext();
+ }
+
+ @Override
+ public Container next() {
+ return delayedContainerIterator.next().getContainer();
+ }
+
+ @Override
+ public void remove() {
+ delayedContainerIterator.remove();
+ }
+ };
+ }
+ }
+
+ static class HeldContainer {
+
+ enum LocalityMatchLevel {
+ NEW,
+ NODE,
+ RACK,
+ NON_LOCAL
+ }
+
+ Container container;
+ private long nextScheduleTime;
+ private Object firstContainerSignature;
+ private LocalityMatchLevel localityMatchLevel;
+ private long containerExpiryTime;
+ private CookieContainerRequest lastTaskInfo;
+ private int numAssignmentAttempts = 0;
+
+ HeldContainer(Container container,
+ long nextScheduleTime,
+ long containerExpiryTime,
+ CookieContainerRequest firstTaskInfo) {
+ this.container = container;
+ this.nextScheduleTime = nextScheduleTime;
+ if (firstTaskInfo != null) {
+ this.lastTaskInfo = firstTaskInfo;
+ this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
+ }
+ this.localityMatchLevel = LocalityMatchLevel.NODE;
+ this.containerExpiryTime = containerExpiryTime;
+ }
+
+ boolean isNew() {
+ return firstContainerSignature == null;
+ }
+
+ int geNumAssignmentAttempts() {
+ return numAssignmentAttempts;
+ }
+
+ void incrementAssignmentAttempts() {
+ numAssignmentAttempts++;
+ }
+
+ public Container getContainer() {
+ return this.container;
+ }
+
+ public long getNextScheduleTime() {
+ return this.nextScheduleTime;
+ }
+
+ public void setNextScheduleTime(long nextScheduleTime) {
+ this.nextScheduleTime = nextScheduleTime;
+ }
+
+ public long getContainerExpiryTime() {
+ return this.containerExpiryTime;
+ }
+
+ public void setContainerExpiryTime(long containerExpiryTime) {
+ this.containerExpiryTime = containerExpiryTime;
+ }
+
+ public Object getFirstContainerSignature() {
+ return this.firstContainerSignature;
+ }
+
+ public CookieContainerRequest getLastTaskInfo() {
+ return this.lastTaskInfo;
+ }
+
+ public void setLastTaskInfo(CookieContainerRequest taskInfo) {
+ lastTaskInfo = taskInfo;
+ }
+
+ public synchronized void resetLocalityMatchLevel() {
+ localityMatchLevel = LocalityMatchLevel.NEW;
+ }
+
+ public synchronized void incrementLocalityMatchLevel() {
+ if (localityMatchLevel.equals(LocalityMatchLevel.NEW)) {
+ localityMatchLevel = LocalityMatchLevel.NODE;
+ } else if (localityMatchLevel.equals(LocalityMatchLevel.NODE)) {
+ localityMatchLevel = LocalityMatchLevel.RACK;
+ } else if (localityMatchLevel.equals(LocalityMatchLevel.RACK)) {
+ localityMatchLevel = LocalityMatchLevel.NON_LOCAL;
+ } else if (localityMatchLevel.equals(LocalityMatchLevel.NON_LOCAL)) {
+ throw new TezUncheckedException("Cannot increment locality level "
+ + " from current NON_LOCAL for container: " + container.getId());
+ }
+ }
+
+ public LocalityMatchLevel getLocalityMatchLevel() {
+ return this.localityMatchLevel;
+ }
+
+ @Override
+ public String toString() {
+ return "HeldContainer: id: " + container.getId()
+ + ", nextScheduleTime: " + nextScheduleTime
+ + ", localityMatchLevel=" + localityMatchLevel
+ + ", signature: "
+ + (firstContainerSignature != null? firstContainerSignature.toString():"null");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 307af71..c5d20d7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -66,9 +66,9 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 1fb393f..c6e83c0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -34,10 +34,10 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.rm.LocalTaskScheduler.AsyncDelegateRequestHandler;
-import org.apache.tez.dag.app.rm.LocalTaskScheduler.LocalContainerFactory;
-import org.apache.tez.dag.app.rm.LocalTaskScheduler.TaskRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.AsyncDelegateRequestHandler;
+import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.LocalContainerFactory;
+import org.apache.tez.dag.app.rm.LocalTaskSchedulerService.TaskRequest;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
public class TestLocalTaskScheduler {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 9652da4..2e94dc0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -64,10 +64,10 @@ import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.rm.TaskScheduler.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.TaskScheduler.HeldContainer;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback.AppFinalStatus;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
@@ -1311,9 +1311,9 @@ public class TestTaskScheduler {
matchingMap.put(hostsTask1[0], host1List);
matchingMap.put(defaultRack[0], defaultRackList);
- List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<TaskScheduler.CookieContainerRequest>();
+ List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>();
nonAllocatedHostList.add(mockCookie2);
- List<CookieContainerRequest> otherRackList = new ArrayList<TaskScheduler.CookieContainerRequest>();
+ List<CookieContainerRequest> otherRackList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>();
otherRackList.add(mockCookie2);
taskScheduler.allocateTask(mockTask2, resource, hostsTask2, otherRack,
priority, null, mockCookie2);
@@ -1321,7 +1321,7 @@ public class TestTaskScheduler {
matchingMap.put(hostsTask2[0], nonAllocatedHostList);
matchingMap.put(otherRack[0], otherRackList);
- List<CookieContainerRequest> anyList = new LinkedList<TaskScheduler.CookieContainerRequest>();
+ List<CookieContainerRequest> anyList = new LinkedList<YarnTaskSchedulerService.CookieContainerRequest>();
anyList.add(mockCookie1);
anyList.add(mockCookie2);
[3/4] TEZ-1218. Make TaskScheduler an Abstract class instead of an
Interface. Contributed by Jeff Zhang.
Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
deleted file mode 100644
index 67369ee..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ /dev/null
@@ -1,1985 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
-import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/* TODO not yet updating cluster nodes on every allocate response
- * from RMContainerRequestor
- import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
- if (clusterNmCount != lastClusterNmCount) {
- LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
- + clusterNmCount);
- eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
- }
- */
-public class TaskScheduler extends AbstractService
- implements AMRMClientAsync.CallbackHandler, TaskSchedulerInterface {
- private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
-
- public interface TaskSchedulerAppCallback {
- public class AppFinalStatus {
- public final FinalApplicationStatus exitStatus;
- public final String exitMessage;
- public final String postCompletionTrackingUrl;
- public AppFinalStatus(FinalApplicationStatus exitStatus,
- String exitMessage,
- String posCompletionTrackingUrl) {
- this.exitStatus = exitStatus;
- this.exitMessage = exitMessage;
- this.postCompletionTrackingUrl = posCompletionTrackingUrl;
- }
- }
- // upcall to app must be outside locks
- public void taskAllocated(Object task,
- Object appCookie,
- Container container);
- // this may end up being called for a task+container pair that the app
- // has not heard about. this can happen because of a race between
- // taskAllocated() upcall and deallocateTask() downcall
- public void containerCompleted(Object taskLastAllocated,
- ContainerStatus containerStatus);
- public void containerBeingReleased(ContainerId containerId);
- public void nodesUpdated(List<NodeReport> updatedNodes);
- public void appShutdownRequested();
- public void setApplicationRegistrationData(
- Resource maxContainerCapability,
- Map<ApplicationAccessType, String> appAcls,
- ByteBuffer clientAMSecretKey
- );
- public void onError(Throwable t);
- public float getProgress();
- public void preemptContainer(ContainerId containerId);
- public AppFinalStatus getFinalAppStatus();
- }
-
- final TezAMRMClientAsync<CookieContainerRequest> amRmClient;
- final TaskSchedulerAppCallback realAppClient;
- final TaskSchedulerAppCallback appClientDelegate;
- final ContainerSignatureMatcher containerSignatureMatcher;
- ExecutorService appCallbackExecutor;
-
- // Container Re-Use configuration
- private boolean shouldReuseContainers;
- private boolean reuseRackLocal;
- private boolean reuseNonLocal;
-
- Map<Object, CookieContainerRequest> taskRequests =
- new HashMap<Object, CookieContainerRequest>();
- // LinkedHashMap is need in getProgress()
- LinkedHashMap<Object, Container> taskAllocations =
- new LinkedHashMap<Object, Container>();
- /**
- * Tracks last task assigned to a known container.
- */
- Map<ContainerId, Object> containerAssignments =
- new HashMap<ContainerId, Object>();
- // Remove inUse depending on resolution of TEZ-1129
- Set<ContainerId> inUseContainers = Sets.newHashSet();
- HashMap<ContainerId, Object> releasedContainers =
- new HashMap<ContainerId, Object>();
- /**
- * Map of containers currently being held by the TaskScheduler.
- */
- Map<ContainerId, HeldContainer> heldContainers =
- new HashMap<ContainerId, HeldContainer>();
-
- Set<Priority> priorityHasAffinity = Sets.newHashSet();
-
- Set<NodeId> blacklistedNodes = Collections
- .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
-
- Resource totalResources = Resource.newInstance(0, 0);
- Resource allocatedResources = Resource.newInstance(0, 0);
-
- final String appHostName;
- final int appHostPort;
- final String appTrackingUrl;
- final AppContext appContext;
-
- AtomicBoolean isStopped = new AtomicBoolean(false);
-
- private ContainerAssigner NODE_LOCAL_ASSIGNER = new NodeLocalContainerAssigner();
- private ContainerAssigner RACK_LOCAL_ASSIGNER = new RackLocalContainerAssigner();
- private ContainerAssigner NON_LOCAL_ASSIGNER = new NonLocalContainerAssigner();
-
- DelayedContainerManager delayedContainerManager;
- long localitySchedulingDelay;
- long sessionDelay;
-
- @VisibleForTesting
- protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
-
- class CRCookie {
- // Do not use these variables directly. Can caused mocked unit tests to fail.
- private Object task;
- private Object appCookie;
- private Object containerSignature;
-
- CRCookie(Object task, Object appCookie, Object containerSignature) {
- this.task = task;
- this.appCookie = appCookie;
- this.containerSignature = containerSignature;
- }
-
- Object getTask() {
- return task;
- }
-
- Object getAppCookie() {
- return appCookie;
- }
-
- Object getContainerSignature() {
- return containerSignature;
- }
- }
-
- class CookieContainerRequest extends ContainerRequest {
- CRCookie cookie;
- ContainerId affinitizedContainerId;
-
- public CookieContainerRequest(
- Resource capability,
- String[] hosts,
- String[] racks,
- Priority priority,
- CRCookie cookie) {
- super(capability, hosts, racks, priority);
- this.cookie = cookie;
- }
-
- public CookieContainerRequest(
- Resource capability,
- ContainerId containerId,
- String[] hosts,
- String[] racks,
- Priority priority,
- CRCookie cookie) {
- this(capability, hosts, racks, priority, cookie);
- this.affinitizedContainerId = containerId;
- }
-
- CRCookie getCookie() {
- return cookie;
- }
-
- ContainerId getAffinitizedContainer() {
- return affinitizedContainerId;
- }
- }
-
- public TaskScheduler(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName,
- int appHostPort,
- String appTrackingUrl,
- AppContext appContext) {
- super(TaskScheduler.class.getName());
- this.realAppClient = appClient;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.appClientDelegate = createAppCallbackDelegate(appClient);
- this.amRmClient = TezAMRMClientAsync.createAMRMClientAsync(1000, this);
- this.appHostName = appHostName;
- this.appHostPort = appHostPort;
- this.appTrackingUrl = appTrackingUrl;
- this.appContext = appContext;
- }
-
- @Private
- @VisibleForTesting
- TaskScheduler(TaskSchedulerAppCallback appClient,
- ContainerSignatureMatcher containerSignatureMatcher,
- String appHostName,
- int appHostPort,
- String appTrackingUrl,
- TezAMRMClientAsync<CookieContainerRequest> client,
- AppContext appContext) {
- super(TaskScheduler.class.getName());
- this.realAppClient = appClient;
- this.appCallbackExecutor = createAppCallbackExecutorService();
- this.containerSignatureMatcher = containerSignatureMatcher;
- this.appClientDelegate = createAppCallbackDelegate(appClient);
- this.amRmClient = client;
- this.appHostName = appHostName;
- this.appHostPort = appHostPort;
- this.appTrackingUrl = appTrackingUrl;
- this.appContext = appContext;
- }
-
- @VisibleForTesting
- ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
- }
-
- @Override
- public Resource getAvailableResources() {
- return amRmClient.getAvailableResources();
- }
-
- @Override
- public int getClusterNodeCount() {
- // this can potentially be cheaper after YARN-1722
- return amRmClient.getClusterNodeCount();
- }
-
- TaskSchedulerAppCallback createAppCallbackDelegate(
- TaskSchedulerAppCallback realAppClient) {
- return new TaskSchedulerAppCallbackWrapper(realAppClient,
- appCallbackExecutor);
- }
-
- @Override
- public void setShouldUnregister() {
- this.shouldUnregister.set(true);
- }
-
- // AbstractService methods
- @Override
- public synchronized void serviceInit(Configuration conf) {
-
- amRmClient.init(conf);
- int heartbeatIntervalMax = conf.getInt(
- TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX,
- TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT);
- amRmClient.setHeartbeatInterval(heartbeatIntervalMax);
-
- shouldReuseContainers = conf.getBoolean(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED,
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT);
- reuseRackLocal = conf.getBoolean(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED,
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT);
- reuseNonLocal = conf
- .getBoolean(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED,
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT);
- Preconditions.checkArgument(
- ((!reuseRackLocal && !reuseNonLocal) || (reuseRackLocal)),
- "Re-use Rack-Local cannot be disabled if Re-use Non-Local has been"
- + " enabled");
-
- localitySchedulingDelay = conf.getLong(
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS,
- TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS_DEFAULT);
- Preconditions.checkArgument(localitySchedulingDelay >= 0,
- "Locality Scheduling delay should be >=0");
-
- sessionDelay = conf.getLong(
- TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS,
- TezConfiguration.TEZ_AM_CONTAINER_SESSION_DELAY_ALLOCATION_MILLIS_DEFAULT);
- Preconditions.checkArgument(sessionDelay >= 0 || sessionDelay == -1,
- "Session delay should be either -1 or >=0");
-
- delayedContainerManager = new DelayedContainerManager();
- LOG.info("TaskScheduler initialized with configuration: " +
- "maxRMHeartbeatInterval: " + heartbeatIntervalMax +
- ", containerReuseEnabled: " + shouldReuseContainers +
- ", reuseRackLocal: " + reuseRackLocal +
- ", reuseNonLocal: " + reuseNonLocal +
- ", localitySchedulingDelay: " + localitySchedulingDelay +
- ", sessionDelay=" + sessionDelay);
- }
-
- @Override
- public void serviceStart() {
- try {
- RegisterApplicationMasterResponse response;
- synchronized (this) {
- amRmClient.start();
- response = amRmClient.registerApplicationMaster(appHostName,
- appHostPort,
- appTrackingUrl);
- }
- // upcall to app outside locks
- appClientDelegate.setApplicationRegistrationData(
- response.getMaximumResourceCapability(),
- response.getApplicationACLs(),
- response.getClientToAMTokenMasterKey());
-
- delayedContainerManager.start();
- } catch (YarnException e) {
- LOG.error("Yarn Exception while registering", e);
- throw new TezUncheckedException(e);
- } catch (IOException e) {
- LOG.error("IO Exception while registering", e);
- throw new TezUncheckedException(e);
- }
- }
-
- @Override
- public void serviceStop() throws InterruptedException {
- // upcall to app outside of locks
- try {
- delayedContainerManager.shutdown();
- // Wait for contianers to be released.
- delayedContainerManager.join(2000l);
- synchronized (this) {
- isStopped.set(true);
- if (shouldUnregister.get()) {
- AppFinalStatus status = appClientDelegate.getFinalAppStatus();
- LOG.info("Unregistering application from RM"
- + ", exitStatus=" + status.exitStatus
- + ", exitMessage=" + status.exitMessage
- + ", trackingURL=" + status.postCompletionTrackingUrl);
- amRmClient.unregisterApplicationMaster(status.exitStatus,
- status.exitMessage,
- status.postCompletionTrackingUrl);
- }
- }
-
- // call client.stop() without lock client will attempt to stop the callback
- // operation and at the same time the callback operation might be trying
- // to get our lock.
- amRmClient.stop();
- appCallbackExecutor.shutdown();
- appCallbackExecutor.awaitTermination(1000l, TimeUnit.MILLISECONDS);
- } catch (YarnException e) {
- LOG.error("Yarn Exception while unregistering ", e);
- throw new TezUncheckedException(e);
- } catch (IOException e) {
- LOG.error("IOException while unregistering ", e);
- throw new TezUncheckedException(e);
- }
- }
-
- // AMRMClientAsync interface methods
- @Override
- public void onContainersCompleted(List<ContainerStatus> statuses) {
- if (isStopped.get()) {
- return;
- }
- Map<Object, ContainerStatus> appContainerStatus =
- new HashMap<Object, ContainerStatus>(statuses.size());
- synchronized (this) {
- for(ContainerStatus containerStatus : statuses) {
- ContainerId completedId = containerStatus.getContainerId();
- HeldContainer delayedContainer = heldContainers.get(completedId);
-
- Object task = releasedContainers.remove(completedId);
- if(task != null){
- if (delayedContainer != null) {
- LOG.warn("Held container should be null since releasedContainer is not");
- }
- // TODO later we may want to check if exit code matched expectation
- // e.g. successful container should not come back fail exit code after
- // being released
- // completion of a container we had released earlier
- // an allocated container completed. notify app
- LOG.info("Released container completed:" + completedId +
- " last allocated to task: " + task);
- appContainerStatus.put(task, containerStatus);
- continue;
- }
-
- // not found in released containers. check currently allocated containers
- // no need to release this container as the RM has already completed it
- task = unAssignContainer(completedId, false);
- if (delayedContainer != null) {
- heldContainers.remove(completedId);
- Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
- } else {
- LOG.warn("Held container expected to be not null for a non-AM-released container");
- }
- if(task != null) {
- // completion of a container we have allocated currently
- // an allocated container completed. notify app
- LOG.info("Allocated container completed:" + completedId +
- " last allocated to task: " + task);
- appContainerStatus.put(task, containerStatus);
- continue;
- }
-
- // container neither allocated nor released
- LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
- }
- }
-
- // upcall to app must be outside locks
- for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
- appClientDelegate.containerCompleted(entry.getKey(), entry.getValue());
- }
- }
-
- @Override
- public void onContainersAllocated(List<Container> containers) {
- if (isStopped.get()) {
- return;
- }
- Map<CookieContainerRequest, Container> assignedContainers;
-
- if (LOG.isDebugEnabled()) {
- StringBuilder sb = new StringBuilder();
- for (Container container: containers) {
- sb.append(container.getId()).append(", ");
- }
- LOG.debug("Assigned New Containers: " + sb.toString());
- }
-
- synchronized (this) {
- if (!shouldReuseContainers) {
- List<Container> modifiableContainerList = Lists.newLinkedList(containers);
- assignedContainers = assignNewlyAllocatedContainers(
- modifiableContainerList);
- } else {
- // unify allocations
- pushNewContainerToDelayed(containers);
- return;
- }
- }
-
- // upcall to app must be outside locks
- informAppAboutAssignments(assignedContainers);
- }
-
- /**
- * Tries assigning the list of specified containers. Optionally, release
- * containers or add them to the delayed container queue.
- *
- * The flags apply to all containers in the specified lists. So, separate
- * calls should be made based on the expected behaviour.
- *
- * @param containers
- * The list of containers to be assigned. The list *may* be modified
- * in place based on allocations and releases.
- * @return Assignments.
- */
- private synchronized Map<CookieContainerRequest, Container>
- assignNewlyAllocatedContainers(Iterable<Container> containers) {
-
- Map<CookieContainerRequest, Container> assignedContainers =
- new HashMap<CookieContainerRequest, Container>();
- assignNewContainersWithLocation(containers,
- NODE_LOCAL_ASSIGNER, assignedContainers);
- assignNewContainersWithLocation(containers,
- RACK_LOCAL_ASSIGNER, assignedContainers);
- assignNewContainersWithLocation(containers,
- NON_LOCAL_ASSIGNER, assignedContainers);
-
- // Release any unassigned containers given by the RM
- releaseUnassignedContainers(containers);
-
- return assignedContainers;
- }
-
- private synchronized Map<CookieContainerRequest, Container>
- tryAssignReUsedContainers(Iterable<Container> containers) {
-
- Map<CookieContainerRequest, Container> assignedContainers =
- new HashMap<CookieContainerRequest, Container>();
-
- // Honor locality and match as many as possible
- assignReUsedContainersWithLocation(containers,
- NODE_LOCAL_ASSIGNER, assignedContainers, true);
- assignReUsedContainersWithLocation(containers,
- RACK_LOCAL_ASSIGNER, assignedContainers, true);
- assignReUsedContainersWithLocation(containers,
- NON_LOCAL_ASSIGNER, assignedContainers, true);
-
- return assignedContainers;
- }
-
- /**
- * Try to assign a re-used container
- * @param heldContainer Container to be used to assign to tasks
- * @return Assigned container map
- */
-
- private synchronized Map<CookieContainerRequest, Container>
- assignDelayedContainer(HeldContainer heldContainer) {
-
- DAGAppMasterState state = appContext.getAMState();
- boolean isNew = heldContainer.isNew();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to assign a delayed container"
- + ", containerId=" + heldContainer.getContainer().getId()
- + ", nextScheduleTime=" + heldContainer.getNextScheduleTime()
- + ", containerExpiryTime=" + heldContainer.getContainerExpiryTime()
- + ", AMState=" + state
- + ", matchLevel=" + heldContainer.getLocalityMatchLevel()
- + ", taskRequestsCount=" + taskRequests.size()
- + ", heldContainers=" + heldContainers.size()
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", isNew=" + isNew);
- }
-
- if (state.equals(DAGAppMasterState.IDLE) || taskRequests.isEmpty()) {
- // reset locality level on held container
- // if sessionDelay defined, push back into delayed queue if not already
- // done so
-
- heldContainer.resetLocalityMatchLevel();
- long currentTime = System.currentTimeMillis();
- if (isNew || (heldContainer.getContainerExpiryTime() <= currentTime
- && sessionDelay != -1)) {
- LOG.info("No taskRequests. Container's session delay expired or is new. " +
- "Releasing container"
- + ", containerId=" + heldContainer.container.getId()
- + ", containerExpiryTime="
- + heldContainer.getContainerExpiryTime()
- + ", sessionDelay=" + sessionDelay
- + ", taskRequestsCount=" + taskRequests.size()
- + ", heldContainers=" + heldContainers.size()
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", isNew=" + isNew);
- releaseUnassignedContainers(
- Lists.newArrayList(heldContainer.container));
- } else {
- if (!appContext.isSession()) {
- releaseUnassignedContainers(
- Lists.newArrayList(heldContainer.container));
- } else {
- // only put back in queue if this is a session
- heldContainer.resetLocalityMatchLevel();
- delayedContainerManager.addDelayedContainer(
- heldContainer.getContainer(),
- currentTime + localitySchedulingDelay);
- }
- }
- } else if (state.equals(DAGAppMasterState.RUNNING)) {
- HeldContainer.LocalityMatchLevel localityMatchLevel =
- heldContainer.getLocalityMatchLevel();
- Map<CookieContainerRequest, Container> assignedContainers =
- new HashMap<CookieContainerRequest, Container>();
-
- Container containerToAssign = heldContainer.container;
-
- heldContainer.incrementAssignmentAttempts();
- // Each time a container is seen, we try node, rack and non-local in that
- // order depending on matching level allowed
-
- // if match level is NEW or NODE, match only at node-local
- // always try node local matches for other levels
- if (isNew
- || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NEW)
- || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NODE)
- || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
- || localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.NON_LOCAL)) {
- assignReUsedContainerWithLocation(containerToAssign,
- NODE_LOCAL_ASSIGNER, assignedContainers, true);
- if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using node"
- + ", containerId=" + heldContainer.getContainer().getId());
- }
- }
-
- // if re-use allowed at rack
- // match against rack if match level is RACK or NON-LOCAL
- // if scheduling delay is 0, match at RACK allowed without a sleep
- if (assignedContainers.isEmpty()) {
- if ((reuseRackLocal || isNew) && (localitySchedulingDelay == 0 ||
- (localityMatchLevel.equals(HeldContainer.LocalityMatchLevel.RACK)
- || localityMatchLevel.equals(
- HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
- assignReUsedContainerWithLocation(containerToAssign,
- RACK_LOCAL_ASSIGNER, assignedContainers, false);
- if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using rack"
- + ", containerId=" + heldContainer.getContainer().getId());
- }
- }
- }
-
- // if re-use allowed at non-local
- // match against rack if match level is NON-LOCAL
- // if scheduling delay is 0, match at NON-LOCAL allowed without a sleep
- if (assignedContainers.isEmpty()) {
- if ((reuseNonLocal || isNew) && (localitySchedulingDelay == 0
- || localityMatchLevel.equals(
- HeldContainer.LocalityMatchLevel.NON_LOCAL))) {
- assignReUsedContainerWithLocation(containerToAssign,
- NON_LOCAL_ASSIGNER, assignedContainers, false);
- if (LOG.isDebugEnabled() && assignedContainers.isEmpty()) {
- LOG.info("Failed to assign tasks to delayed container using non-local"
- + ", containerId=" + heldContainer.getContainer().getId());
- }
- }
- }
-
- if (assignedContainers.isEmpty()) {
-
- long currentTime = System.currentTimeMillis();
-
- // Release container if final expiry time is reached
- // Dont release a new container. The RM may not give us new ones
- if (!isNew && heldContainer.getContainerExpiryTime() <= currentTime
- && sessionDelay != -1) {
- LOG.info("Container's session delay expired. Releasing container"
- + ", containerId=" + heldContainer.container.getId()
- + ", containerExpiryTime="
- + heldContainer.getContainerExpiryTime()
- + ", sessionDelay=" + sessionDelay);
- releaseUnassignedContainers(
- Lists.newArrayList(heldContainer.container));
- } else {
-
- // Let's decide if this container has hit the end of the road
-
- // EOL true if container's match level is NON-LOCAL
- boolean hitFinalMatchLevel = localityMatchLevel.equals(
- HeldContainer.LocalityMatchLevel.NON_LOCAL);
- if (!hitFinalMatchLevel) {
- // EOL also true if locality delay is 0
- // or rack-local or non-local is disabled
- heldContainer.incrementLocalityMatchLevel();
- if (localitySchedulingDelay == 0 ||
- (!reuseRackLocal
- || (!reuseNonLocal &&
- heldContainer.getLocalityMatchLevel().equals(
- HeldContainer.LocalityMatchLevel.NON_LOCAL)))) {
- hitFinalMatchLevel = true;
- }
- // the above if-stmt does not apply to new containers since they will
- // be matched at all locality levels. So there finalMatchLevel cannot
- // be short-circuited
- if (localitySchedulingDelay > 0 && isNew) {
- hitFinalMatchLevel = false;
- }
- }
-
- if (hitFinalMatchLevel) {
- boolean safeToRelease = true;
- Priority topPendingPriority = amRmClient.getTopPriority();
- Priority containerPriority = heldContainer.container.getPriority();
- if (isNew && topPendingPriority != null &&
- containerPriority.compareTo(topPendingPriority) < 0) {
- // this container is of lower priority and given to us by the RM for
- // a task that will be matched after the current top priority. Keep
- // this container for those pending tasks since the RM is not going
- // to give this container to us again
- safeToRelease = false;
- }
-
- // Are there any pending requests at any priority?
- // release if there are tasks or this is not a session
- if (safeToRelease &&
- (!taskRequests.isEmpty() || !appContext.isSession())) {
- LOG.info("Releasing held container as either there are pending but "
- + " unmatched requests or this is not a session"
- + ", containerId=" + heldContainer.container.getId()
- + ", pendingTasks=" + !taskRequests.isEmpty()
- + ", isSession=" + appContext.isSession()
- + ". isNew=" + isNew);
- releaseUnassignedContainers(
- Lists.newArrayList(heldContainer.container));
- } else {
- // if no tasks, treat this like an idle session
- heldContainer.resetLocalityMatchLevel();
- delayedContainerManager.addDelayedContainer(
- heldContainer.getContainer(),
- currentTime + localitySchedulingDelay);
- }
- } else {
- // Schedule delay container to match at a later try
- delayedContainerManager.addDelayedContainer(
- heldContainer.getContainer(),
- currentTime + localitySchedulingDelay);
- }
- }
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Delayed container assignment successful"
- + ", containerId=" + heldContainer.getContainer().getId());
- }
-
- return assignedContainers;
- } else {
- // ignore all other cases?
- LOG.warn("Received a request to assign re-used containers when AM was "
- + " in state: " + state + ". Ignoring request and releasing container"
- + ": " + heldContainer.getContainer().getId());
- releaseUnassignedContainers(Lists.newArrayList(heldContainer.container));
- }
-
- return null;
- }
-
- @Override
- public synchronized void resetMatchLocalityForAllHeldContainers() {
- for (HeldContainer heldContainer : heldContainers.values()) {
- heldContainer.resetLocalityMatchLevel();
- }
- synchronized(delayedContainerManager) {
- delayedContainerManager.notify();
- }
- }
-
- @Override
- public void onShutdownRequest() {
- if (isStopped.get()) {
- return;
- }
- // upcall to app must be outside locks
- appClientDelegate.appShutdownRequested();
- }
-
- @Override
- public void onNodesUpdated(List<NodeReport> updatedNodes) {
- if (isStopped.get()) {
- return;
- }
- // ignore bad nodes for now
- // upcall to app must be outside locks
- appClientDelegate.nodesUpdated(updatedNodes);
- }
-
- @Override
- public float getProgress() {
- if (isStopped.get()) {
- return 1;
- }
-
- if(totalResources.getMemory() == 0) {
- // assume this is the first allocate callback. nothing is allocated.
- // available resource = totalResource
- // TODO this will not handle dynamic changes in resources
- totalResources = Resources.clone(getAvailableResources());
- LOG.info("App total resource memory: " + totalResources.getMemory() +
- " cpu: " + totalResources.getVirtualCores() +
- " taskAllocations: " + taskAllocations.size());
- }
-
- preemptIfNeeded();
-
- return appClientDelegate.getProgress();
- }
-
- @Override
- public void onError(Throwable t) {
- if (isStopped.get()) {
- return;
- }
- appClientDelegate.onError(t);
- }
-
- @Override
- public Resource getTotalResources() {
- return totalResources;
- }
-
- @Override
- public synchronized void blacklistNode(NodeId nodeId) {
- LOG.info("Blacklisting node: " + nodeId);
- amRmClient.addNodeToBlacklist(nodeId);
- blacklistedNodes.add(nodeId);
- }
-
- @Override
- public synchronized void unblacklistNode(NodeId nodeId) {
- if (blacklistedNodes.remove(nodeId)) {
- LOG.info("UnBlacklisting node: " + nodeId);
- amRmClient.removeNodeFromBlacklist(nodeId);
- }
- }
-
- @Override
- public synchronized void allocateTask(
- Object task,
- Resource capability,
- String[] hosts,
- String[] racks,
- Priority priority,
- Object containerSignature,
- Object clientCookie) {
-
- // XXX Have ContainerContext implement an interface defined by TaskScheduler.
- // TODO check for nulls etc
- // TODO extra memory allocation
- CRCookie cookie = new CRCookie(task, clientCookie, containerSignature);
- CookieContainerRequest request = new CookieContainerRequest(
- capability, hosts, racks, priority, cookie);
-
- addRequestAndTrigger(task, request, hosts, racks);
- }
-
- @Override
- public synchronized void allocateTask(
- Object task,
- Resource capability,
- ContainerId containerId,
- Priority priority,
- Object containerSignature,
- Object clientCookie) {
-
- HeldContainer heldContainer = heldContainers.get(containerId);
- String[] hosts = null;
- String[] racks = null;
- if (heldContainer != null) {
- Container container = heldContainer.getContainer();
- if (canFit(capability, container.getResource())) {
- // just specify node and use YARN's soft locality constraint for the rest
- hosts = new String[1];
- hosts[0] = container.getNodeId().getHost();
- priorityHasAffinity.add(priority);
- } else {
- LOG.warn("Matching requested to container: " + containerId +
- " but requested capability: " + capability +
- " does not fit in container resource: " + container.getResource());
- }
- } else {
- LOG.warn("Matching requested to unknown container: " + containerId);
- }
-
- CRCookie cookie = new CRCookie(task, clientCookie, containerSignature);
- CookieContainerRequest request = new CookieContainerRequest(
- capability, containerId, hosts, racks, priority, cookie);
-
- addRequestAndTrigger(task, request, hosts, racks);
- }
-
- private void addRequestAndTrigger(Object task, CookieContainerRequest request,
- String[] hosts, String[] racks) {
- addTaskRequest(task, request);
- // See if any of the delayedContainers can be used for this task.
- delayedContainerManager.triggerScheduling(true);
- LOG.info("Allocation request for task: " + task +
- " with request: " + request +
- " host: " + ((hosts!=null&&hosts.length>0)?hosts[0]:"null") +
- " rack: " + ((racks!=null&&racks.length>0)?racks[0]:"null"));
- }
-
- /**
- * @param task
- * the task to de-allocate.
- * @param taskSucceeded
- * specify whether the task succeeded or failed.
- * @return true if a container is assigned to this task.
- */
- @Override
- public boolean deallocateTask(Object task, boolean taskSucceeded) {
- Map<CookieContainerRequest, Container> assignedContainers = null;
-
- synchronized (this) {
- CookieContainerRequest request = removeTaskRequest(task);
- if (request != null) {
- // task not allocated yet
- LOG.info("Deallocating task: " + task + " before allocation");
- return false;
- }
-
- // task request not present. Look in allocations
- Container container = doBookKeepingForTaskDeallocate(task);
- if (container == null) {
- // task neither requested nor allocated.
- LOG.info("Ignoring removal of unknown task: " + task);
- return false;
- } else {
- LOG.info("Deallocated task: " + task + " from container: "
- + container.getId());
-
- if (!taskSucceeded || !shouldReuseContainers) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Releasing container, containerId=" + container.getId()
- + ", taskSucceeded=" + taskSucceeded
- + ", reuseContainersFlag=" + shouldReuseContainers);
- }
- releaseContainer(container.getId());
- } else {
- // Don't attempt to delay containers if delay is 0.
- HeldContainer heldContainer = heldContainers.get(container.getId());
- if (heldContainer != null) {
- heldContainer.resetLocalityMatchLevel();
- long currentTime = System.currentTimeMillis();
- if (sessionDelay > 0) {
- heldContainer.setContainerExpiryTime(currentTime + sessionDelay);
- }
- assignedContainers = assignDelayedContainer(heldContainer);
- } else {
- LOG.info("Skipping container after task deallocate as container is"
- + " no longer running, containerId=" + container.getId());
- }
- }
- }
- }
-
- // up call outside of the lock.
- if (assignedContainers != null && assignedContainers.size() == 1) {
- informAppAboutAssignments(assignedContainers);
- }
- return true;
- }
-
- @Override
- public synchronized Object deallocateContainer(ContainerId containerId) {
- Object task = unAssignContainer(containerId, true);
- if(task != null) {
- LOG.info("Deallocated container: " + containerId +
- " from task: " + task);
- return task;
- }
-
- LOG.info("Ignoring dealloction of unknown container: " + containerId);
- return null;
- }
-
- boolean canFit(Resource arg0, Resource arg1) {
- int mem0 = arg0.getMemory();
- int mem1 = arg1.getMemory();
- int cpu0 = arg0.getVirtualCores();
- int cpu1 = arg1.getVirtualCores();
-
- if(mem0 <= mem1 && cpu0 <= cpu1) {
- return true;
- }
- return false;
- }
-
- void preemptIfNeeded() {
- ContainerId preemptedContainer = null;
- synchronized (this) {
- Resource freeResources = Resources.subtract(totalResources,
- allocatedResources);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
- " cpu:" + allocatedResources.getVirtualCores() +
- " delayedContainers: " + delayedContainerManager.delayedContainers.size());
- }
- assert freeResources.getMemory() >= 0;
-
- CookieContainerRequest highestPriRequest = null;
- for(CookieContainerRequest request : taskRequests.values()) {
- if(highestPriRequest == null) {
- highestPriRequest = request;
- } else if(isHigherPriority(request.getPriority(),
- highestPriRequest.getPriority())){
- highestPriRequest = request;
- }
- }
- if(highestPriRequest != null &&
- !fitsIn(highestPriRequest.getCapability(), freeResources)) {
- // highest priority request will not fit in existing free resources
- // free up some more
- // TODO this is subject to error wrt RM resource normalization
-
- // This request must have been considered for matching with all existing
- // containers when request was made.
- Container lowestPriNewContainer = null;
- // could not find anything to preempt. Check if we can release unused
- // containers
- for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
- if (!heldContainer.isNew()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reused container exists. Wait for assignment loop to release it. "
- + heldContainer.getContainer().getId());
- }
- return;
- }
- if (heldContainer.geNumAssignmentAttempts() < 3) {
- // we havent tried to assign this container at node/rack/ANY
- if (LOG.isDebugEnabled()) {
- LOG.debug("Brand new container. Wait for assignment loop to match it. "
- + heldContainer.getContainer().getId());
- }
- return;
- }
- Container container = heldContainer.getContainer();
- if (lowestPriNewContainer == null ||
- isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
- // there is a lower priority new container
- lowestPriNewContainer = container;
- }
- }
-
- if (lowestPriNewContainer != null) {
- LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
- " with priority: " + lowestPriNewContainer.getPriority() +
- " to free resource for request: " + highestPriRequest +
- " . Current free resources: " + freeResources);
- releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
- // We are returning an unused resource back the RM. The RM thinks it
- // has serviced our initial request and will not re-allocate this back
- // to us anymore. So we need to ask for this again. If there is no
- // outstanding request at that priority then its fine to not ask again.
- // See TEZ-915 for more details
- for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
- Object task = entry.getKey();
- CookieContainerRequest request = entry.getValue();
- if (request.getPriority().equals(lowestPriNewContainer.getPriority())) {
- LOG.info("Resending request for task again: " + task);
- deallocateTask(task, true);
- allocateTask(task, request.getCapability(),
- (request.getNodes() == null ? null :
- request.getNodes().toArray(new String[request.getNodes().size()])),
- (request.getRacks() == null ? null :
- request.getRacks().toArray(new String[request.getRacks().size()])),
- request.getPriority(),
- request.getCookie().getContainerSignature(),
- request.getCookie().getAppCookie());
- break;
- }
- }
-
- return;
- }
-
- // this assert will be a no-op in production but can help identify
- // invalid assumptions during testing
- assert delayedContainerManager.delayedContainers.isEmpty();
-
- // there are no reused or new containers to release
- // try to preempt running containers
- Map.Entry<Object, Container> preemptedEntry = null;
- for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
- HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
- CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
- Priority taskPriority = lastTaskInfo.getPriority();
- Object signature = lastTaskInfo.getCookie().getContainerSignature();
- if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
- // higher or same priority
- continue;
- }
- if (containerSignatureMatcher.isExactMatch(
- highestPriRequest.getCookie().getContainerSignature(),
- signature)) {
- // exact match with different priorities
- continue;
- }
- if(preemptedEntry == null ||
- !isHigherPriority(taskPriority,
- preemptedEntry.getValue().getPriority())) {
- // keep the lower priority or the one added later
- preemptedEntry = entry;
- }
- }
- if(preemptedEntry != null) {
- // found something to preempt
- LOG.info("Preempting task: " + preemptedEntry.getKey() +
- " to free resource for request: " + highestPriRequest +
- " . Current free resources: " + freeResources);
- preemptedContainer = preemptedEntry.getValue().getId();
- // app client will be notified when after container is killed
- // and we get its completed container status
- }
- }
- }
-
- // upcall outside locks
- if (preemptedContainer != null) {
- appClientDelegate.preemptContainer(preemptedContainer);
- }
- }
-
- private boolean fitsIn(Resource toFit, Resource resource) {
- // YARN-893 prevents using correct library code
- //return Resources.fitsIn(toFit, resource);
- return resource.getMemory() >= toFit.getMemory();
- }
-
- private CookieContainerRequest getMatchingRequestWithPriority(
- Container container,
- String location) {
- Priority priority = container.getPriority();
- Resource capability = container.getResource();
- List<? extends Collection<CookieContainerRequest>> requestsList =
- amRmClient.getMatchingRequests(priority, location, capability);
-
- if (!requestsList.isEmpty()) {
- // pick first one
- for (Collection<CookieContainerRequest> requests : requestsList) {
- for (CookieContainerRequest cookieContainerRequest : requests) {
- if (canAssignTaskToContainer(cookieContainerRequest, container)) {
- return cookieContainerRequest;
- }
- }
- }
- }
-
- return null;
- }
-
- private CookieContainerRequest getMatchingRequestWithoutPriority(
- Container container,
- String location,
- boolean considerContainerAffinity) {
- Resource capability = container.getResource();
- List<? extends Collection<CookieContainerRequest>> pRequestsList =
- amRmClient.getMatchingRequestsForTopPriority(location, capability);
- if (considerContainerAffinity &&
- !priorityHasAffinity.contains(amRmClient.getTopPriority())) {
- considerContainerAffinity = false;
- }
- if (pRequestsList == null || pRequestsList.isEmpty()) {
- return null;
- }
- CookieContainerRequest firstMatch = null;
- for (Collection<CookieContainerRequest> requests : pRequestsList) {
- for (CookieContainerRequest cookieContainerRequest : requests) {
- if (firstMatch == null || // we dont have a match. So look for one
- // we have a match but are looking for a better container level match.
- // skip the expensive canAssignTaskToContainer() if the request is
- // not affinitized to the container
- container.getId().equals(cookieContainerRequest.getAffinitizedContainer())
- ) {
- if (canAssignTaskToContainer(cookieContainerRequest, container)) {
- // request matched to container
- if (!considerContainerAffinity) {
- return cookieContainerRequest;
- }
- ContainerId affCId = cookieContainerRequest.getAffinitizedContainer();
- boolean canMatchTaskWithAffinity = true;
- if (affCId == null ||
- !heldContainers.containsKey(affCId) ||
- inUseContainers.contains(affCId)) {
- // affinity not specified
- // affinitized container is no longer held
- // affinitized container is in use
- canMatchTaskWithAffinity = false;
- }
- if (canMatchTaskWithAffinity) {
- if (container.getId().equals(
- cookieContainerRequest.getAffinitizedContainer())) {
- // container level match
- if (LOG.isDebugEnabled()) {
- LOG.debug("Matching with affinity for request: "
- + cookieContainerRequest + " container: " + affCId);
- }
- return cookieContainerRequest;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping request for container " + container.getId()
- + " due to affinity. Request: " + cookieContainerRequest
- + " affContainer: " + affCId);
- }
- } else {
- firstMatch = cookieContainerRequest;
- }
- }
- }
- }
- }
-
- return firstMatch;
- }
-
- private boolean canAssignTaskToContainer(
- CookieContainerRequest cookieContainerRequest, Container container) {
- HeldContainer heldContainer = heldContainers.get(container.getId());
- if (heldContainer == null || heldContainer.isNew()) { // New container.
- return true;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to match task to a held container, "
- + " containerId=" + heldContainer.container.getId());
- }
- if (containerSignatureMatcher.isSuperSet(heldContainer
- .getFirstContainerSignature(), cookieContainerRequest.getCookie()
- .getContainerSignature())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Matched delayed container to task"
- + " containerId=" + heldContainer.container.getId());
- }
- return true;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to match delayed container to task"
- + " containerId=" + heldContainer.container.getId());
- }
- return false;
- }
-
- private Object getTask(CookieContainerRequest request) {
- return request.getCookie().getTask();
- }
-
- private void releaseContainer(ContainerId containerId) {
- Object assignedTask = containerAssignments.remove(containerId);
- if (assignedTask != null) {
- // A task was assigned to this container at some point. Inform the app.
- appClientDelegate.containerBeingReleased(containerId);
- }
- HeldContainer delayedContainer = heldContainers.remove(containerId);
- if (delayedContainer != null) {
- Resources.subtractFrom(allocatedResources,
- delayedContainer.getContainer().getResource());
- }
- if (delayedContainer != null || !shouldReuseContainers) {
- amRmClient.releaseAssignedContainer(containerId);
- }
- if (assignedTask != null) {
- // A task was assigned at some point. Add to release list since we are
- // releasing the container.
- releasedContainers.put(containerId, assignedTask);
- }
- }
-
- private void assignContainer(Object task,
- Container container,
- CookieContainerRequest assigned) {
- CookieContainerRequest request = removeTaskRequest(task);
- assert request != null;
- //assert assigned.equals(request);
-
- Container result = taskAllocations.put(task, container);
- assert result == null;
- inUseContainers.add(container.getId());
- containerAssignments.put(container.getId(), task);
- HeldContainer heldContainer = heldContainers.get(container.getId());
- if (!shouldReuseContainers && heldContainer == null) {
- heldContainers.put(container.getId(), new HeldContainer(container,
- -1, -1, assigned));
- Resources.addTo(allocatedResources, container.getResource());
- } else {
- if (heldContainer.isNew()) {
- // check for existence before adding since the first container potentially
- // has the broadest signature as subsequent uses dont expand any dimension.
- // This will need to be enhanced to track other signatures too when we
- // think about preferring within vertex matching etc.
- heldContainers.put(container.getId(),
- new HeldContainer(container, heldContainer.getNextScheduleTime(),
- heldContainer.getContainerExpiryTime(), assigned));
- }
- heldContainer.setLastTaskInfo(assigned);
- }
- }
-
- private void pushNewContainerToDelayed(List<Container> containers){
- long expireTime = -1;
- if (sessionDelay > 0) {
- long currentTime = System.currentTimeMillis();
- expireTime = currentTime + sessionDelay;
- }
-
- synchronized (delayedContainerManager) {
- for (Container container : containers) {
- if (heldContainers.put(container.getId(), new HeldContainer(container,
- -1, expireTime, null)) != null) {
- throw new TezUncheckedException("New container " + container.getId()
- + " is already held.");
- }
- long nextScheduleTime = delayedContainerManager.maxScheduleTimeSeen;
- if (delayedContainerManager.maxScheduleTimeSeen == -1) {
- nextScheduleTime = System.currentTimeMillis();
- }
- Resources.addTo(allocatedResources, container.getResource());
- delayedContainerManager.addDelayedContainer(container,
- nextScheduleTime + 1);
- }
- }
- delayedContainerManager.triggerScheduling(false);
- }
-
- private CookieContainerRequest removeTaskRequest(Object task) {
- CookieContainerRequest request = taskRequests.remove(task);
- if(request != null) {
- // remove all references of the request from AMRMClient
- amRmClient.removeContainerRequest(request);
- }
- return request;
- }
-
- private void addTaskRequest(Object task,
- CookieContainerRequest request) {
- CookieContainerRequest oldRequest = taskRequests.put(task, request);
- if (oldRequest != null) {
- // remove all references of the request from AMRMClient
- amRmClient.removeContainerRequest(oldRequest);
- }
- amRmClient.addContainerRequest(request);
- }
-
- private Container doBookKeepingForTaskDeallocate(Object task) {
- Container container = taskAllocations.remove(task);
- if (container == null) {
- return null;
- }
- inUseContainers.remove(container.getId());
- return container;
- }
-
- private Object unAssignContainer(ContainerId containerId,
- boolean releaseIfFound) {
- // Not removing. containerAssignments tracks the last task run on a
- // container.
- Object task = containerAssignments.get(containerId);
- if(task == null) {
- return null;
- }
- Container container = taskAllocations.remove(task);
- assert container != null;
- inUseContainers.remove(containerId);
- if(releaseIfFound) {
- releaseContainer(containerId);
- }
- return task;
- }
-
- private boolean isHigherPriority(Priority lhs, Priority rhs) {
- return lhs.getPriority() < rhs.getPriority();
- }
-
- private synchronized void assignNewContainersWithLocation(
- Iterable<Container> containers,
- ContainerAssigner assigner,
- Map<CookieContainerRequest, Container> assignedContainers) {
-
- Iterator<Container> containerIterator = containers.iterator();
- while (containerIterator.hasNext()) {
- Container container = containerIterator.next();
- CookieContainerRequest assigned =
- assigner.assignNewContainer(container);
- if (assigned != null) {
- assignedContainers.put(assigned, container);
- containerIterator.remove();
- }
- }
- }
-
- private synchronized void assignReUsedContainersWithLocation(
- Iterable<Container> containers,
- ContainerAssigner assigner,
- Map<CookieContainerRequest, Container> assignedContainers,
- boolean honorLocality) {
-
- Iterator<Container> containerIterator = containers.iterator();
- while (containerIterator.hasNext()) {
- Container container = containerIterator.next();
- if (assignReUsedContainerWithLocation(container, assigner,
- assignedContainers, honorLocality)) {
- containerIterator.remove();
- }
- }
- }
-
- private synchronized boolean assignReUsedContainerWithLocation(
- Container container,
- ContainerAssigner assigner,
- Map<CookieContainerRequest, Container> assignedContainers,
- boolean honorLocality) {
-
- Priority containerPriority = container.getPriority();
- Priority topPendingTaskPriority = amRmClient.getTopPriority();
- if (topPendingTaskPriority == null) {
- // nothing left to assign
- return false;
- }
-
- if (topPendingTaskPriority.compareTo(containerPriority) > 0) {
- // if the next task to assign is higher priority than the container then
- // dont assign this container to that task.
- // if task and container are equal priority - then its first use or reuse
- // within the same priority - safe to use
- // if task is lower priority than container then its we use a container that
- // is no longer needed by higher priority tasks All those higher pri tasks
- // have been assigned resources - safe to use (first use or reuse)
- // if task is higher priority than container then we may end up using a
- // container that was assigned by the RM for a lower priority pending task
- // that will be assigned after this higher priority task is assigned. If we
- // use that task's container now then we may not be able to match this
- // container to that task later on. However the RM has already assigned us
- // all containers and is not going to give us new containers. We will get
- // stuck for resources.
- return false;
- }
-
- CookieContainerRequest assigned =
- assigner.assignReUsedContainer(container, honorLocality);
- if (assigned != null) {
- assignedContainers.put(assigned, container);
- return true;
- }
- return false;
- }
-
- private void releaseUnassignedContainers(Iterable<Container> containers) {
- for (Container container : containers) {
- LOG.info("Releasing unused container: "
- + container.getId());
- releaseContainer(container.getId());
- }
- }
-
- private void informAppAboutAssignment(CookieContainerRequest assigned,
- Container container) {
- appClientDelegate.taskAllocated(getTask(assigned),
- assigned.getCookie().getAppCookie(), container);
- }
-
- private void informAppAboutAssignments(
- Map<CookieContainerRequest, Container> assignedContainers) {
- if (assignedContainers == null || assignedContainers.isEmpty()) {
- return;
- }
- for (Entry<CookieContainerRequest, Container> entry : assignedContainers
- .entrySet()) {
- Container container = entry.getValue();
- // check for blacklisted nodes. There may be race conditions between
- // setting blacklist and receiving allocations
- if (blacklistedNodes.contains(container.getNodeId())) {
- CookieContainerRequest request = entry.getKey();
- Object task = getTask(request);
- LOG.info("Container: " + container.getId() +
- " allocated on blacklisted node: " + container.getNodeId() +
- " for task: " + task);
- Object deAllocTask = deallocateContainer(container.getId());
- assert deAllocTask.equals(task);
- // its ok to submit the same request again because the RM will not give us
- // the bad/unhealthy nodes again. The nodes may become healthy/unblacklisted
- // and so its better to give the RM the full information.
- allocateTask(task, request.getCapability(),
- (request.getNodes() == null ? null :
- request.getNodes().toArray(new String[request.getNodes().size()])),
- (request.getRacks() == null ? null :
- request.getRacks().toArray(new String[request.getRacks().size()])),
- request.getPriority(),
- request.getCookie().getContainerSignature(),
- request.getCookie().getAppCookie());
- } else {
- informAppAboutAssignment(entry.getKey(), container);
- }
- }
- }
-
- private abstract class ContainerAssigner {
-
- protected final String locality;
-
- protected ContainerAssigner(String locality) {
- this.locality = locality;
- }
-
- public abstract CookieContainerRequest assignNewContainer(
- Container container);
-
- public abstract CookieContainerRequest assignReUsedContainer(
- Container container, boolean honorLocality);
-
- public void doBookKeepingForAssignedContainer(
- CookieContainerRequest assigned, Container container,
- String matchedLocation, boolean honorLocalityFlags) {
- if (assigned == null) {
- return;
- }
- Object task = getTask(assigned);
- assert task != null;
-
- LOG.info("Assigning container to task"
- + ", container=" + container
- + ", task=" + task
- + ", containerHost=" + container.getNodeId().getHost()
- + ", localityMatchType=" + locality
- + ", matchedLocation=" + matchedLocation
- + ", honorLocalityFlags=" + honorLocalityFlags
- + ", reusedContainer="
- + containerAssignments.containsKey(container.getId())
- + ", delayedContainers=" + delayedContainerManager.delayedContainers.size()
- + ", containerResourceMemory=" + container.getResource().getMemory()
- + ", containerResourceVCores="
- + container.getResource().getVirtualCores());
-
- assignContainer(task, container, assigned);
- }
- }
-
- private class NodeLocalContainerAssigner extends ContainerAssigner {
-
- NodeLocalContainerAssigner() {
- super("NodeLocal");
- }
-
- @Override
- public CookieContainerRequest assignNewContainer(Container container) {
- String location = container.getNodeId().getHost();
- CookieContainerRequest assigned = getMatchingRequestWithPriority(
- container, location);
- doBookKeepingForAssignedContainer(assigned, container, location, false);
- return assigned;
- }
-
- @Override
- public CookieContainerRequest assignReUsedContainer(Container container,
- boolean honorLocality) {
- String location = container.getNodeId().getHost();
- CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
- container, location, true);
- doBookKeepingForAssignedContainer(assigned, container, location, true);
- return assigned;
-
- }
- }
-
- private class RackLocalContainerAssigner extends ContainerAssigner {
-
- RackLocalContainerAssigner() {
- super("RackLocal");
- }
-
- @Override
- public CookieContainerRequest assignNewContainer(Container container) {
- String location = RackResolver.resolve(container.getNodeId().getHost())
- .getNetworkLocation();
- CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
- location);
- doBookKeepingForAssignedContainer(assigned, container, location, false);
- return assigned;
- }
-
- @Override
- public CookieContainerRequest assignReUsedContainer(
- Container container, boolean honorLocality) {
- // TEZ-586 this is not match an actual rackLocal request unless honorLocality
- // is false. This method is useless if honorLocality=true
- if (!honorLocality) {
- String location = RackResolver.resolve(container.getNodeId().getHost())
- .getNetworkLocation();
- CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
- container, location, false);
- doBookKeepingForAssignedContainer(assigned, container, location,
- honorLocality);
- return assigned;
- }
- return null;
- }
- }
-
- private class NonLocalContainerAssigner extends ContainerAssigner {
-
- NonLocalContainerAssigner() {
- super("NonLocal");
- }
-
- @Override
- public CookieContainerRequest assignNewContainer(Container container) {
- String location = ResourceRequest.ANY;
- CookieContainerRequest assigned = getMatchingRequestWithPriority(container,
- location);
- doBookKeepingForAssignedContainer(assigned, container, location, false);
- return assigned;
- }
-
- @Override
- public CookieContainerRequest assignReUsedContainer(Container container,
- boolean honorLocality) {
- if (!honorLocality) {
- String location = ResourceRequest.ANY;
- CookieContainerRequest assigned = getMatchingRequestWithoutPriority(
- container, location, false);
- doBookKeepingForAssignedContainer(assigned, container, location,
- honorLocality);
- return assigned;
- }
- return null;
- }
-
- }
-
-
- @VisibleForTesting
- class DelayedContainerManager extends Thread {
-
- class HeldContainerTimerComparator implements Comparator<HeldContainer> {
-
- @Override
- public int compare(HeldContainer c1,
- HeldContainer c2) {
- return (int) (c1.getNextScheduleTime() - c2.getNextScheduleTime());
- }
- }
-
- PriorityBlockingQueue<HeldContainer> delayedContainers =
- new PriorityBlockingQueue<HeldContainer>(20,
- new HeldContainerTimerComparator());
-
- private volatile boolean tryAssigningAll = false;
- private volatile boolean running = true;
- private long maxScheduleTimeSeen = -1;
-
- // used for testing only
- @VisibleForTesting
- volatile AtomicBoolean drainedDelayedContainersForTest = null;
-
- DelayedContainerManager() {
- super.setName("DelayedContainerManager");
- }
-
- @Override
- public void run() {
- while(running) {
- // Try assigning all containers if there's a request to do so.
- if (tryAssigningAll) {
- doAssignAll();
- tryAssigningAll = false;
- }
-
- // Try allocating containers which have timed out.
- // Required since these containers may get assigned without
- // locality at this point.
- if (delayedContainers.peek() == null) {
- try {
- // test only signaling to make TestTaskScheduler work
- if (drainedDelayedContainersForTest != null) {
- drainedDelayedContainersForTest.set(true);
- synchronized (drainedDelayedContainersForTest) {
- drainedDelayedContainersForTest.notifyAll();
- }
- }
- synchronized(this) {
- this.wait();
- }
- // Re-loop to see if tryAssignAll is set.
- continue;
- } catch (InterruptedException e) {
- LOG.info("AllocatedContainerManager Thread interrupted");
- }
- } else {
- // test only sleep to prevent tight loop cycling that makes tests stall
- if (drainedDelayedContainersForTest != null) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- HeldContainer delayedContainer = delayedContainers.peek();
- if (delayedContainer == null) {
- continue;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Considering HeldContainer: "
- + delayedContainer + " for assignment");
- }
- long currentTs = System.currentTimeMillis();
- long nextScheduleTs = delayedContainer.getNextScheduleTime();
- if (currentTs >= nextScheduleTs) {
- // Remove the container and try scheduling it.
- // TEZ-587 what if container is released by RM after this
- // in onContainerCompleted()
- delayedContainer = delayedContainers.poll();
- if (delayedContainer == null) {
- continue;
- }
- Map<CookieContainerRequest, Container> assignedContainers = null;
- synchronized(TaskScheduler.this) {
- if (null !=
- heldContainers.get(delayedContainer.getContainer().getId())) {
- assignedContainers = assignDelayedContainer(delayedContainer);
- } else {
- LOG.info("Skipping delayed container as container is no longer"
- + " running, containerId="
- + delayedContainer.getContainer().getId());
- }
- }
- // Inform App should be done outside of the lock
- informAppAboutAssignments(assignedContainers);
- } else {
- synchronized(this) {
- try {
- // Wait for the next container to be assignable
- delayedContainer = delayedContainers.peek();
- long diff = localitySchedulingDelay;
- if (delayedContainer != null) {
- diff = delayedContainer.getNextScheduleTime() - currentTs;
- }
- if (diff > 0) {
- this.wait(diff);
- }
- } catch (InterruptedException e) {
- LOG.info("AllocatedContainerManager Thread interrupted");
- }
- }
- }
- }
- }
- releasePendingContainers();
- }
-
- private void doAssignAll() {
- // The allocatedContainers queue should not be modified in the middle of an iteration over it.
- // Synchronizing here on TaskScheduler.this to prevent this from happening.
- // The call to assignAll from within this method should NOT add any
- // elements back to the allocatedContainers list. Since they're all
- // delayed elements, de-allocation should not happen either - leaving the
- // list of delayed containers intact, except for the contaienrs which end
- // up getting assigned.
- if (delayedContainers.isEmpty()) {
- return;
- }
-
- Map<CookieContainerRequest, Container> assignedContainers;
- synchronized(TaskScheduler.this) {
- // honor reuse-locality flags (container not timed out yet), Don't queue
- // (already in queue), don't release (release happens when containers
- // time-out)
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to assign all delayed containers to newly received"
- + " tasks");
- }
- Iterator<HeldContainer> iter = delayedContainers.iterator();
- while(iter.hasNext()) {
- HeldContainer delayedContainer = iter.next();
- if (!heldContainers.containsKey(delayedContainer.getContainer().getId())) {
- // this container is no longer held by us
- LOG.info("AssignAll - Skipping delayed container as container is no longer"
- + " running, containerId="
- + delayedContainer.getContainer().getId());
- iter.remove();
- }
- }
- assignedContainers = tryAssignReUsedContainers(
- new ContainerIterable(delayedContainers));
- }
- // Inform app
- informAppAboutAssignments(assignedContainers);
- }
-
- /**
- * Indicate that an attempt should be made to allocate all available containers.
- * Intended to be used in cases where new Container requests come in
- */
- public void triggerScheduling(boolean scheduleAll) {
- this.tryAssigningAll = scheduleAll;
- synchronized(this) {
- this.notify();
- }
- }
-
- public void shutdown() {
- this.running = false;
- this.interrupt();
- }
-
- private void releasePendingContainers() {
- List<HeldContainer> pendingContainers = Lists.newArrayListWithCapacity(
- delayedContainers.size());
- delayedContainers.drainTo(pendingContainers);
- releaseUnassignedContainers(new ContainerIterable(pendingContainers));
- }
-
- private void addDelayedContainer(Container container,
- long nextScheduleTime) {
- HeldContainer delayedContainer = heldContainers.get(container.getId());
- if (delayedContainer == null) {
- LOG.warn("Attempting to add a non-running container to the"
- + " delayed container list, containerId=" + container.getId());
- return;
- } else {
- delayedContainer.setNextScheduleTime(nextScheduleTime);
- }
- if (maxScheduleTimeSeen < nextScheduleTime) {
- maxScheduleTimeSeen = nextScheduleTime;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding container to delayed queue"
- + ", containerId=" + delayedContainer.getContainer().getId()
- + ", nextScheduleTime=" + delayedContainer.getNextScheduleTime()
- + ", containerExpiry=" + delayedContainer.getContainerExpiryTime());
- }
- boolean added = delayedContainers.offer(delayedContainer);
- synchronized(this) {
- this.notify();
- }
- if (!added) {
- releaseUnassignedContainers(Lists.newArrayList(container));
- }
- }
-
- }
-
- private class ContainerIterable implements Iterable<Container> {
-
- private final Iterable<HeldContainer> delayedContainers;
-
- ContainerIterable(Iterable<HeldContainer> delayedContainers) {
- this.delayedContainers = delayedContainers;
- }
-
- @Override
- public Iterator<Container> iterator() {
-
- final Iterator<HeldContainer> delayedContainerIterator = delayedContainers
- .iterator();
-
- return new Iterator<Container>() {
-
- @Override
- public boolean hasNext() {
- return delayedContainerIterator.hasNext();
- }
-
- @Override
- public Container next() {
- return delayedContainerIterator.next().getContainer();
- }
-
- @Override
- public void remove() {
- delayedContainerIterator.remove();
- }
- };
- }
- }
-
- static class HeldContainer {
-
- enum LocalityMatchLevel {
- NEW,
- NODE,
- RACK,
- NON_LOCAL
- }
-
- Container container;
- private long nextScheduleTime;
- private Object firstContainerSignature;
- private LocalityMatchLevel localityMatchLevel;
- private long containerExpiryTime;
- private CookieContainerRequest lastTaskInfo;
- private int numAssignmentAttempts = 0;
-
- HeldContainer(Container container,
- long nextScheduleTime,
- long containerExpiryTime,
- CookieContainerRequest firstTaskInfo) {
- this.container = container;
- this.nextScheduleTime = nextScheduleTime;
- if (firstTaskInfo != null) {
- this.lastTaskInfo = firstTaskInfo;
- this.firstContainerSignature = firstTaskInfo.getCookie().getContainerSignature();
- }
- this.localityMatchLevel = LocalityMatchLevel.NODE;
- this.containerExpiryTime = containerExpiryTime;
- }
-
- boolean isNew() {
- return firstContainerSignature == null;
- }
-
- int geNumAssignmentAttempts() {
- return numAssignmentAttempts;
- }
-
- void incrementAssignmentAttempts() {
- numAssignmentAttempts++;
- }
-
- public Container getContainer() {
- return this.container;
- }
-
- public long getNextScheduleTime() {
- return this.nextScheduleTime;
- }
-
- public void setNextScheduleTime(long nextScheduleTime) {
- this.nextScheduleTime = nextScheduleTime;
- }
-
- public long getContainerExpiryTime() {
- return this.containerExpiryTime;
- }
-
- public void setContainerExpiryTime(long containerExpiryTime) {
- this.containerExpiryTime = containerExpiryTime;
- }
-
- public Object getFirstContainerSignature() {
- return this.firstContainerSignature;
- }
-
- public CookieContainerRequest getLastTaskInfo() {
- return this.lastTaskInfo;
- }
-
- public void setLastTaskInfo(CookieContainerRequest taskInfo) {
- lastTaskInfo = taskInfo;
- }
-
- public synchronized void resetLocalityMatchLevel() {
- localityMatchLevel = LocalityMatchLevel.NEW;
- }
-
- public synchronized void incrementLocalityMatchLevel() {
- if (localityMatchLevel.equals(LocalityMatchLevel.NEW)) {
- localityMatchLevel = LocalityMatchLevel.NODE;
- } else if (localityMatchLevel.equals(LocalityMatchLevel.NODE)) {
- localityMatchLevel = LocalityMatchLevel.RACK;
- } else if (localityMatchLevel.equals(LocalityMatchLevel.RACK)) {
- localityMatchLevel = LocalityMatchLevel.NON_LOCAL;
- } else if (localityMatchLevel.equals(LocalityMatchLevel.NON_LOCAL)) {
- throw new TezUncheckedException("Cannot increment locality level "
- + " from current NON_LOCAL for container: " + container.getId());
- }
- }
-
- public LocalityMatchLevel getLocalityMatchLevel() {
- return this.localityMatchLevel;
- }
-
- @Override
- public String toString() {
- return "HeldContainer: id: " + container.getId()
- + ", nextScheduleTime: " + nextScheduleTime
- + ", localityMatchLevel=" + localityMatchLevel
- + ", signature: "
- + (firstContainerSignature != null? firstContainerSignature.toString():"null");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 53c3e95..5de8032 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
/**
* Makes use of an ExecutionService to invoke application callbacks. Methods
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 865b192..ec35f28 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -51,7 +51,7 @@ import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
-import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -75,7 +75,7 @@ public class TaskSchedulerEventHandler extends AbstractService
protected final AppContext appContext;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
- protected TaskSchedulerInterface taskScheduler;
+ protected TaskSchedulerService taskScheduler;
private DAGAppMaster dagAppMaster;
private Map<ApplicationAccessType, String> appAcls = null;
private Thread eventHandlingThread;
@@ -288,16 +288,16 @@ public class TaskSchedulerEventHandler extends AbstractService
}
- protected TaskSchedulerInterface createTaskScheduler(String host, int port,
+ protected TaskSchedulerService createTaskScheduler(String host, int port,
String trackingUrl, AppContext appContext) {
boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
if (isLocal) {
- return new LocalTaskScheduler(this, this.containerSignatureMatcher,
+ return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
host, port, trackingUrl, appContext);
}
else {
- return new TaskScheduler(this, this.containerSignatureMatcher,
+ return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
host, port, trackingUrl, appContext);
}
}
@@ -308,8 +308,8 @@ public class TaskSchedulerEventHandler extends AbstractService
dagAppMaster = appContext.getAppMaster();
taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
serviceAddr.getPort(), "", appContext);
- ((AbstractService)taskScheduler).init(getConfig());
- ((AbstractService)taskScheduler).start();
+ taskScheduler.init(getConfig());
+ taskScheduler.start();
if (shouldUnregisterFlag.get()) {
// Flag may have been set earlier when task scheduler was not initialized
taskScheduler.setShouldUnregister();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/4dfd8341/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
deleted file mode 100644
index 1c35492..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerInterface.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.rm;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public interface TaskSchedulerInterface {
-
- public abstract Resource getAvailableResources();
-
- public abstract int getClusterNodeCount();
-
- public abstract void resetMatchLocalityForAllHeldContainers();
-
- public abstract Resource getTotalResources();
-
- public abstract void blacklistNode(NodeId nodeId);
-
- public abstract void unblacklistNode(NodeId nodeId);
-
- public abstract void allocateTask(Object task, Resource capability,
- String[] hosts, String[] racks, Priority priority,
- Object containerSignature, Object clientCookie);
-
- /**
- * Allocate affinitized to a specific container
- */
- public abstract void allocateTask(Object task, Resource capability,
- ContainerId containerId, Priority priority, Object containerSignature,
- Object clientCookie);
-
- public abstract boolean deallocateTask(Object task, boolean taskSucceeded);
-
- public abstract Object deallocateContainer(ContainerId containerId);
-
- public abstract void setShouldUnregister();
-}