You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/07/16 17:06:42 UTC

svn commit: r1503747 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/ma...

Author: jlowe
Date: Tue Jul 16 15:06:42 2013
New Revision: 1503747

URL: http://svn.apache.org/r1503747
Log:
svn merge -c 1503744 to revert MAPREDUCE-5317 (Stale files left behind for failed jobs)

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1503747&r1=1503746&r2=1503747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Jul 16 15:06:42 2013
@@ -21,9 +21,6 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5358. MRAppMaster throws invalid transitions for JobImpl
     (Devaraj K via jlowe)
 
-    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
-    jlowe)
-
 Release 2.2.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -1098,9 +1095,6 @@ Release 0.23.10 - UNRELEASED
     MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
     input path dir (Devaraj K via jlowe)
 
-    MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
-    jlowe)
-
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1503747&r1=1503746&r2=1503747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Tue Jul 16 15:06:42 2013
@@ -25,7 +25,6 @@ public enum JobStateInternal {
   RUNNING,
   COMMITTING,
   SUCCEEDED,
-  FAIL_WAIT,
   FAIL_ABORT,
   FAILED,
   KILL_WAIT,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1503747&r1=1503746&r2=1503747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Jul 16 15:06:42 2013
@@ -44,7 +44,6 @@ public enum JobEventType {
 
   //Producer:Job
   JOB_COMPLETED,
-  JOB_FAIL_WAIT_TIMEDOUT,
 
   //Producer:Any component
   JOB_DIAGNOSTIC_UPDATE,

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1503747&r1=1503746&r2=1503747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jul 16 15:06:42 2013
@@ -30,9 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -316,8 +313,7 @@ public class JobImpl implements org.apac
           .addTransition
               (JobStateInternal.RUNNING,
               EnumSet.of(JobStateInternal.RUNNING,
-                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
-                  JobStateInternal.FAIL_ABORT),
+                  JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
               JobEventType.JOB_TASK_COMPLETED,
               new TaskCompletedTransition())
           .addTransition
@@ -428,37 +424,7 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED))
 
-          // Transitions from FAIL_WAIT state
-          .addTransition(JobStateInternal.FAIL_WAIT,
-              JobStateInternal.FAIL_WAIT,
-              JobEventType.JOB_DIAGNOSTIC_UPDATE,
-              DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(JobStateInternal.FAIL_WAIT,
-              JobStateInternal.FAIL_WAIT,
-              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
-          .addTransition(JobStateInternal.FAIL_WAIT,
-              EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
-              JobEventType.JOB_TASK_COMPLETED, 
-              new JobFailWaitTransition())
-          .addTransition(JobStateInternal.FAIL_WAIT,
-              JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT, 
-              new JobFailWaitTimedOutTransition())
-          .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
-              JobEventType.JOB_KILL,
-              new KilledDuringAbortTransition())
-          .addTransition(JobStateInternal.FAIL_WAIT,
-              JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
-              INTERNAL_ERROR_TRANSITION)
-          // Ignore-able events
-          .addTransition(JobStateInternal.FAIL_WAIT,
-              JobStateInternal.FAIL_WAIT,
-              EnumSet.of(JobEventType.JOB_UPDATED_NODES,
-                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
-                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
-                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
-                  JobEventType.JOB_AM_REBOOT))
-
-          //Transitions from FAIL_ABORT state
+          // Transitions from FAIL_ABORT state
           .addTransition(JobStateInternal.FAIL_ABORT,
               JobStateInternal.FAIL_ABORT,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -485,8 +451,7 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.JOB_COMMIT_COMPLETED,
                   JobEventType.JOB_COMMIT_FAILED,
-                  JobEventType.JOB_AM_REBOOT,
-                  JobEventType.JOB_FAIL_WAIT_TIMEDOUT))
+                  JobEventType.JOB_AM_REBOOT))
 
           // Transitions from KILL_ABORT state
           .addTransition(JobStateInternal.KILL_ABORT,
@@ -637,10 +602,6 @@ public class JobImpl implements org.apac
   
   private JobStateInternal forcedState = null;
 
-  //Executor used for running future tasks. Setting thread pool size to 1
-  private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
-  private ScheduledFuture failWaitTriggerScheduledFuture;
-
   public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
       Configuration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
@@ -1001,7 +962,6 @@ public class JobImpl implements org.apac
     case SETUP:
     case COMMITTING:
       return JobState.RUNNING;
-    case FAIL_WAIT:
     case FAIL_ABORT:
       return JobState.FAILED;
     case REBOOT:
@@ -1605,43 +1565,7 @@ public class JobImpl implements org.apac
       job.unsuccessfulFinish(finalState);
     }
   }
-
-  //This transition happens when a job is to be failed. It waits for all the
-  //tasks to finish / be killed.
-  private static class JobFailWaitTransition
-  implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
-    @Override
-    public JobStateInternal transition(JobImpl job, JobEvent event) {
-      if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
-        for(Task task: job.tasks.values()) {
-          if(!task.isFinished()) {
-            return JobStateInternal.FAIL_WAIT;
-          }
-        }
-      }
-      //Finished waiting. All tasks finished / were killed
-      job.failWaitTriggerScheduledFuture.cancel(false);
-      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-      return JobStateInternal.FAIL_ABORT;
-    }
-  }
-
-  //This transition happens when a job to be failed times out while waiting on
-  //tasks that had been sent the KILL signal. It is triggered by a
-  //ScheduledFuture task queued in the executor.
-  private static class JobFailWaitTimedOutTransition
-  implements SingleArcTransition<JobImpl, JobEvent> {
-    @Override
-    public void transition(JobImpl job, JobEvent event) {
-      LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
-        + " Going to fail job anyway");
-      job.failWaitTriggerScheduledFuture.cancel(false);
-      job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
-        job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
-    }
-  }
-
+    
   // JobFinishedEvent triggers the move of the history file out of the staging
   // area. May need to create a new event type for this if JobFinished should 
   // not be generated for KilledJobs, etc.
@@ -1874,23 +1798,6 @@ public class JobImpl implements org.apac
       return checkJobAfterTaskCompletion(job);
     }
 
-    //This class is used to queue a ScheduledFuture to send an event to a job
-    //after some delay. This can be used to wait for maximum amount of time
-    //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
-    //all tasks to be killed.
-    static class TriggerScheduledFuture implements Runnable {
-      JobEvent toSend;
-      JobImpl job;
-      TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
-        this.toSend = toSend;
-        this.job = job;
-      }
-      public void run() {
-        LOG.info("Sending event " + toSend + " to " + job.getID());
-        job.getEventHandler().handle(toSend);
-      }
-    }
-
     protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
       //check for Job failure
       if (job.failedMapTaskCount*100 > 
@@ -1904,30 +1811,10 @@ public class JobImpl implements org.apac
             " failedReduces:" + job.failedReduceTaskCount;
         LOG.info(diagnosticMsg);
         job.addDiagnostic(diagnosticMsg);
-
-        //Send kill signal to all unfinished tasks here.
-        boolean allDone = true;
-        for (Task task : job.tasks.values()) {
-          if(!task.isFinished()) {
-            allDone = false;
-            job.eventHandler.handle(
-              new TaskEvent(task.getID(), TaskEventType.T_KILL));
-          }
-        }
-
-        //If all tasks are already done, we should go directly to FAIL_ABORT
-        if(allDone) {
-          return JobStateInternal.FAIL_ABORT;
-        }
-
-        //Set max timeout to wait for the tasks to get killed
-        job.failWaitTriggerScheduledFuture = job.executor.schedule(
-          new TriggerScheduledFuture(job, new JobEvent(job.getID(),
-            JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
-                MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
-                MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
-                TimeUnit.MILLISECONDS);
-        return JobStateInternal.FAIL_WAIT;
+        job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+            job.jobContext,
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+        return JobStateInternal.FAIL_ABORT;
       }
       
       return job.checkReadyForCommit();

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1503747&r1=1503746&r2=1503747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jul 16 15:06:42 2013
@@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -85,7 +84,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 
 /**
@@ -334,39 +332,6 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
-  @Test
-  public void testAbortJobCalledAfterKillingTasks() throws IOException,
-    InterruptedException {
-    Configuration conf = new Configuration();
-    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
-    conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
-    InlineDispatcher dispatcher = new InlineDispatcher();
-    dispatcher.init(conf);
-    dispatcher.start();
-    OutputCommitter committer = Mockito.mock(OutputCommitter.class);
-    CommitterEventHandler commitHandler =
-        createCommitterEventHandler(dispatcher, committer);
-    commitHandler.init(conf);
-    commitHandler.start();
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
-
-    //Fail one task. This should land the JobImpl in the FAIL_WAIT state
-    job.handle(new JobTaskEvent(
-      MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
-      TaskState.FAILED));
-    //Verify abort job hasn't been called
-    Mockito.verify(committer, Mockito.never())
-      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
-    assertJobState(job, JobStateInternal.FAIL_WAIT);
-
-    //Verify abortJob is called once and the job failed
-    Mockito.verify(committer, Mockito.timeout(2000).times(1))
-      .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
-    assertJobState(job, JobStateInternal.FAILED);
-
-    dispatcher.stop();
-  }
-
   @Test(timeout=20000)
   public void testKilledDuringFailAbort() throws Exception {
     Configuration conf = new Configuration();