You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/03/28 07:03:41 UTC

svn commit: r1086119 - in /hadoop/mapreduce/branches/MR-279/mr-client: hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/ hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-ma...

Author: sharad
Date: Mon Mar 28 05:03:41 2011
New Revision: 1086119

URL: http://svn.apache.org/viewvc?rev=1086119&view=rev
Log:
Implement fail-task API. Contributed by Sharad Agarwal.

Modified:
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1086119&r1=1086118&r2=1086119&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Mon Mar 28 05:03:41 2011
@@ -208,6 +208,7 @@ public class MRClientService extends Abs
 
     @Override
     public Void killJob(JobID jobID) throws AvroRemoteException {
+      LOG.info("Kill Job received from client " + jobID);
       verifyAndGetJob(jobID);
       appContext.getEventHandler().handle(
           new JobEvent(jobID, JobEventType.JOB_KILL));
@@ -216,6 +217,7 @@ public class MRClientService extends Abs
 
     @Override
     public Void killTask(TaskID taskID) throws AvroRemoteException {
+      LOG.info("Kill task received from client " + taskID);
       verifyAndGetTask(taskID);
       appContext.getEventHandler().handle(
           new TaskEvent(taskID, TaskEventType.T_KILL));
@@ -225,6 +227,7 @@ public class MRClientService extends Abs
     @Override
     public Void killTaskAttempt(TaskAttemptID taskAttemptID)
         throws AvroRemoteException {
+      LOG.info("Kill task attempt received from client " + taskAttemptID);
       verifyAndGetAttempt(taskAttemptID);
       appContext.getEventHandler().handle(
           new TaskAttemptEvent(taskAttemptID, 
@@ -233,6 +236,17 @@ public class MRClientService extends Abs
     }
 
     @Override
+    public Void failTaskAttempt(TaskAttemptID taskAttemptID)
+        throws AvroRemoteException {
+      LOG.info("Fail task attempt received from client " + taskAttemptID);
+      verifyAndGetAttempt(taskAttemptID);
+      appContext.getEventHandler().handle(
+          new TaskAttemptEvent(taskAttemptID, 
+              TaskAttemptEventType.TA_FAILMSG));
+      return null;
+    }
+
+    @Override
     public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
         throws AvroRemoteException {
       return verifyAndGetAttempt(taskAttemptID).getDiagnostics();

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1086119&r1=1086118&r2=1086119&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Mon Mar 28 05:03:41 2011
@@ -153,6 +153,8 @@ public abstract class TaskAttemptImpl im
          TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition())
      .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
          TaskAttemptEventType.TA_KILL, new KilledTransition())
+     .addTransition(TaskAttemptState.NEW, TaskAttemptState.FAILED,
+         TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
 
      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptState.UNASSIGNED,
@@ -161,6 +163,9 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
          TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
              TaskAttemptState.KILLED, true))
+     .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.FAILED,
+         TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
+             TaskAttemptState.FAILED, true))
 
      // Transitions from the ASSIGNED state.
      .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
@@ -172,6 +177,9 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptState.ASSIGNED, 
          TaskAttemptState.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptState.ASSIGNED, 
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
 
      // Transitions from RUNNING state.
      .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
@@ -239,7 +247,7 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
          TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
-             TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_TIMED_OUT,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -255,6 +263,7 @@ public abstract class TaskAttemptImpl im
              TaskAttemptEventType.TA_UPDATE,
              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
              TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_TIMED_OUT))
@@ -317,14 +326,17 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptState.SUCCEEDED,
          TaskAttemptState.SUCCEEDED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
      // Ignore-able events for FAILED state
      .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
          EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_ASSIGNED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED,
              TaskAttemptEventType.TA_UPDATE,
              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
              TaskAttemptEventType.TA_COMMIT_PENDING,
              TaskAttemptEventType.TA_DONE,
              TaskAttemptEventType.TA_FAILMSG))

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro?rev=1086119&r1=1086118&r2=1086119&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro Mon Mar 28 05:03:41 2011
@@ -147,5 +147,6 @@ protocol MRClientProtocol {
   void killJob(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
   void killTask(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException;
   void killTaskAttempt(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+  void failTaskAttempt(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
 
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1086119&r1=1086118&r2=1086119&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Mon Mar 28 05:03:41 2011
@@ -172,6 +172,13 @@ public class HistoryClientService extend
     }
 
     @Override
+    public Void failTaskAttempt(TaskAttemptID taskAttemptID)
+        throws AvroRemoteException {
+      getJob(taskAttemptID.taskID.jobID);
+      throw RPCUtil.getRemoteException("Invalid operation on completed job");
+    }
+
+    @Override
     public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
         throws AvroRemoteException {
       Job job = getJob(taskAttemptID.taskID.jobID);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1086119&r1=1086118&r2=1086119&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Mon Mar 28 05:03:41 2011
@@ -265,19 +265,27 @@ public class ClientServiceDelegate {
     return null;
   }
 
-  public boolean killTask(TaskAttemptID taskAttemptID, boolean killed)
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
       throws YarnRemoteException, AvroRemoteException {
     org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID 
       = TypeConverter.toYarn(taskAttemptID);
     try {
-      getProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+      if (fail) {
+        getProxy(taskAttemptID.getJobID()).failTaskAttempt(attemptID);
+      } else {
+        getProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+      }
     } catch(YarnRemoteException yre) {//thrown by remote server, no need to redirect
       LOG.warn(RPCUtil.toString(yre));
       throw yre;
     } catch(Exception e) {
       LOG.debug("Failed to contact application master ", e);
       try {
-        getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+        if (fail) {
+          getRefreshedProxy(taskAttemptID.getJobID()).failTaskAttempt(attemptID);
+        } else {
+          getRefreshedProxy(taskAttemptID.getJobID()).killTaskAttempt(attemptID);
+        }
       } catch(YarnRemoteException yre) {
         LOG.warn(RPCUtil.toString(yre));
         throw yre;

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1086119&r1=1086118&r2=1086119&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Mon Mar 28 05:03:41 2011
@@ -306,5 +306,12 @@ public class TestClientRedirect {
         throws AvroRemoteException, YarnRemoteException {
       return null;
     }
+
+    @Override
+    public Void failTaskAttempt(
+        org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
   }
 }