Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/06/17 00:12:52 UTC

svn commit: r1136709 - in /hadoop/common/branches/branch-0.20-security-204: ./ src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/test/system/java/org/apache/hadoo...

Author: omalley
Date: Thu Jun 16 22:12:51 2011
New Revision: 1136709

URL: http://svn.apache.org/viewvc?rev=1136709&view=rev
Log:
MAPREDUCE-2535. Fix NPE in JobClient caused by retirement. (Robert Joseph
Evans via cdouglas)

Added:
    hadoop/common/branches/branch-0.20-security-204/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java
      - copied unchanged from r1131286, hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java
Modified:
    hadoop/common/branches/branch-0.20-security-204/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-0.20-security-204/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java
    hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
    hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskChildsKilling.java
    hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java
    hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java
    hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
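
The NPE described in the log above arises because, once the JobTracker retires a completed job (the JobClient javadoc in this patch cites roughly 24 hours after completion), getJobStatus() and getJobProfile() start returning null, and the old NetworkedJob code dereferenced those nulls later when refreshing or reading job state. A minimal caller-side sketch under the patched contract, where retirement shows up as a null from getJob() rather than an NPE (the class name, configuration and job id below are hypothetical, not taken from this commit):

    // Sketch only -- not part of the patch.
    import java.io.IOException;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;

    public class RetiredJobLookup {
      public static void main(String[] args) throws IOException {
        JobClient client = new JobClient(new JobConf());
        JobID jobId = JobID.forName("job_201106160001_0001");  // placeholder id
        RunningJob job = client.getJob(jobId);
        if (job == null) {
          // The JobTracker no longer knows the job (retired or never existed).
          // Before MAPREDUCE-2535 this situation could surface as an NPE
          // inside NetworkedJob instead of a null return.
          System.err.println("Job " + jobId + " is unknown or retired");
        } else {
          System.out.println("Job state: " + job.getJobState());
        }
      }
    }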

Modified: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/CHANGES.txt?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security-204/CHANGES.txt Thu Jun 16 22:12:51 2011
@@ -11,6 +11,9 @@ Release 0.20.204.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-2535. Fix NPE in JobClient caused by retirement. (Robert Joseph
+    Evans via cdouglas)
+
     HDFS-2044. TestQueueProcessingStatistics failing automatic test due to 
     timing issues. (mattf)
 

Propchange: hadoop/common/branches/branch-0.20-security-204/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jun 16 22:12:51 2011
@@ -1,7 +1,7 @@
 /hadoop/common/branches/branch-0.20/CHANGES.txt:826138,826568,829987,831184,833001,880632,898713,909245,909723,960946,1044225
-/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1131290,1131299,1131737,1134140
+/hadoop/common/branches/branch-0.20-security/CHANGES.txt:1097202,1098837,1100336,1131286,1131290,1131299,1131737,1134140
 /hadoop/common/branches/branch-0.20-security-203/CHANGES.txt:1096071,1097012-1099333,1102071,1128115
-/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1133133,1133274,1133282
+/hadoop/common/branches/branch-0.20-security-205/CHANGES.txt:1132788,1133133,1133274,1133282
 /hadoop/core/branches/branch-0.18/CHANGES.txt:727226
 /hadoop/core/branches/branch-0.19/CHANGES.txt:713112
 /hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,758180,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607,772844,772876,772884,772920,773889,776638,778962,778966,779893,781720,784661,785046,785569

Modified: hadoop/common/branches/branch-0.20-security-204/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/contrib/streaming/src/test/system/org/apache/hadoop/mapred/TestTaskKillingOfStreamingJob.java Thu Jun 16 22:12:51 2011
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.mapreduce.test.system.JTProtocol;
 import org.apache.hadoop.mapreduce.test.system.JobInfo;
 import org.apache.hadoop.mapreduce.test.system.MRCluster;
@@ -94,7 +93,7 @@ public class TestTaskKillingOfStreamingJ
         jtClient.isTaskStarted(taskInfo));
 
     JobInfo jInfo = wovenClient.getJobInfo(jobId); 
-    NetworkedJob networkJob = client.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = client.getJob(jobId);
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
     networkJob.killTask(taskAttID, false);
@@ -153,7 +152,7 @@ public class TestTaskKillingOfStreamingJ
         jtClient.isTaskStarted(taskInfo));
     
     JobInfo jInfo = wovenClient.getJobInfo(jobId);
-    NetworkedJob networkJob = client.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = client.getJob(jobId);
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID, 0);
     networkJob.killTask(taskAttID, true);
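
This hunk shows the pattern repeated across the system tests in this commit: NetworkedJob becomes a static nested class with a three-argument constructor, so the qualified inner-class instantiation "client.new NetworkedJob(jInfo.getStatus())" no longer compiles, and the tests obtain a RunningJob through the public JobClient.getJob(JobID) accessor instead. A reusable sketch of that lookup-and-kill step (a hypothetical helper; the method name and the null handling are mine, not the test's):

    // Sketch, not code from the patch.  Types are from org.apache.hadoop.mapred.
    static void killAttempt(JobClient client, JobID jobId,
                            TaskAttemptID attempt, boolean shouldFail)
        throws IOException {
      RunningJob job = client.getJob(jobId);   // replaces client.new NetworkedJob(...)
      if (job == null) {
        throw new IOException("Job " + jobId + " not found (already retired?)");
      }
      // shouldFail=false kills the attempt, shouldFail=true marks it failed
      job.killTask(attempt, shouldFail);
    }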

Modified: hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/mapred/org/apache/hadoop/mapred/JobClient.java Thu Jun 16 22:12:51 2011
@@ -183,7 +183,8 @@ public class JobClient extends Configure
    * a JobProfile object to provide some info, and interacts with the
    * remote service to provide certain functionality.
    */
-  class NetworkedJob implements RunningJob {
+  static class NetworkedJob implements RunningJob {
+    private JobSubmissionProtocol jobSubmitClient;
     JobProfile profile;
     JobStatus status;
     long statustime;
@@ -191,16 +192,26 @@ public class JobClient extends Configure
     /**
      * We store a JobProfile and a timestamp for when we last
      * acquired the job profile.  If the job is null, then we cannot
-     * perform any of the tasks.  The job might be null if the JobTracker
-     * has completely forgotten about the job.  (eg, 24 hours after the
-     * job completes.)
+     * perform any of the tasks, so we throw an exception.
+     * The job might be null if the JobTracker has completely forgotten
+     * about the job.  (eg, 24 hours after the job completes.)
      */
-    public NetworkedJob(JobStatus job) throws IOException {
+    public NetworkedJob(JobStatus job, JobProfile prof, JobSubmissionProtocol jobSubmitClient) throws IOException {
       this.status = job;
-      this.profile = jobSubmitClient.getJobProfile(job.getJobID());
+      this.profile = prof;
+      this.jobSubmitClient = jobSubmitClient;
+      if(this.status == null) {
+        throw new IOException("The Job status cannot be null");
+      }
+      if(this.profile == null) {
+        throw new IOException("The Job profile cannot be null");
+      }
+      if(this.jobSubmitClient == null) {
+        throw new IOException("The Job Submission Protocol cannot be null");
+      }
       this.statustime = System.currentTimeMillis();
     }
-
+    
     /**
      * Some methods rely on having a recent job profile object.  Refresh
      * it, if necessary
@@ -217,6 +228,9 @@ public class JobClient extends Configure
      */
     synchronized void updateStatus() throws IOException {
       this.status = jobSubmitClient.getJobStatus(profile.getJobID());
+      if(this.status == null) {
+        throw new IOException("The job appears to have been removed."); 
+      }
       this.statustime = System.currentTimeMillis();
     }
 
@@ -863,8 +877,9 @@ public class JobClient extends Configure
           printTokens(jobId, jobCopy.getCredentials());
           status = jobSubmitClient.submitJob(
               jobId, submitJobDir.toString(), jobCopy.getCredentials());
-          if (status != null) {
-            return new NetworkedJob(status);
+          JobProfile prof = jobSubmitClient.getJobProfile(jobId);
+          if (status != null && prof != null) {
+            return new NetworkedJob(status, prof, jobSubmitClient);
           } else {
             throw new IOException("Could not launch job");
           }
@@ -1011,8 +1026,9 @@ public class JobClient extends Configure
    */
   public RunningJob getJob(JobID jobid) throws IOException {
     JobStatus status = jobSubmitClient.getJobStatus(jobid);
-    if (status != null) {
-      return new NetworkedJob(status);
+    JobProfile profile = jobSubmitClient.getJobProfile(jobid);
+    if (status != null && profile != null) {
+      return new NetworkedJob(status, profile, jobSubmitClient);
     } else {
       return null;
     }
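
Taken together, the JobClient changes above define the caller-facing contract: getJob() returns null when either the JobStatus or the JobProfile is already gone, the NetworkedJob constructor rejects null arguments up front, and a job that retires after the handle was obtained surfaces as an IOException from the status-refreshing methods rather than as an NPE. A defensive polling loop written against that contract might look like this (a sketch with made-up names, not code from the patch):

    // Sketch under the post-patch contract.
    static boolean waitForCompletion(JobClient client, JobID jobId)
        throws IOException, InterruptedException {
      RunningJob job = client.getJob(jobId);
      if (job == null) {
        return false;                    // unknown or already retired
      }
      while (true) {
        try {
          if (job.isComplete()) {        // refreshes status; may now throw
            return job.isSuccessful();
          }
        } catch (IOException retired) {
          // "The job appears to have been removed." -- retired mid-poll.
          // (A broad catch like this also swallows RPC errors; fine for a sketch.)
          return false;
        }
        Thread.sleep(5000);              // poll interval is arbitrary here
      }
    }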

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestJobCacheDirectoriesCleanUp.java Thu Jun 16 22:12:51 2011
@@ -28,7 +28,6 @@ import org.apache.hadoop.mapreduce.test.
 import org.apache.hadoop.mapreduce.test.system.JTProtocol;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
 import org.apache.hadoop.mapreduce.test.system.JobInfo;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -189,8 +188,8 @@ public class TestJobCacheDirectoriesClea
           int MAX_MAP_TASK_ATTEMPTS = Integer.
                parseInt(jobConf.get("mapred.map.max.attempts"));
           while(taskinfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) {
-            NetworkedJob networkJob = jtClient.getClient().
-               new NetworkedJob(jobInfo.getStatus());
+            org.apache.hadoop.mapreduce.JobID temp = jobInfo.getID();
+            RunningJob networkJob = client.getJob(new JobID(temp.getJtIdentifier(), temp.getId()));
             networkJob.killTask(taskAttID, true);
             taskinfo = rtClient.getTaskInfo(taskinfo.getTaskID());
             taskAttID = new TaskAttemptID(taskId, taskinfo.numFailedAttempts());
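
The extra step in this hunk exists because jobInfo.getID() returns a new-API org.apache.hadoop.mapreduce.JobID, while JobClient.getJob() takes the old-API org.apache.hadoop.mapred.JobID, so the test rebuilds the old-API id from its two components. A sketch of that conversion (variable names are mine; the downgrade() shortcut is an assumption based on the TaskID.downgrade() calls these tests already make):

    // Sketch of the id conversion used above.
    org.apache.hadoop.mapreduce.JobID newApiId = jobInfo.getID();

    // What the patch does: rebuild the old-API id field by field.
    JobID oldApiId = new JobID(newApiId.getJtIdentifier(), newApiId.getId());

    // Possible shortcut, if JobID.downgrade() is available on this branch:
    // JobID oldApiId = JobID.downgrade(newApiId);

    RunningJob networkJob = client.getJob(oldApiId);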

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskChildsKilling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskChildsKilling.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskChildsKilling.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskChildsKilling.java Thu Jun 16 22:12:51 2011
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.test.
 import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
 import org.apache.hadoop.mapreduce.test.system.TaskInfo;
 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.Tool;
 import org.junit.AfterClass;
@@ -271,8 +270,7 @@ public class TestTaskChildsKilling {
     Assert.assertTrue("Map process is not alive before task kills.", 
         ttIns.isProcessTreeAlive(pid));
 
-    NetworkedJob networkJob = client.new NetworkedJob(jInfo.getStatus());
-    networkJob.killTask(taskAttID, false);
+    runJob.killTask(taskAttID, false);
 
     LOG.info("Waiting till the task is killed...");
     taskInfo = wovenClient.getTaskInfo(taskInfo.getTaskID());

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskKilling.java Thu Jun 16 22:12:51 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapreduce.test.
 import org.apache.hadoop.mapreduce.test.system.TTClient;
 import org.apache.hadoop.mapreduce.test.system.JTClient;
 import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -100,7 +99,7 @@ public class TestTaskKilling {
         jtClient.isTaskStarted(taskInfo));
 
     // Fail the running task.
-    NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = jobClient.getJob(jobId);
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
     networkJob.killTask(taskAttID, true);
@@ -206,7 +205,7 @@ public class TestTaskKilling {
             taskAttID + " has not been found while task was running.", 
                     isTempFolderExists);
     
-    NetworkedJob networkJob = jobClient.new NetworkedJob(jInfo.getStatus());
+    RunningJob networkJob = jobClient.getJob(id);
     networkJob.killTask(taskAttID, false);
     ttClient.getProxy().sendAction(action);
     taskInfo = remoteJTClient.getTaskInfo(tID);
@@ -353,8 +352,7 @@ public class TestTaskKilling {
         TaskAttemptID tAttID = new TaskAttemptID(taskId, 
             taskInfo.numFailedAttempts());
         while(taskInfo.numFailedAttempts() < MAX_MAP_TASK_ATTEMPTS) {
-          NetworkedJob networkJob = jtClient.getClient().
-             new NetworkedJob(jobInfo.getStatus());
+          RunningJob networkJob = jobClient.getJob(id);
           networkJob.killTask(taskAttID, true);
           taskInfo = remoteJTClient.getTaskInfo(taskInfo.getTaskID());
           taskAttID = new TaskAttemptID(taskId, taskInfo.numFailedAttempts());
@@ -484,8 +482,7 @@ public class TestTaskKilling {
             taskIdKilled = taskid.toString();
             taskAttemptID = new TaskAttemptID(taskid, i);
             LOG.info("taskAttemptid going to be killed is : " + taskAttemptID);
-            (jobClient.new NetworkedJob(jInfo.getStatus())).
-                killTask(taskAttemptID,true);
+            rJob.killTask(taskAttemptID,true);
             checkTaskCompletionEvent(taskAttemptID, jInfo);
             break;
           } else {
@@ -495,8 +492,7 @@ public class TestTaskKilling {
               UtilsForTests.waitFor(20000);
               LOG.info("taskAttemptid going to be killed is : " +
                   taskAttemptID);
-              (jobClient.new NetworkedJob(jInfo.getStatus())).
-                  killTask(taskAttemptID,true);
+              rJob.killTask(taskAttemptID,true);
               checkTaskCompletionEvent(taskAttemptID,jInfo);
               break;
             }
@@ -536,8 +532,10 @@ public class TestTaskKilling {
     boolean match = false;
     int count = 0;
     while (!match) {
-      TaskCompletionEvent[] taskCompletionEvents =  jobClient.new
-        NetworkedJob(jInfo.getStatus()).getTaskCompletionEvents(0);
+      org.apache.hadoop.mapreduce.JobID temp = jInfo.getID();
+      RunningJob rJob = jobClient.getJob(new JobID(temp.getJtIdentifier(), temp.getId()));
+ 
+      TaskCompletionEvent[] taskCompletionEvents =  rJob.getTaskCompletionEvents(0);
       for (TaskCompletionEvent taskCompletionEvent : taskCompletionEvents) {
         LOG.info("taskCompletionEvent.getTaskAttemptId().toString() is : " + 
           taskCompletionEvent.getTaskAttemptId().toString());
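
Note that the loop above now asks JobClient for a fresh RunningJob on each iteration before reading completion events, instead of wrapping a possibly stale JobStatus in a NetworkedJob. A trimmed sketch of that polling idiom (names are illustrative; the real test also bounds the number of iterations and sleeps between polls):

    // Sketch of the completion-event check, not the test code itself.
    static boolean attemptReported(JobClient client, JobID jobId,
                                   TaskAttemptID attempt) throws IOException {
      RunningJob job = client.getJob(jobId);
      if (job == null) {
        return false;                              // job already retired
      }
      for (TaskCompletionEvent event : job.getTaskCompletionEvents(0)) {
        // event.getTaskStatus() could additionally be checked (e.g. for KILLED)
        if (event.getTaskAttemptId().equals(attempt)) {
          return true;
        }
      }
      return false;
    }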

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/mapred/TestTaskTrackerInfoSuccessfulFailedJobs.java Thu Jun 16 22:12:51 2011
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.test.
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.UtilsForTests;
-import org.apache.hadoop.mapred.JobClient.NetworkedJob;
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -340,11 +339,9 @@ public class TestTaskTrackerInfoSuccessf
     Assert.assertTrue("Task has not been started for 1 min.", 
       count != 60);
 
-    NetworkedJob networkJob = (cluster.getJTClient().getClient()).new 
-      NetworkedJob(jInfo.getStatus());
     TaskID tID = TaskID.downgrade(taskInfo.getTaskID());
     TaskAttemptID taskAttID = new TaskAttemptID(tID , 0);
-    networkJob.killTask(taskAttID, false);
+    rJob.killTask(taskAttID, false);
 
     count = 0;
     LOG.info("Waiting till the job is completed...");

Modified: hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java?rev=1136709&r1=1136708&r2=1136709&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-204/src/test/system/java/org/apache/hadoop/test/system/process/HadoopDaemonRemoteCluster.java Thu Jun 16 22:12:51 2011
@@ -124,9 +124,11 @@ public abstract class HadoopDaemonRemote
         }
       } finally {
         try {
-          reader.close();
+          if(reader != null) {
+              reader.close();
+          }
         } catch (IOException e) {
-          LOG.warn("Could not close reader");
+          LOG.warn("Could not close reader", e);
         }
       }
       LOG.info("Created HadoopDaemonInfo for " + cmd + " " + role + " from "