Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/02/07 09:01:06 UTC

svn commit: r907393 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/Child.java src/java/org/apache/hadoop/mapred/JobTracker.java src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java

Author: ddas
Date: Sun Feb  7 08:01:06 2010
New Revision: 907393

URL: http://svn.apache.org/viewvc?rev=907393&view=rev
Log:
MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within getStagingAreaDir within a privileged block. Fixes Child.java to use the appropriate UGIs while getting the TaskUmbilicalProtocol proxy and while executing the task. Contributed by Jakob Homan.
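
For context, the change hinges on the UserGroupInformation.doAs pattern: work that must run with a particular user's credentials is wrapped in a PrivilegedExceptionAction. The following minimal sketch is not part of the commit (the class name and user string are made up for illustration) and only shows that pattern in isolation:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical remote user, analogous to the job-id user created in Child.java.
        UserGroupInformation ugi =
            UserGroupInformation.createRemoteUser("job_201002070801_0001");
        // Any credentials (e.g. job tokens) would be added to the UGI via addToken().
        String runAs = ugi.doAs(new PrivilegedExceptionAction<String>() {
          @Override
          public String run() throws Exception {
            // Code that needs the remote user's identity, such as obtaining a
            // FileSystem or an RPC proxy, executes inside run().
            return UserGroupInformation.getCurrentUser().getUserName();
          }
        });
        System.out.println("Executed as: " + runAs);
      }
    }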

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Feb  7 08:01:06 2010
@@ -301,6 +301,11 @@
     MAPREDUCE-1443. DBInputFormat can leak connections.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within 
+    getStagingAreaDir within a privileged block. Fixes Child.java to use the
+    appropriate UGIs while getting the TaskUmbilicalProtocol proxy and 
+    while executing the task. (Jakob Homan via ddas)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Sun Feb  7 08:01:06 2010
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -60,13 +61,13 @@
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
 
-    JobConf defaultConf = new JobConf();
+    final JobConf defaultConf = new JobConf();
     // set tcp nodelay
     defaultConf.setBoolean("ipc.client.tcpnodelay", true);
     
     String host = args[0];
     int port = Integer.parseInt(args[1]);
-    InetSocketAddress address = new InetSocketAddress(host, port);
+    final InetSocketAddress address = new InetSocketAddress(host, port);
     final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
     final int SLEEP_LONGER_COUNT = 5;
     int jvmIdInt = Integer.parseInt(args[3]);
@@ -86,11 +87,22 @@
     UserGroupInformation current = UserGroupInformation.getCurrentUser();
     current.addToken(jt);
     
-    TaskUmbilicalProtocol umbilical =
-      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
-          TaskUmbilicalProtocol.versionID,
-          address,
-          defaultConf);
+    // Create TaskUmbilicalProtocol as actual task owner.
+    UserGroupInformation taskOwner 
+     = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+    taskOwner.addToken(jt);
+    
+    final TaskUmbilicalProtocol umbilical = 
+      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+      @Override
+      public TaskUmbilicalProtocol run() throws Exception {
+        return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+            TaskUmbilicalProtocol.versionID,
+            address,
+            defaultConf);
+      }
+    });
+    
     int numTasksToExecute = -1; //-1 signifies "no limit"
     int numTasksExecuted = 0;
     Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -132,6 +144,9 @@
     JvmContext context = new JvmContext(jvmId, pid);
     int idleLoopCount = 0;
     Task task = null;
+    
+    UserGroupInformation childUGI = null;
+    
     try {
       while (true) {
         taskid = null;
@@ -161,7 +176,7 @@
         //create the index file so that the log files 
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
-        JobConf job = new JobConf(task.getJobFile());
+        final JobConf job = new JobConf(task.getJobFile());
         
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.
@@ -186,11 +201,28 @@
         JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
         // use job-specified working directory
         FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
-        try {
-          task.run(job, umbilical);             // run the task
-        } finally {
-          TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+        LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
+        childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+        // Add tokens to new user so that it may execute its task correctly.
+        for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+          childUGI.addToken(token);
         }
+        
+        // Create a final reference to the task for the doAs block
+        final Task taskFinal = task;
+        childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            try {
+              taskFinal.run(job, umbilical);             // run the task
+            } finally {
+              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+            }
+            
+            return null;
+          }
+        });
+
         if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
           break;
         }
@@ -204,7 +236,18 @@
       try {
         if (task != null) {
           // do cleanup for the task
-          task.taskCleanup(umbilical);
+          if(childUGI == null) { // no need to jump into doAs block
+            task.taskCleanup(umbilical);
+          } else {
+            final Task taskFinal = task;
+            childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                taskFinal.taskCleanup(umbilical);
+                return null;
+              }
+            });
+          }
         }
       } catch (Exception e) {
         LOG.info("Exception cleaning up : " + StringUtils.stringifyException(e));

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sun Feb  7 08:01:06 2010
@@ -3662,12 +3662,21 @@
    * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
-    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
-        "/tmp/hadoop/mapred/staging"));
-    FileSystem fs = stagingRootDir.getFileSystem(conf);
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    return fs.makeQualified(new Path(stagingRootDir, 
-                                user+"/.staging")).toString();
+    try {
+      final String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+        @Override
+        public String run() throws Exception {
+          Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+          "/tmp/hadoop/mapred/staging"));
+          FileSystem fs = stagingRootDir.getFileSystem(conf);
+          return fs.makeQualified(new Path(stagingRootDir, 
+                                      user+"/.staging")).toString();
+      }
+      });
+    } catch(InterruptedException ie) {
+      throw new IOException(ie);
+    }
   }
   
   /**
@@ -4373,13 +4382,15 @@
     FileSystem historyFS = null;
 
     jobHistory = new JobHistory();
-    jobHistory.init(this, conf, this.localMachine, this.startTime);
-    jobHistory.initDone(conf, fs);
-    final String historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
-    infoServer.setAttribute("historyLogDir", historyLogDir);
+    final JobTracker jtFinal = this;
     try {
       historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
         public FileSystem run() throws IOException {
+          jobHistory.init(jtFinal, conf, jtFinal.localMachine, jtFinal.startTime);
+          jobHistory.initDone(conf, fs);
+          final String historyLogDir = 
+            jobHistory.getCompletedJobHistoryLocation().toString();
+          infoServer.setAttribute("historyLogDir", historyLogDir);
           return new Path(historyLogDir).getFileSystem(conf);
         }
       });

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Sun Feb  7 08:01:06 2010
@@ -108,13 +108,13 @@
       long jobTrackerStartTime) throws IOException {
 
     // Get and create the log folder
-    String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION ,
+    final String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION ,
         "file:///" +
         new File(System.getProperty("hadoop.log.dir")).getAbsolutePath()
         + File.separator + "history");
 
     LOG.info("History log directory is " + logDirLoc);
-
+    
     logDir = new Path(logDirLoc);
     logDirFs = logDir.getFileSystem(conf);