You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/05/02 20:05:51 UTC

tez git commit: TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master 727584fd2 -> c3b8b8523


TEZ-3193. Deadlock in AM during task commit request. (Jason Lowe via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/c3b8b852
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/c3b8b852
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/c3b8b852

Branch: refs/heads/master
Commit: c3b8b852331c41f9a9b41c7c74995f52f4578d99
Parents: 727584f
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 2 11:00:48 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon May 2 11:00:48 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 61 ++++++--------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 11 +++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 21 ++-----
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 13 +++--
 5 files changed, 43 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b436b9a..37d47cf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3193. Deadlock in AM during task commit request.
   TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
   TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
@@ -27,6 +28,7 @@ Release 0.8.4: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3193. Deadlock in AM during task commit request.
   TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin.
@@ -465,6 +467,7 @@ INCOMPATIBLE CHANGES
   TEZ-2949. Allow duplicate dag names within session for Tez.
 
 ALL CHANGES:
+  TEZ-3193. Deadlock in AM during task commit request.
   TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
   TEZ-3224. User payload is not initialized before creating vertex manager plugin.
   TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs

http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6d9247e..e39315b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
@@ -71,7 +70,6 @@ import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.RecoveryParser.TaskAttemptRecoveryData;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
@@ -107,7 +105,6 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.recovery.records.RecoveryProtos.DataEventDependencyInfoProto;
-import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -190,8 +187,9 @@ public class TaskAttemptImpl implements TaskAttempt,
   private String nodeHttpAddress;
   private String nodeRackName;
   
-  private final Task task;
   private final Vertex vertex;
+  private final TaskLocationHint locationHint;
+  private final TaskSpec taskSpec;
 
   @VisibleForTesting
   boolean appendNextDataEvent = true;
@@ -465,22 +463,25 @@ public class TaskAttemptImpl implements TaskAttempt,
         .installTopology();
 
   @SuppressWarnings("rawtypes")
-  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+  public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
       TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
-      Task task) {
-    this(taskId, attemptNumber, eventHandler, taskCommunicatorManagerInterface, conf, clock,
+      Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec) {
+    this(attemptId, eventHandler, taskCommunicatorManagerInterface, conf, clock,
         taskHeartbeatHandler, appContext, isRescheduled, resource, containerContext, leafVertex,
-        task, null);
+        vertex, locationHint, taskSpec, null);
   }
-  public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
+
+  @SuppressWarnings("rawtypes")
+  public TaskAttemptImpl(TezTaskAttemptID attemptId, EventHandler eventHandler,
       TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration conf, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
-      Task task, TezTaskAttemptID schedulingCausalTA) {
+      Vertex vertex, TaskLocationHint locationHint, TaskSpec taskSpec,
+      TezTaskAttemptID schedulingCausalTA) {
 
     MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
@@ -496,15 +497,16 @@ public class TaskAttemptImpl implements TaskAttempt,
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
-    this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+    this.attemptId = attemptId;
     this.eventHandler = eventHandler;
     //Reported status
     this.conf = conf;
     this.clock = clock;
     this.taskHeartbeatHandler = taskHeartbeatHandler;
     this.appContext = appContext;
-    this.task = task;
-    this.vertex = this.task.getVertex();
+    this.vertex = vertex;
+    this.locationHint = locationHint;
+    this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
 
@@ -548,14 +550,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     return creationCausalTA;
   }
 
-  TaskSpec createRemoteTaskSpec() throws AMUserCodeException {
-    TaskSpec baseTaskSpec = task.getBaseTaskSpec();
-    return new TaskSpec(getID(),
-        baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
-        baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
-        baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
-  }
-
   @Override
   public TaskAttemptReport getReport() {
     TaskAttemptReport result = Records.newRecord(TaskAttemptReport.class);
@@ -1036,7 +1030,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
   
   private TaskLocationHint getTaskLocationHint() {
-    return task.getTaskLocationHint();
+    return locationHint;
   }
 
   protected String[] resolveHosts(String[] src) {
@@ -1226,22 +1220,6 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
       ta.scheduledTime = ta.clock.getTime();
 
-      // Create the remote task.
-      TaskSpec remoteTaskSpec;
-      try {
-        remoteTaskSpec = ta.createRemoteTaskSpec();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("remoteTaskSpec:" + remoteTaskSpec);
-        }
-      } catch (AMUserCodeException e) {
-        String msg = "Exception in " + e.getSource() + ", taskAttempt=" + ta;
-        LOG.error(msg, e);
-        String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
-        new TerminateTransition(FAILED_HELPER).transition(ta,
-            new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, diag,
-                TaskAttemptTerminationCause.APPLICATION_ERROR));
-        return TaskAttemptStateInternal.FAILED;
-      }
       // Create startTaskRequest
 
       String[] requestHosts = new String[0];
@@ -1271,10 +1249,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         locationHint = null;
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Asking for container launch with taskAttemptContext: "
-            + remoteTaskSpec);
-      }
+      LOG.debug("Asking for container launch with taskAttemptContext: {}", ta.taskSpec);
       
       // Send out a launch request to the scheduler.
       int priority;
@@ -1288,7 +1263,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       // TODO Jira post TEZ-2003 getVertex implementation is very inefficient. This should be via references, instead of locked table lookups.
       Vertex vertex = ta.getVertex();
       AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
-          ta.attemptId, ta.taskResource, remoteTaskSpec, ta, locationHint,
+          ta.attemptId, ta.taskResource, ta.taskSpec, ta, locationHint,
           priority, ta.containerContext,
           vertex.getTaskSchedulerIdentifier(),
           vertex.getContainerLauncherIdentifier(),

http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 26ba004..28a1c5e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -93,6 +93,7 @@ import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TaskStatistics;
@@ -718,9 +719,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedulingCausalTA) {
-    return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
+    TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
+    TaskSpec taskSpec = new TaskSpec(attemptId,
+        baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+        baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
+        baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+    return new TaskAttemptImpl(attemptId, eventHandler,
         taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
-        (failedAttempts > 0), taskResource, containerContext, leafVertex, this, schedulingCausalTA);
+        (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),
+        locationHint, taskSpec, schedulingCausalTA);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index e4cd956..a50ca49 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -78,7 +78,6 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
-import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -112,10 +111,10 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.serviceplugins.api.ServicePluginException;
 import org.junit.Assert;
@@ -139,7 +138,6 @@ public class TestTaskAttempt {
   }
   
   AppContext appCtx;
-  Task mockTask;
   TaskLocationHint locationHint;
   Vertex mockVertex;
   ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
@@ -156,9 +154,7 @@ public class TestTaskAttempt {
     when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
         TezConstants.getTezYarnServicePluginName());
 
-    mockTask = mock(Task.class);
     mockVertex = mock(Vertex.class);
-    when(mockTask.getVertex()).thenReturn(mockVertex);
     when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
 
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
@@ -196,7 +192,6 @@ public class TestTaskAttempt {
           + AMSchedulerEventTALaunchRequest.class.getName());
     }
     
-    verify(mockTask, times(1)).getTaskLocationHint();
     // TODO Move the Rack request check to the client after TEZ-125 is fixed.
     Set<String> requestedRacks = taImpl.taskRacks;
     assertEquals(1, requestedRacks.size());
@@ -1742,12 +1737,12 @@ public class TestTaskAttempt {
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
         boolean isRescheduled,
         Resource resource, ContainerContext containerContext, boolean leafVertex) {
-      super(taskId, attemptNumber, eventHandler, tal, conf,
+      super(TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber),
+          eventHandler, tal, conf,
           clock, taskHeartbeatHandler, appContext,
-          isRescheduled, resource, containerContext, leafVertex, mockTask, null);
-      when(mockTask.getTaskLocationHint()).thenReturn(locationHint);
+          isRescheduled, resource, containerContext, leafVertex, mockVertex,
+          locationHint, null, null);
     }
-
     
     boolean inputFailedReported = false;
     
@@ -1757,12 +1752,6 @@ public class TestTaskAttempt {
     }
 
     @Override
-    protected TaskSpec createRemoteTaskSpec() {
-      // FIXME
-      return null;
-    }
-
-    @Override
     protected void logJobHistoryAttemptStarted() {
       taskAttemptStartedEventLogged++;
       super.logJobHistoryAttemptStarted();

http://git-wip-us.apache.org/repos/asf/tez/blob/c3b8b852/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index f88ab7c..fb2f543 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -82,6 +82,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
 import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -948,8 +949,9 @@ public class TestTaskImpl {
 
     @Override
     protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
-      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
-          attemptNumber, eventHandler, taskCommunicatorManagerInterface,
+      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(
+          TezBuilderUtils.newTaskAttemptId(getTaskId(), attemptNumber),
+          eventHandler, taskCommunicatorManagerInterface,
           conf, clock, taskHeartbeatHandler, appContext,
           true, taskResource, containerContext, schedCausalTA);
       taskAttempts.add(attempt);
@@ -995,13 +997,14 @@ public class TestTaskImpl {
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
 
-    public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
+    public MockTaskAttemptImpl(TezTaskAttemptID attemptId,
         EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
         boolean isRescheduled,
         Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {
-      super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
-          appContext, isRescheduled, resource, containerContext, false, mock(TaskImpl.class), schedCausalTA);
+      super(attemptId, eventHandler, tal, conf, clock, thh,
+          appContext, isRescheduled, resource, containerContext, false, null,
+          locationHint, mockTaskSpec, schedCausalTA);
     }
     
     @Override