You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2010/02/07 09:01:06 UTC
svn commit: r907393 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/Child.java
src/java/org/apache/hadoop/mapred/JobTracker.java
src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
Author: ddas
Date: Sun Feb 7 08:01:06 2010
New Revision: 907393
URL: http://svn.apache.org/viewvc?rev=907393&view=rev
Log:
MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within getStagingAreaDir within a privileged block. Fixes Child.java to use the appropriate UGIs while getting the TaskUmbilicalProtocol proxy and while executing the task. Contributed by Jakob Homan.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sun Feb 7 08:01:06 2010
@@ -301,6 +301,11 @@
MAPREDUCE-1443. DBInputFormat can leak connections.
(Aaron Kimball via tomwhite)
+ MAPREDUCE-1457. Fixes JobTracker to get the FileSystem object within
+ getStagingAreaDir within a privileged block. Fixes Child.java to use the
+ appropriate UGIs while getting the TaskUmbilicalProtocol proxy and
+ while executing the task. (Jakob Homan via ddas)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Sun Feb 7 08:01:06 2010
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -60,13 +61,13 @@
public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
- JobConf defaultConf = new JobConf();
+ final JobConf defaultConf = new JobConf();
// set tcp nodelay
defaultConf.setBoolean("ipc.client.tcpnodelay", true);
String host = args[0];
int port = Integer.parseInt(args[1]);
- InetSocketAddress address = new InetSocketAddress(host, port);
+ final InetSocketAddress address = new InetSocketAddress(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
final int SLEEP_LONGER_COUNT = 5;
int jvmIdInt = Integer.parseInt(args[3]);
@@ -86,11 +87,22 @@
UserGroupInformation current = UserGroupInformation.getCurrentUser();
current.addToken(jt);
- TaskUmbilicalProtocol umbilical =
- (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
- TaskUmbilicalProtocol.versionID,
- address,
- defaultConf);
+ // Create TaskUmbilicalProtocol as actual task owner.
+ UserGroupInformation taskOwner
+ = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());
+ taskOwner.addToken(jt);
+
+ final TaskUmbilicalProtocol umbilical =
+ taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
+ @Override
+ public TaskUmbilicalProtocol run() throws Exception {
+ return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+ TaskUmbilicalProtocol.versionID,
+ address,
+ defaultConf);
+ }
+ });
+
int numTasksToExecute = -1; //-1 signifies "no limit"
int numTasksExecuted = 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -132,6 +144,9 @@
JvmContext context = new JvmContext(jvmId, pid);
int idleLoopCount = 0;
Task task = null;
+
+ UserGroupInformation childUGI = null;
+
try {
while (true) {
taskid = null;
@@ -161,7 +176,7 @@
//create the index file so that the log files
//are viewable immediately
TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
- JobConf job = new JobConf(task.getJobFile());
+ final JobConf job = new JobConf(task.getJobFile());
// set the jobTokenFile into task
task.setJobTokenSecret(JobTokenSecretManager.
@@ -186,11 +201,28 @@
JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
// use job-specified working directory
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
- try {
- task.run(job, umbilical); // run the task
- } finally {
- TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ LOG.debug("Creating remote user to execute task: " + job.get("user.name"));
+ childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));
+ // Add tokens to new user so that it may execute its task correctly.
+ for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {
+ childUGI.addToken(token);
}
+
+ // Create a final reference to the task for the doAs block
+ final Task taskFinal = task;
+ childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ try {
+ taskFinal.run(job, umbilical); // run the task
+ } finally {
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
+ }
+
+ return null;
+ }
+ });
+
if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
break;
}
@@ -204,7 +236,18 @@
try {
if (task != null) {
// do cleanup for the task
- task.taskCleanup(umbilical);
+ if(childUGI == null) { // no need to jump into doAs block
+ task.taskCleanup(umbilical);
+ } else {
+ final Task taskFinal = task;
+ childUGI.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ taskFinal.taskCleanup(umbilical);
+ return null;
+ }
+ });
+ }
}
} catch (Exception e) {
LOG.info("Exception cleaning up : " + StringUtils.stringifyException(e));
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Sun Feb 7 08:01:06 2010
@@ -3662,12 +3662,21 @@
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
*/
public String getStagingAreaDir() throws IOException {
- Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
- "/tmp/hadoop/mapred/staging"));
- FileSystem fs = stagingRootDir.getFileSystem(conf);
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- return fs.makeQualified(new Path(stagingRootDir,
- user+"/.staging")).toString();
+ try {
+ final String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ return mrOwner.doAs(new PrivilegedExceptionAction<String>() {
+ @Override
+ public String run() throws Exception {
+ Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT,
+ "/tmp/hadoop/mapred/staging"));
+ FileSystem fs = stagingRootDir.getFileSystem(conf);
+ return fs.makeQualified(new Path(stagingRootDir,
+ user+"/.staging")).toString();
+ }
+ });
+ } catch(InterruptedException ie) {
+ throw new IOException(ie);
+ }
}
/**
@@ -4373,13 +4382,15 @@
FileSystem historyFS = null;
jobHistory = new JobHistory();
- jobHistory.init(this, conf, this.localMachine, this.startTime);
- jobHistory.initDone(conf, fs);
- final String historyLogDir = jobHistory.getCompletedJobHistoryLocation().toString();
- infoServer.setAttribute("historyLogDir", historyLogDir);
+ final JobTracker jtFinal = this;
try {
historyFS = mrOwner.doAs(new PrivilegedExceptionAction<FileSystem>() {
public FileSystem run() throws IOException {
+ jobHistory.init(jtFinal, conf, jtFinal.localMachine, jtFinal.startTime);
+ jobHistory.initDone(conf, fs);
+ final String historyLogDir =
+ jobHistory.getCompletedJobHistoryLocation().toString();
+ infoServer.setAttribute("historyLogDir", historyLogDir);
return new Path(historyLogDir).getFileSystem(conf);
}
});
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=907393&r1=907392&r2=907393&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Sun Feb 7 08:01:06 2010
@@ -108,13 +108,13 @@
long jobTrackerStartTime) throws IOException {
// Get and create the log folder
- String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION ,
+ final String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION ,
"file:///" +
new File(System.getProperty("hadoop.log.dir")).getAbsolutePath()
+ File.separator + "history");
LOG.info("History log directory is " + logDirLoc);
-
+
logDir = new Path(logDirLoc);
logDirFs = logDir.getFileSystem(conf);