You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/12/10 04:30:24 UTC
[14/27] tez git commit: TEZ-1790. DeallocationTaskRequest may be
handled before corresponding AllocationTaskRequest in local mode (zjffdu)
TEZ-1790. DeallocationTaskRequest may be handled before corresponding AllocationTaskRequest in local mode (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b7c7312
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b7c7312
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b7c7312
Branch: refs/heads/TEZ-8
Commit: 3b7c7312cd4425ace1801ffe6f0f6fd80ef3cbcc
Parents: 99aa693
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Dec 2 12:04:46 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Dec 2 12:04:46 2014 +0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 4 +-
.../dag/app/rm/LocalTaskSchedulerService.java | 28 +++-
.../app/rm/TestLocalTaskSchedulerService.java | 149 ++++++++++++++++++-
4 files changed, 169 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2afc0d5..1ea8a92 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@ ALL CHANGES:
TEZ-1650. Please create a DOAP file for your TLP.
TEZ-1697. DAG submission fails if a local resource added is already part of tez.lib.uris
TEZ-1800. Integer overflow in ExternalSorter.getInitialMemoryRequirement()
+ TEZ-1790. DeallocationTaskRequest may been handled before corresponding AllocationTaskRequest in local mode
Release 0.5.3: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5103095..007774f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1005,10 +1005,10 @@ public class TaskAttemptImpl implements TaskAttempt,
remoteTaskSpec = ta.createRemoteTaskSpec();
LOG.info("remoteTaskSpec:" + remoteTaskSpec);
} catch (AMUserCodeException e) {
- String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta.getTaskID();
+ String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
LOG.error(msg, e);
String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
- new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
+ new TerminateTransition(FAILED_HELPER).transition(ta,
new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
TaskAttemptTerminationCause.APPLICATION_ERROR));
return TaskAttemptStateInternal.FAILED;
http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 026ed7d..2ebcebb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -25,12 +25,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import com.google.common.primitives.Ints;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -38,8 +39,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
@@ -156,12 +157,16 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
@Override
public void serviceInit(Configuration conf) {
- taskRequestHandler = new AsyncDelegateRequestHandler(taskRequestQueue,
+ taskRequestHandler = createRequestHandler(conf);
+ asyncDelegateRequestThread = new Thread(taskRequestHandler);
+ }
+
+ protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
+ return new AsyncDelegateRequestHandler(taskRequestQueue,
new LocalContainerFactory(appContext),
taskAllocations,
appClientDelegate,
conf);
- asyncDelegateRequestThread = new Thread(taskRequestHandler);
}
@Override
@@ -341,7 +346,20 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
appClientDelegate.containerBeingReleased(container.getId());
}
else {
- LOG.warn("Unable to find and remove task " + request.task + " from task allocations");
+ boolean deallocationBeforeAllocation = false;
+ Iterator<TaskRequest> iter = taskRequestQueue.iterator();
+ while (iter.hasNext()) {
+ TaskRequest taskRequest = iter.next();
+ if (taskRequest instanceof AllocateTaskRequest && taskRequest.task.equals(request.task)) {
+ iter.remove();
+ deallocationBeforeAllocation = true;
+ LOG.info("deallcation happen before allocation for task:" + request.task);
+ break;
+ }
+ }
+ if (!deallocationBeforeAllocation) {
+ throw new TezUncheckedException("Unable to find and remove task " + request.task + " from task allocations");
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 5fc5a7d..3cf4f6c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -18,20 +18,30 @@
package org.apache.tez.dag.app.rm;
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.junit.Assert;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestLocalTaskSchedulerService {
LocalTaskSchedulerService ltss ;
int core =10;
- @Test
+ @Test(timeout = 5000)
public void testCreateResource() {
Resource resource;
//value in integer
@@ -40,7 +50,7 @@ public class TestLocalTaskSchedulerService {
Assert.assertEquals((int)(value/(1024*1024)),resource.getMemory());
}
- @Test
+ @Test(timeout = 5000)
public void testCreateResourceLargerThanIntMax() {
//value beyond integer but within Long.MAX_VALUE
try {
@@ -52,7 +62,7 @@ public class TestLocalTaskSchedulerService {
}
}
- @Test
+ @Test(timeout = 5000)
public void testCreateResourceWithNegativeValue() {
//value is Long.MAX_VALUE*1024*1024,
// it will be negative after it is passed to createResource
@@ -65,4 +75,131 @@ public class TestLocalTaskSchedulerService {
assertTrue(ex.getMessage().contains("Negative Memory or Core provided!"));
}
}
+
+ /**
+ * The deallocate request is processed before the matching allocate request, which is then dropped from the queue
+ */
+ @Test(timeout = 5000)
+ public void testDeallocationBeforeAllocation() {
+ MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
+ (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+ taskSchedulerService.init(new Configuration());
+ taskSchedulerService.start();
+
+ Task task = mock(Task.class);
+ taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
+ taskSchedulerService.deallocateTask(task, false);
+ // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
+ taskSchedulerService.startRequestHandlerThread();
+
+ MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
+ requestHandler.drainRequest(1);
+ assertEquals(1, requestHandler.deallocateCount);
+ // The corresponding AllocateTaskRequest will be removed, so it won't be processed.
+ assertEquals(0, requestHandler.allocateCount);
+ taskSchedulerService.stop();
+ }
+
+ /**
+ * The allocate request is processed first, then the deallocate request for the same task
+ */
+ @Test(timeout = 5000)
+ public void testDeallocationAfterAllocation() {
+ MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
+ (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+ taskSchedulerService.init(new Configuration());
+ taskSchedulerService.start();
+
+ Task task = mock(Task.class);
+ taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
+ taskSchedulerService.startRequestHandlerThread();
+
+ MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
+ requestHandler.drainRequest(1);
+ taskSchedulerService.deallocateTask(task, false);
+ requestHandler.drainRequest(2);
+ assertEquals(1, requestHandler.deallocateCount);
+ assertEquals(1, requestHandler.allocateCount);
+ taskSchedulerService.stop();
+ }
+
+ static class MockLocalTaskSchedulerSerivce extends LocalTaskSchedulerService {
+
+ private MockAsyncDelegateRequestHandler requestHandler;
+
+ public MockLocalTaskSchedulerSerivce(TaskSchedulerAppCallback appClient,
+ ContainerSignatureMatcher containerSignatureMatcher,
+ String appHostName, int appHostPort, String appTrackingUrl,
+ AppContext appContext) {
+ super(appClient, containerSignatureMatcher, appHostName, appHostPort,
+ appTrackingUrl, appContext);
+ }
+
+ @Override
+ public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
+ requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
+ new LocalContainerFactory(appContext),
+ taskAllocations,
+ appClientDelegate,
+ conf);
+ return requestHandler;
+ }
+
+ @Override
+ public void serviceStart() {
+ // don't start RequestHandler thread, control it in unit test
+ }
+
+ public void startRequestHandlerThread() {
+ asyncDelegateRequestThread.start();
+ }
+
+ public MockAsyncDelegateRequestHandler getRequestHandler() {
+ return requestHandler;
+ }
+
+ static class MockAsyncDelegateRequestHandler extends AsyncDelegateRequestHandler {
+
+ public int allocateCount = 0;
+ public int deallocateCount = 0;
+ public int processedCount =0;
+
+ MockAsyncDelegateRequestHandler(
+ BlockingQueue<TaskRequest> taskRequestQueue,
+ LocalContainerFactory localContainerFactory,
+ HashMap<Object, Container> taskAllocations,
+ TaskSchedulerAppCallback appClientDelegate, Configuration conf) {
+ super(taskRequestQueue, localContainerFactory, taskAllocations,
+ appClientDelegate, conf);
+ }
+
+ @Override
+ void processRequest() {
+ super.processRequest();
+ processedCount ++;
+ }
+
+ public void drainRequest(int count) {
+ while(processedCount != count || !taskRequestQueue.isEmpty()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ void allocateTask(AllocateTaskRequest request) {
+ super.allocateTask(request);
+ allocateCount ++;
+ }
+
+ @Override
+ void deallocateTask(DeallocateTaskRequest request) {
+ super.deallocateTask(request);
+ deallocateCount ++;
+ }
+ }
+ }
}