You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/12/10 04:30:24 UTC

[14/27] tez git commit: TEZ-1790. DeallocationTaskRequest may be handled before the corresponding AllocationTaskRequest in local mode (zjffdu)

TEZ-1790. DeallocationTaskRequest may be handled before the corresponding AllocationTaskRequest in local mode (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b7c7312
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b7c7312
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b7c7312

Branch: refs/heads/TEZ-8
Commit: 3b7c7312cd4425ace1801ffe6f0f6fd80ef3cbcc
Parents: 99aa693
Author: Jeff Zhang <zj...@apache.org>
Authored: Tue Dec 2 12:04:46 2014 +0800
Committer: Jeff Zhang <zj...@apache.org>
Committed: Tue Dec 2 12:04:46 2014 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |   4 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   |  28 +++-
 .../app/rm/TestLocalTaskSchedulerService.java   | 149 ++++++++++++++++++-
 4 files changed, 169 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2afc0d5..1ea8a92 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,7 @@ ALL CHANGES:
   TEZ-1650. Please create a DOAP file for your TLP.
   TEZ-1697. DAG submission fails if a local resource added is already part of tez.lib.uris
   TEZ-1800. Integer overflow in ExternalSorter.getInitialMemoryRequirement()
+  TEZ-1790. DeallocationTaskRequest may been handled before corresponding AllocationTaskRequest in local mode
 
 Release 0.5.3: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5103095..007774f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1005,10 +1005,10 @@ public class TaskAttemptImpl implements TaskAttempt,
         remoteTaskSpec = ta.createRemoteTaskSpec();
         LOG.info("remoteTaskSpec:" + remoteTaskSpec);
       } catch (AMUserCodeException e) {
-        String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta.getTaskID();
+        String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
         LOG.error(msg, e);
         String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
-        new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
+        new TerminateTransition(FAILED_HELPER).transition(ta,
             new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
                 TaskAttemptTerminationCause.APPLICATION_ERROR));
         return TaskAttemptStateInternal.FAILED;

http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 026ed7d..2ebcebb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -25,12 +25,13 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 
 import com.google.common.primitives.Ints;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -38,8 +39,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 
@@ -156,12 +157,16 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
 
   @Override
   public void serviceInit(Configuration conf) {
-      taskRequestHandler = new AsyncDelegateRequestHandler(taskRequestQueue,
+    taskRequestHandler = createRequestHandler(conf);
+    asyncDelegateRequestThread = new Thread(taskRequestHandler);
+  }
+
+  protected AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
+    return new AsyncDelegateRequestHandler(taskRequestQueue,
         new LocalContainerFactory(appContext),
         taskAllocations,
         appClientDelegate,
         conf);
-    asyncDelegateRequestThread = new Thread(taskRequestHandler);
   }
 
   @Override
@@ -341,7 +346,20 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
         appClientDelegate.containerBeingReleased(container.getId());
       }
       else {
-        LOG.warn("Unable to find and remove task " + request.task + " from task allocations");
+        boolean deallocationBeforeAllocation = false;
+        Iterator<TaskRequest> iter = taskRequestQueue.iterator();
+        while (iter.hasNext()) {
+          TaskRequest taskRequest = iter.next();
+          if (taskRequest instanceof AllocateTaskRequest && taskRequest.task.equals(request.task)) {
+            iter.remove();
+            deallocationBeforeAllocation = true;
+            LOG.info("deallcation happen before allocation for task:" + request.task);
+            break;
+          }
+        }
+        if (!deallocationBeforeAllocation) {
+          throw new TezUncheckedException("Unable to find and remove task " + request.task + " from task allocations");
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/3b7c7312/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
index 5fc5a7d..3cf4f6c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestLocalTaskSchedulerService.java
@@ -18,20 +18,30 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.util.HashMap;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TestLocalTaskSchedulerService.MockLocalTaskSchedulerSerivce.MockAsyncDelegateRequestHandler;
+import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestLocalTaskSchedulerService {
 
   LocalTaskSchedulerService ltss ;
   int core =10;
 
-  @Test
+  @Test(timeout = 5000)
   public void testCreateResource() {
     Resource resource;
     //value in integer
@@ -40,7 +50,7 @@ public class TestLocalTaskSchedulerService {
     Assert.assertEquals((int)(value/(1024*1024)),resource.getMemory());
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testCreateResourceLargerThanIntMax() {
     //value beyond integer but within Long.MAX_VALUE
     try {
@@ -52,7 +62,7 @@ public class TestLocalTaskSchedulerService {
     }
   }
 
-  @Test
+  @Test(timeout = 5000)
   public void testCreateResourceWithNegativeValue() {
     //value is Long.MAX_VALUE*1024*1024,
     // it will be negative after it is passed to createResource
@@ -65,4 +75,131 @@ public class TestLocalTaskSchedulerService {
       assertTrue(ex.getMessage().contains("Negative Memory or Core provided!"));
     }
   }
+
+  /**
+   * Normal flow of TaskAttempt
+   */
+  @Test(timeout = 5000)
+  public void testDeallocationBeforeAllocation() {
+    MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
+        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+    taskSchedulerService.init(new Configuration());
+    taskSchedulerService.start();
+
+    Task task = mock(Task.class);
+    taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
+    taskSchedulerService.deallocateTask(task, false);
+    // start the RequestHandler, DeallocateTaskRequest has higher priority, so will be processed first
+    taskSchedulerService.startRequestHandlerThread();
+
+    MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
+    requestHandler.drainRequest(1);
+    assertEquals(1, requestHandler.deallocateCount);
+    // The corresponding AllocateTaskRequest will be removed, so won't been processed.
+    assertEquals(0, requestHandler.allocateCount);
+    taskSchedulerService.stop();
+  }
+
+  /**
+   * TaskAttempt Killed from START_WAIT
+   */
+  @Test(timeout = 5000)
+  public void testDeallocationAfterAllocation() {
+    MockLocalTaskSchedulerSerivce taskSchedulerService = new MockLocalTaskSchedulerSerivce
+        (mock(TaskSchedulerAppCallback.class), mock(ContainerSignatureMatcher.class), "", 0, "", mock(AppContext.class));
+    taskSchedulerService.init(new Configuration());
+    taskSchedulerService.start();
+
+    Task task = mock(Task.class);
+    taskSchedulerService.allocateTask(task, Resource.newInstance(1024, 1), null, null, Priority.newInstance(1), null, null);
+    taskSchedulerService.startRequestHandlerThread();
+
+    MockAsyncDelegateRequestHandler requestHandler = taskSchedulerService.getRequestHandler();
+    requestHandler.drainRequest(1);
+    taskSchedulerService.deallocateTask(task, false);
+    requestHandler.drainRequest(2);
+    assertEquals(1, requestHandler.deallocateCount);
+    assertEquals(1, requestHandler.allocateCount);
+    taskSchedulerService.stop();
+  }
+
+  static class MockLocalTaskSchedulerSerivce extends LocalTaskSchedulerService {
+
+    private MockAsyncDelegateRequestHandler requestHandler;
+
+    public MockLocalTaskSchedulerSerivce(TaskSchedulerAppCallback appClient,
+        ContainerSignatureMatcher containerSignatureMatcher,
+        String appHostName, int appHostPort, String appTrackingUrl,
+        AppContext appContext) {
+      super(appClient, containerSignatureMatcher, appHostName, appHostPort,
+          appTrackingUrl, appContext);
+    }
+
+    @Override
+    public AsyncDelegateRequestHandler createRequestHandler(Configuration conf) {
+      requestHandler = new MockAsyncDelegateRequestHandler(taskRequestQueue,
+          new LocalContainerFactory(appContext),
+          taskAllocations,
+          appClientDelegate,
+          conf);
+      return requestHandler;
+    }
+
+    @Override
+    public void serviceStart() {
+      // don't start RequestHandler thread, control it in unit test
+    }
+
+    public void startRequestHandlerThread() {
+      asyncDelegateRequestThread.start();
+    }
+
+    public MockAsyncDelegateRequestHandler getRequestHandler() {
+      return requestHandler;
+    }
+
+    static class MockAsyncDelegateRequestHandler extends AsyncDelegateRequestHandler {
+
+      public int allocateCount = 0;
+      public int deallocateCount = 0;
+      public int processedCount =0;
+
+      MockAsyncDelegateRequestHandler(
+          BlockingQueue<TaskRequest> taskRequestQueue,
+          LocalContainerFactory localContainerFactory,
+          HashMap<Object, Container> taskAllocations,
+          TaskSchedulerAppCallback appClientDelegate, Configuration conf) {
+        super(taskRequestQueue, localContainerFactory, taskAllocations,
+            appClientDelegate, conf);
+      }
+
+      @Override
+      void processRequest() {
+        super.processRequest();
+        processedCount ++;
+      }
+
+      public void drainRequest(int count) {
+        while(processedCount != count || !taskRequestQueue.isEmpty()) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+
+      @Override
+      void allocateTask(AllocateTaskRequest request) {
+        super.allocateTask(request);
+        allocateCount ++;
+      }
+
+      @Override
+      void deallocateTask(DeallocateTaskRequest request) {
+        super.deallocateTask(request);
+        deallocateCount ++;
+      }
+    }
+  }
 }