You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:46:21 UTC

svn commit: r1077704 - in /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred: DefaultTaskController.java LinuxTaskController.java TaskController.java TaskTracker.java

Author: omalley
Date: Fri Mar  4 04:46:20 2011
New Revision: 1077704

URL: http://svn.apache.org/viewvc?rev=1077704&view=rev
Log:
commit 251fe311a016404e3cb426e1b45aaeea9c71a448
Author: Devaraj Das <dd...@yahoo-inc.com>
Date:   Sun Sep 19 10:09:10 2010 -0700

    Passing the TT address to the TaskController's initializeJob to avoid a deadlock

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1077704&r1=1077703&r2=1077704&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Mar  4 04:46:20 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -163,7 +164,8 @@ public class DefaultTaskController exten
   @Override
   public void initializeJob(String user, String jobid, 
                             Path credentials, Path jobConf, 
-                            TaskUmbilicalProtocol taskTracker
+                            TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
                             ) throws IOException {
     final LocalDirAllocator lDirAlloc = allocator;
     FileSystem localFs = FileSystem.getLocal(getConf());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077704&r1=1077703&r2=1077704&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 04:46:20 2011
@@ -142,7 +142,8 @@ class LinuxTaskController extends TaskCo
 
   @Override
   public void initializeJob(String user, String jobid, Path credentials,
-                            Path jobConf, TaskUmbilicalProtocol taskTracker
+                            Path jobConf, TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
                             ) throws IOException {
     List<String> command = new ArrayList<String>(
       Arrays.asList(taskControllerExe, 
@@ -162,8 +163,6 @@ class LinuxTaskController extends TaskCo
     command.add(user);
     command.add(jobid);
     // add the task tracker's reporting address
-    InetSocketAddress ttAddr = 
-      ((TaskTracker) taskTracker).getTaskTrackerReportAddress();
     command.add(ttAddr.getHostName());
     command.add(Integer.toString(ttAddr.getPort()));
     String[] commandArray = command.toArray(new String[0]);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077704&r1=1077703&r2=1077704&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 04:46:20 2011
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -88,11 +89,13 @@ public abstract class TaskController imp
    * @param credentials a filename containing the job secrets
    * @param jobConf the path to the localized configuration file
    * @param taskTracker the connection the task tracker
+   * @param ttAddr the tasktracker's RPC address
    * @throws IOException
    */
   public abstract void initializeJob(String user, String jobid, 
                                      Path credentials, Path jobConf,
-                                     TaskUmbilicalProtocol taskTracker) 
+                                     TaskUmbilicalProtocol taskTracker,
+                                     InetSocketAddress ttAddr) 
   throws IOException;
   
   /**

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077704&r1=1077703&r2=1077704&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 04:46:20 2011
@@ -941,10 +941,11 @@ public class TaskTracker implements MRCo
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
+    InetSocketAddress ttAddr = getTaskTrackerReportAddress();
 
     synchronized (rjob) {
       if (!rjob.localized) {
-        Path localJobConfPath = initializeJob(t, rjob);
+        Path localJobConfPath = initializeJob(t, rjob, ttAddr);
         JobConf localJobConf = new JobConf(localJobConfPath);
         //to be doubly sure, overwrite the user in the config with the one the TT 
         //thinks it is
@@ -976,11 +977,13 @@ public class TaskTracker implements MRCo
    *
    * @param t task whose job has to be localized on this TT
    * @param rjob the {@link RunningJob}
+   * @param ttAddr the tasktracker's RPC address
    * @return the path to the job configuration to be used for all the tasks
    *         of this job as a starting point.
    * @throws IOException
    */
-  Path initializeJob(final Task t, final RunningJob rjob)
+  Path initializeJob(final Task t, final RunningJob rjob, 
+      final InetSocketAddress ttAddr)
   throws IOException, InterruptedException {
     final JobID jobId = t.getJobID();
 
@@ -1038,7 +1041,8 @@ public class TaskTracker implements MRCo
           // distributed cache manager makes as well)
           JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
           taskController.initializeJob(t.getUser(), jobId.toString(), 
-              new Path(localJobTokenFile), localJobFile, TaskTracker.this);
+              new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+              ttAddr);
         } catch (IOException e) {
           LOG.warn("Exception while localization " + 
               StringUtils.stringifyException(e));