You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2018/02/14 16:16:25 UTC
tez git commit: TEZ-3893. Tez Local Mode can hang for cases.
(Jonathan Eagles via jlowe)
Repository: tez
Updated Branches:
refs/heads/master a1f2da8eb -> 022df7218
TEZ-3893. Tez Local Mode can hang for cases. (Jonathan Eagles via jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/022df721
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/022df721
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/022df721
Branch: refs/heads/master
Commit: 022df7218afbb2c940ddc4447246dea5a546c759
Parents: a1f2da8
Author: Jason Lowe <jl...@apache.org>
Authored: Wed Feb 14 10:14:34 2018 -0600
Committer: Jason Lowe <jl...@apache.org>
Committed: Wed Feb 14 10:14:34 2018 -0600
----------------------------------------------------------------------
.../dag/app/rm/LocalTaskSchedulerService.java | 87 ++++++++++----------
.../tez/dag/app/rm/TestLocalTaskScheduler.java | 13 +--
.../app/rm/TestLocalTaskSchedulerService.java | 35 ++++----
3 files changed, 69 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 3b034cd..04e79a8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -19,8 +19,8 @@
package org.apache.tez.dag.app.rm;
import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.HashMap;
import java.util.Iterator;
@@ -51,7 +51,7 @@ public class LocalTaskSchedulerService extends TaskScheduler {
private static final Logger LOG = LoggerFactory.getLogger(LocalTaskSchedulerService.class);
final ContainerSignatureMatcher containerSignatureMatcher;
- final PriorityBlockingQueue<TaskRequest> taskRequestQueue;
+ final LinkedBlockingQueue<TaskRequest> taskRequestQueue;
final Configuration conf;
AsyncDelegateRequestHandler taskRequestHandler;
Thread asyncDelegateRequestThread;
@@ -62,7 +62,7 @@ public class LocalTaskSchedulerService extends TaskScheduler {
public LocalTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
super(taskSchedulerContext);
- taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
+ taskRequestQueue = new LinkedBlockingQueue<>();
taskAllocations = new LinkedHashMap<Object, Container>();
this.appTrackingUrl = taskSchedulerContext.getAppTrackingUrl();
this.containerSignatureMatcher = taskSchedulerContext.getContainerSignatureMatcher();
@@ -313,29 +313,31 @@ public class LocalTaskSchedulerService extends TaskScheduler {
}
static class AsyncDelegateRequestHandler implements Runnable {
- final BlockingQueue<TaskRequest> taskRequestQueue;
+ final LinkedBlockingQueue<TaskRequest> clientRequestQueue;
+ final PriorityBlockingQueue<AllocateTaskRequest> taskRequestQueue;
final LocalContainerFactory localContainerFactory;
final HashMap<Object, Container> taskAllocations;
final TaskSchedulerContext taskSchedulerContext;
final int MAX_TASKS;
- AsyncDelegateRequestHandler(BlockingQueue<TaskRequest> taskRequestQueue,
+ AsyncDelegateRequestHandler(LinkedBlockingQueue<TaskRequest> clientRequestQueue,
LocalContainerFactory localContainerFactory,
HashMap<Object, Container> taskAllocations,
TaskSchedulerContext taskSchedulerContext,
Configuration conf) {
- this.taskRequestQueue = taskRequestQueue;
+ this.clientRequestQueue = clientRequestQueue;
this.localContainerFactory = localContainerFactory;
this.taskAllocations = taskAllocations;
this.taskSchedulerContext = taskSchedulerContext;
this.MAX_TASKS = conf.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
+ this.taskRequestQueue = new PriorityBlockingQueue<>();
}
public void addAllocateTaskRequest(Object task, Resource capability, Priority priority,
Object clientCookie) {
try {
- taskRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie));
+ clientRequestQueue.put(new AllocateTaskRequest(task, capability, priority, clientCookie));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -343,57 +345,54 @@ public class LocalTaskSchedulerService extends TaskScheduler {
public boolean addDeallocateTaskRequest(Object task) {
try {
- taskRequestQueue.put(new DeallocateTaskRequest(task));
+ clientRequestQueue.put(new DeallocateTaskRequest(task));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- synchronized(taskRequestQueue) {
- taskRequestQueue.notify();
- }
return true;
}
- boolean shouldWait() {
- return taskAllocations.size() >= MAX_TASKS;
+ boolean shouldProcess() {
+ return !taskRequestQueue.isEmpty() && taskAllocations.size() < MAX_TASKS;
}
@Override
public void run() {
- while(!Thread.currentThread().isInterrupted()) {
- synchronized(taskRequestQueue) {
- try {
- if (shouldWait()) {
- taskRequestQueue.wait();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ while (!Thread.currentThread().isInterrupted()) {
+ dispatchRequest();
+ while (shouldProcess()) {
+ allocateTask();
}
- processRequest();
}
}
- void processRequest() {
- try {
- TaskRequest request = taskRequestQueue.take();
- if (request instanceof AllocateTaskRequest) {
- allocateTask((AllocateTaskRequest)request);
- }
- else if (request instanceof DeallocateTaskRequest) {
- deallocateTask((DeallocateTaskRequest)request);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (NullPointerException e) {
- LOG.warn("Task request was badly constructed");
+ void dispatchRequest() {
+ try {
+ TaskRequest request = clientRequestQueue.take();
+ if (request instanceof AllocateTaskRequest) {
+ taskRequestQueue.put((AllocateTaskRequest)request);
+ }
+ else if (request instanceof DeallocateTaskRequest) {
+ deallocateTask((DeallocateTaskRequest)request);
+ }
+ else {
+ LOG.error("Unknown task request message: " + request);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- void allocateTask(AllocateTaskRequest request) {
- Container container = localContainerFactory.createContainer(request.capability,
- request.priority);
- taskAllocations.put(request.task, container);
- taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container);
+ void allocateTask() {
+ try {
+ AllocateTaskRequest request = taskRequestQueue.take();
+ Container container = localContainerFactory.createContainer(request.capability,
+ request.priority);
+ taskAllocations.put(request.task, container);
+ taskSchedulerContext.taskAllocated(request.task, request.clientCookie, container);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
void deallocateTask(DeallocateTaskRequest request) {
@@ -403,13 +402,13 @@ public class LocalTaskSchedulerService extends TaskScheduler {
}
else {
boolean deallocationBeforeAllocation = false;
- Iterator<TaskRequest> iter = taskRequestQueue.iterator();
+ Iterator<AllocateTaskRequest> iter = taskRequestQueue.iterator();
while (iter.hasNext()) {
TaskRequest taskRequest = iter.next();
- if (taskRequest instanceof AllocateTaskRequest && taskRequest.task.equals(request.task)) {
+ if (taskRequest.task.equals(request.task)) {
iter.remove();
deallocationBeforeAllocation = true;
- LOG.info("deallcation happen before allocation for task:" + request.task);
+ LOG.info("Deallocation request before allocation for task:" + request.task);
break;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
index 2ada2f1..36505c2 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskScheduler.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.app.rm;
import java.util.HashMap;
import java.util.LinkedHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
@@ -57,11 +57,11 @@ public class TestLocalTaskScheduler {
LocalContainerFactory containerFactory = new LocalContainerFactory(appAttemptId, 1000);
HashMap<Object, Container> taskAllocations = new LinkedHashMap<Object, Container>();
- PriorityBlockingQueue<TaskRequest> taskRequestQueue = new PriorityBlockingQueue<TaskRequest>();
+ LinkedBlockingQueue<TaskRequest> clientRequestQueue = new LinkedBlockingQueue<>();
// Object under test
AsyncDelegateRequestHandler requestHandler =
- new AsyncDelegateRequestHandler(taskRequestQueue,
+ new AsyncDelegateRequestHandler(clientRequestQueue,
containerFactory,
taskAllocations,
mockContext,
@@ -71,17 +71,18 @@ public class TestLocalTaskScheduler {
for (int i = 0; i < MAX_TASKS; i++) {
Priority priority = Priority.newInstance(20);
requestHandler.addAllocateTaskRequest(new Long(i), null, priority, null);
- requestHandler.processRequest();
+ requestHandler.dispatchRequest();
+ requestHandler.allocateTask();
}
// Only MAX_TASKS number of tasks should have been allocated
Assert.assertEquals("Wrong number of allocate tasks", MAX_TASKS, taskAllocations.size());
- Assert.assertTrue("Another allocation should not fit", requestHandler.shouldWait());
+ Assert.assertTrue("Another allocation should not fit", !requestHandler.shouldProcess());
// Deallocate down to zero
for (int i = 0; i < MAX_TASKS; i++) {
requestHandler.addDeallocateTaskRequest(new Long(i));
- requestHandler.processRequest();
+ requestHandler.dispatchRequest();
}
// All allocated tasks should have been removed
http://git-wip-us.apache.org/repos/asf/tez/blob/022df721/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 3b2de34..c2daf84 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -19,7 +19,7 @@
package org.apache.tez.dag.app.rm;
import java.util.HashMap;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -91,6 +91,9 @@ public class TestLocalTaskSchedulerService {
taskSchedulerService.initialize();
taskSchedulerService.start();
+ // create a task that fills the task allocation queue
+ Task dummy_task = mock(Task.class);
+ taskSchedulerService.allocateTask(dummy_task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
Task task = mock(Task.class);
taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
taskSchedulerService.deallocateTask(task, false, null, null);
@@ -98,10 +101,10 @@ public class TestLocalTaskSchedulerService {
taskSchedulerService.startRequestHandlerThread();
MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
- requestHandler.drainRequest(1);
+ requestHandler.drainRequest(3);
assertEquals(1, requestHandler.deallocateCount);
// The corresponding AllocateTaskRequest will be removed, so won't been processed.
- assertEquals(0, requestHandler.allocateCount);
+ assertEquals(1, requestHandler.allocateCount);
taskSchedulerService.shutdown();
}
@@ -170,10 +173,10 @@ public class TestLocalTaskSchedulerService {
public int allocateCount = 0;
public int deallocateCount = 0;
- public int processedCount =0;
+ public int dispatchCount = 0;
MockAsyncDelegateRequestHandler(
- BlockingQueue<TaskRequest> taskRequestQueue,
+ LinkedBlockingQueue<TaskRequest> taskRequestQueue,
LocalContainerFactory localContainerFactory,
HashMap<Object, Container> taskAllocations,
TaskSchedulerContext appClientDelegate, Configuration conf) {
@@ -182,13 +185,19 @@ public class TestLocalTaskSchedulerService {
}
@Override
- void processRequest() {
- super.processRequest();
- processedCount ++;
+ void dispatchRequest() {
+ super.dispatchRequest();
+ dispatchCount++;
+ }
+
+ @Override
+ void allocateTask() {
+ super.allocateTask();
+ allocateCount++;
}
public void drainRequest(int count) {
- while(processedCount != count || !taskRequestQueue.isEmpty()) {
+ while(dispatchCount != count || !clientRequestQueue.isEmpty()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
@@ -198,15 +207,9 @@ public class TestLocalTaskSchedulerService {
}
@Override
- void allocateTask(AllocateTaskRequest request) {
- super.allocateTask(request);
- allocateCount ++;
- }
-
- @Override
void deallocateTask(DeallocateTaskRequest request) {
super.deallocateTask(request);
- deallocateCount ++;
+ deallocateCount++;
}
}
}