You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/24 23:27:23 UTC
svn commit: r1127298 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/
...
Author: mahadev
Date: Tue May 24 21:27:23 2011
New Revision: 1127298
URL: http://svn.apache.org/viewvc?rev=1127298&view=rev
Log:
MAPREDUCE-2522. Security for JobHistory service. (Siddharth Seth via mahadev)
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 24 21:27:23 2011
@@ -3,6 +3,9 @@ Hadoop MapReduce Change Log
Trunk (unreleased changes)
MAPREDUCE-279
+
+ MAPREDUCE-2522. Security for JobHistory service. (Siddharth Seth via
+ mahadev)
Added metrics for tracking reservations in CapacityScheduler. (Luke Lu via
acmurthy)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue May 24 21:27:23 2011
@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -40,8 +41,10 @@ import org.apache.hadoop.mapreduce.TypeC
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -54,12 +57,14 @@ import org.apache.hadoop.yarn.service.Ab
* JobHistory implementation is in this package to access package private
* classes.
*/
+
public class JobHistoryEventHandler extends AbstractService
implements EventHandler<JobHistoryEvent> {
private final AppContext context;
private final int startCount;
+ //TODO Does the FS object need to be different ?
private FileSystem logDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
@@ -68,6 +73,7 @@ public class JobHistoryEventHandler exte
private Path logDirPath = null;
private Path doneDirPrefixPath = null; // folder for completed jobs
+
private BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>();
private Thread eventHandlingThread;
@@ -97,42 +103,84 @@ public class JobHistoryEventHandler exte
this.conf = conf;
String logDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(conf);
+ String userLogDir = JobHistoryUtils.getHistoryLogDirForUser(conf);
String doneDirPrefix = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+ String userDoneDirPrefix = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
+ //Check for the existence of the log dir. Maybe create it.
+ Path path = null;
try {
- doneDirPrefixPath = FileSystem.get(conf).makeQualified(
- new Path(doneDirPrefix));
- doneDirFS = FileSystem.get(doneDirPrefixPath.toUri(), conf);
- if (!doneDirFS.exists(doneDirPrefixPath)) {
+ path = FileSystem.get(conf).makeQualified(new Path(logDir));
+ logDirFS = FileSystem.get(path.toUri(), conf);
+ LOG.info("Maybe creating staging history logDir: [" + path + "]");
+ mkdir(logDirFS, path, new FsPermission(JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
+ } catch (IOException e) {
+ LOG.error("Failed while checking for/ceating history staging path: [" + path + "]", e);
+ throw new YarnException(e);
+ }
+
+ //Check for the existence of intermediate done dir.
+ Path doneDirPath = null;
try {
- doneDirFS.mkdirs(doneDirPrefixPath, new FsPermission(
- JobHistoryUtils.HISTORY_DIR_PERMISSION));
- } catch (FileAlreadyExistsException e) {
- LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
- + "] already exists.");
+ doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirPrefix));
+ doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
+ if (!doneDirFS.exists(doneDirPath)) {
+ // This directory will be in a common location, or this may be a cluster meant for a single user.
+ // Creating based on the conf.
+ // Should ideally be created by the JobHistoryServer or as part of deployment.
+ if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) {
+ LOG.info("Creating intermediate history logDir: [" + doneDirPath + "] + based on conf. Should ideally be created by the JobHistoryServer: " + JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY);
+ mkdir(doneDirFS, doneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
+ //TODO Temporary toShort till new FsPermission(FsPermissions) respects sticky
+ } else {
+ LOG.error("Not creating intermediate history logDir: [" + doneDirPath + "] based on conf: " + JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY + ". Either set to true or pre-create this directory with appropriate permissions");
+ throw new YarnException("Not creating intermediate history logDir: [" + doneDirPath + "] based on conf: " + JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY + ". Either set to true or pre-create this directory with appropriate permissions");
}
}
} catch (IOException e) {
- LOG.info("error creating done directory on dfs " + e);
+ LOG.error("Failed checking for the existance of history intermediate done directory: [" + doneDirPath + "]");
throw new YarnException(e);
}
+
+ //Check/create staging directory.
try {
- logDirPath = FileSystem.get(conf).makeQualified(
- new Path(logDir));
- logDirFS = FileSystem.get(logDirPath.toUri(), conf);
- if (!logDirFS.exists(logDirPath)) {
+ logDirPath = FileSystem.get(conf).makeQualified(new Path(userLogDir));
+ mkdir(logDirFS, logDirPath, new FsPermission(JobHistoryUtils.HISTORY_STAGING_USER_DIR_PERMISSIONS));
+ } catch (IOException e) {
+ LOG.error("Error creating user staging history directory: [" + logDirPath + "]", e);
+ throw new YarnException(e);
+ }
+
+ //Check/create user directory under intermediate done dir.
try {
- logDirFS.mkdirs(logDirPath, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION));
- } catch (FileAlreadyExistsException e) {
- LOG.info("JobHistory Log Directory: [" + doneDirPrefixPath
- + "] already exists.");
+ doneDirPrefixPath = FileSystem.get(conf).makeQualified(
+ new Path(userDoneDirPrefix));
+ mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
+ } catch (IOException e) {
+ LOG.error("Error creating user intermediate history done directory: [ " + doneDirPrefixPath + "]", e);
+ throw new YarnException(e);
+ }
+
+ super.init(conf);
+ }
+
+ private void mkdir(FileSystem fs, Path path, FsPermission fsp)
+ throws IOException {
+ if (!fs.exists(path)) {
+ try {
+ fs.mkdirs(path, fsp);
+ FileStatus fsStatus = fs.getFileStatus(path);
+ LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ + ", Expected: " + fsp.toShort());
+ if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+ LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ + ", " + fsp);
+ fs.setPermission(path, fsp);
}
+ } catch (FileAlreadyExistsException e) {
+ LOG.info("Directory: [" + path + "] already exists.");
}
- } catch (IOException ioe) {
- LOG.info("Mkdirs failed to create " + logDirPath.toString());
- throw new YarnException(ioe);
}
- super.init(conf);
}
@Override
@@ -242,7 +290,6 @@ public class JobHistoryEventHandler exte
}
}
- //This could be done at the end as well in moveToDone
Path logDirConfPath = null;
if (conf != null) {
logDirConfPath = JobHistoryUtils.getStagingConfFile(logDirPath, jobId, startCount);
@@ -295,7 +342,7 @@ public class JobHistoryEventHandler exte
try {
setupEventWriter(event.getJobID());
} catch (IOException ioe) {
- LOG.error("Error JobHistoryEventHandler in handle " + ioe);
+ LOG.error("Error JobHistoryEventHandler in handleEvent: " + event, ioe);
throw new YarnException(ioe);
}
}
@@ -305,7 +352,7 @@ public class JobHistoryEventHandler exte
mi.writeEvent(historyEvent);
LOG.info("In HistoryEventHandler " + event.getHistoryEvent().getEventType());
} catch (IOException e) {
- LOG.error("Error writing History Event " + e);
+ LOG.error("Error writing History Event: " + event.getHistoryEvent(), e);
throw new YarnException(e);
}
// check for done
@@ -323,6 +370,7 @@ public class JobHistoryEventHandler exte
}
}
+ //TODO Path is intermediate_done/user -> Work with this throughout.
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
@@ -343,38 +391,32 @@ public class JobHistoryEventHandler exte
LOG.warn("No file for jobconf with " + jobId + " found in cache!");
}
- String doneDir = JobHistoryUtils.getCurrentDoneDir(doneDirPrefixPath
- .toString());
- Path doneDirPath = doneDirFS.makeQualified(new Path(doneDir));
+ Path qualifiedDoneFile = null;
try {
- if (!pathExists(doneDirFS, doneDirPath)) {
- doneDirFS.mkdirs(doneDirPath, new FsPermission(
- JobHistoryUtils.HISTORY_DIR_PERMISSION));
- }
-
if (mi.getHistoryFile() != null) {
Path logFile = mi.getHistoryFile();
Path qualifiedLogFile = logDirFS.makeQualified(logFile);
- String doneJobHistoryFileName = FileNameIndexUtils.getDoneFileName(mi
- .getJobIndexInfo());
- Path qualifiedDoneFile = doneDirFS.makeQualified(new Path(doneDirPath,
- doneJobHistoryFileName));
+ String doneJobHistoryFileName = getTempFileName(FileNameIndexUtils.getDoneFileName(mi
+ .getJobIndexInfo()));
+ qualifiedDoneFile = doneDirFS.makeQualified(new Path(
+ doneDirPrefixPath, doneJobHistoryFileName));
moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
}
+ Path qualifiedConfDoneFile = null;
if (mi.getConfFile() != null) {
Path confFile = mi.getConfFile();
Path qualifiedConfFile = logDirFS.makeQualified(confFile);
- String doneConfFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobId);
- Path qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(
- doneDirPath, doneConfFileName));
+ String doneConfFileName = getTempFileName(JobHistoryUtils
+ .getIntermediateConfFileName(jobId));
+ qualifiedConfDoneFile = doneDirFS.makeQualified(new Path(
+ doneDirPrefixPath, doneConfFileName));
moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
}
- String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
- Path doneFilePath = doneDirFS.makeQualified(new Path(doneDirPath,
- doneFileName));
- touchFile(doneFilePath);
+
+
+ moveTmpToDone(qualifiedConfDoneFile);
+ moveTmpToDone(qualifiedDoneFile);
} catch (IOException e) {
LOG.error("Error closing writer for JobID: " + jobId);
throw e;
@@ -420,42 +462,58 @@ public class JobHistoryEventHandler exte
}
}
+ private void moveTmpToDone(Path tmpPath) throws IOException {
+ if (tmpPath != null) {
+ String tmpFileName = tmpPath.getName();
+ String fileName = getFileNameFromTmpFN(tmpFileName);
+ Path path = new Path(tmpPath.getParent(), fileName);
+ doneDirFS.rename(tmpPath, path);
+ LOG.info("Moved tmp to done: " + tmpPath + " to " + path);
+ }
+ }
+
+ // TODO If the FS objects are the same, this should be a rename instead of a
+ // copy.
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
- //check if path exists, in case of retries it may not exist
+ // check if path exists, in case of retries it may not exist
if (logDirFS.exists(fromPath)) {
- LOG.info("Moving " + fromPath.toString() + " to " +
- toPath.toString());
- //TODO temporarily removing the existing dst
+ LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString());
+ // TODO temporarily removing the existing dst
if (doneDirFS.exists(toPath)) {
doneDirFS.delete(toPath, true);
}
- boolean copied =
- FileUtil.copy(logDirFS, fromPath, doneDirFS, toPath, false, conf);
+ boolean copied = FileUtil.copy(logDirFS, fromPath, doneDirFS, toPath,
+ false, conf);
+
if (copied)
- LOG.info("Copied to done location: "+ toPath);
+ LOG.info("Copied to done location: " + toPath);
else
LOG.info("copy failed");
- doneDirFS.setPermission(toPath,
- new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+ doneDirFS.setPermission(toPath, new FsPermission(
+ JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
logDirFS.delete(fromPath, false);
}
}
- private void touchFile(Path path) throws IOException {
- doneDirFS.createNewFile(path);
- doneDirFS.setPermission(path, JobHistoryUtils.HISTORY_DIR_PERMISSION);
- }
-
boolean pathExists(FileSystem fileSys, Path path) throws IOException {
return fileSys.exists(path);
}
+ private String getTempFileName(String srcFile) {
+ return srcFile + "_tmp";
+ }
+
+ private String getFileNameFromTmpFN(String tmpFileName) {
+ //TODO. Some error checking here.
+ return tmpFileName.substring(0, tmpFileName.length()-4);
+ }
+
private void writeStatus(String statusstoredir, HistoryEvent event) throws IOException {
try {
Path statusstorepath = doneDirFS.makeQualified(new Path(statusstoredir));
doneDirFS.mkdirs(statusstorepath,
- new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION));
+ new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
Path toPath = new Path(statusstoredir, "jobstats");
FSDataOutputStream out = doneDirFS.create(toPath, true);
EventWriter writer = new EventWriter(out);
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue May 24 21:27:23 2011
@@ -151,13 +151,7 @@ public class RecoveryService extends Com
private void parse() throws IOException {
// TODO: parse history file based on startCount
String jobName = TypeConverter.fromYarn(appID).toString();
-// String defaultStagingDir = getConfig().get(
-// YARNApplicationConstants.APPS_STAGING_DIR_KEY)
-// + "/history/staging";
-
-// String jobhistoryDir = getConfig().get(
-// YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir);
- String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryLogDirPrefix(getConfig());
+ String jobhistoryDir = JobHistoryUtils.getHistoryLogDirForUser(getConfig());
FSDataInputStream in = null;
Path historyFile = null;
Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue May 24 21:27:23 2011
@@ -59,6 +59,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner;
import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.YarnException;
@@ -129,6 +130,7 @@ public class MRApp extends MRAppMaster {
String user = conf.get(MRJobConfig.USER_NAME, "mapred");
conf.set(MRJobConfig.USER_NAME, user);
conf.set(YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, testAbsPath.toString());
+ conf.setBoolean(JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY, true);
init(conf);
start();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue May 24 21:27:23 2011
@@ -35,78 +35,6 @@ public class YarnMRJobConfig {
= "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda";
public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE
= "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate";
- public static final String HS_PREFIX = "yarn.server.historyserver.";
-
- public static final String DEFAULT_HS_BIND_ADDRESS = "0.0.0.0:10020";
-
- /** host:port address to which to bind to **/
- public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
-
- /** Staging Dir for AppMaster **/
- public static final String HISTORY_STAGING_DIR_KEY =
- "yarn.historyfile.stagingDir";
-
- /** Done Dir for for AppMaster **/
- public static final String HISTORY_INTERMEDIATE_DONE_DIR_KEY =
- "yarn.historyfile.intermediateDoneDir";
-
- /** Done Dir for for AppMaster **/
- public static final String HISTORY_DONE_DIR_KEY =
- "yarn.historyfile.doneDir";
-
- /** Done Dir for history server. **/
- public static final String HISTORY_SERVER_DONE_DIR_KEY =
- HS_PREFIX + ".historyfile.doneDir";
-
- /**
- * Size of the job list cache.
- */
- public static final String HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY =
- HS_PREFIX + ".joblist.cache.size";
-
- /**
- * Size of the loaded job cache.
- */
- public static final String HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY =
- HS_PREFIX + ".loadedjobs.cache.size";
-
- /**
- * Size of the date string cache. Effects the number of directories
- * which will be scanned to find a job.
- */
- public static final String HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY =
- HS_PREFIX + ".datestring.cache.size";
-
- /**
- * The time interval in milliseconds for the history server
- * to wake up and scan for files to be moved.
- */
- public static final String HISTORY_SERVER_MOVE_THREAD_INTERVAL =
- HS_PREFIX + ".move.thread.interval";
-
- /**
- * The number of threads used to move files.
- */
- public static final String HISTORY_SERVER_NUM_MOVE_THREADS =
- HS_PREFIX + ".move.threads.count";
-
- // Equivalent to 0.20 mapreduce.jobhistory.debug.mode
- public static final String HISTORY_DEBUG_MODE_KEY = HS_PREFIX + ".debug.mode";
-
- public static final String HISTORY_MAXAGE =
- "yarn.historyfile.maxage";
-
- /**
- * Run interval for the History Cleaner thread.
- */
- public static final String HISTORY_CLEANER_RUN_INTERVAL =
- HS_PREFIX + ".cleaner.run.interval";
-
- public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
- "address.webapp";
- public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
- "0.0.0.0:19888";
-
public static final String RECOVERY_ENABLE
= "yarn.mapreduce.job.recovery.enable";
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java?rev=1127298&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHConfig.java Tue May 24 21:27:23 2011
@@ -0,0 +1,93 @@
+package org.apache.hadoop.mapreduce.v2.jobhistory;
+
+public class JHConfig {
+ public static final String HS_PREFIX = "yarn.server.historyserver.";
+ /** host:port address to which to bind to **/
+ public static final String HS_BIND_ADDRESS = HS_PREFIX + "address";
+
+ public static final String HS_USER_NAME = HS_PREFIX + "kerberos.principal";
+
+ public static final String HS_KEYTAB_FILE = HS_PREFIX + "jeytab.file";
+
+ public static final String DEFAULT_HS_BIND_ADDRESS = "0.0.0.0:10020";
+
+ /** Staging Dir for AppMaster **/
+ public static final String HISTORY_STAGING_DIR_KEY =
+ "yarn.historyfile.stagingDir";
+
+ /** Done Dir for AppMaster **/
+ public static final String HISTORY_INTERMEDIATE_DONE_DIR_KEY =
+ "yarn.historyfile.intermediateDoneDir";
+
+ /** Done Dir for AppMaster **/
+ public static final String HISTORY_DONE_DIR_KEY =
+ "yarn.historyfile.doneDir";
+
+ /**
+ * Boolean. Create the base dirs in the JobHistoryEventHandler
+ * Set to false for multi-user clusters.
+ */
+ public static final String CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY =
+ "yarn.history.create.intermediate.base.dir";
+
+ /** Done Dir for history server. **/
+ public static final String HISTORY_SERVER_DONE_DIR_KEY =
+ HS_PREFIX + "historyfile.doneDir";
+
+ /**
+ * Size of the job list cache.
+ */
+ public static final String HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY =
+ HS_PREFIX + "joblist.cache.size";
+
+ /**
+ * Size of the loaded job cache.
+ */
+ public static final String HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY =
+ HS_PREFIX + "loadedjobs.cache.size";
+
+ /**
+ * Size of the date string cache. Affects the number of directories
+ * which will be scanned to find a job.
+ */
+ public static final String HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY =
+ HS_PREFIX + "datestring.cache.size";
+
+ /**
+ * The time interval in milliseconds for the history server
+ * to wake up and scan for files to be moved.
+ */
+ public static final String HISTORY_SERVER_MOVE_THREAD_INTERVAL =
+ HS_PREFIX + "move.thread.interval";
+
+ /**
+ * The number of threads used to move files.
+ */
+ public static final String HISTORY_SERVER_NUM_MOVE_THREADS =
+ HS_PREFIX + "move.threads.count";
+
+ // Equivalent to 0.20 mapreduce.jobhistory.debug.mode
+ public static final String HISTORY_DEBUG_MODE_KEY = HS_PREFIX + "debug.mode";
+
+ public static final String HISTORY_MAXAGE =
+ "yarn.historyfile.maxage";
+
+ //TODO Move some of the HistoryServer specific out into a separate configuration class.
+ public static final String HS_KEYTAB_KEY = HS_PREFIX + "keytab";
+
+ public static final String HS_SERVER_PRINCIPAL_KEY = "yarn.historyserver.principal";
+
+ public static final String RUN_HISTORY_CLEANER_KEY =
+ HS_PREFIX + "cleaner.run";
+
+ /**
+ * Run interval for the History Cleaner thread.
+ */
+ public static final String HISTORY_CLEANER_RUN_INTERVAL =
+ HS_PREFIX + "cleaner.run.interval";
+
+ public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
+ "address.webapp";
+ public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
+ "0.0.0.0:19888";
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue May 24 21:27:23 2011
@@ -36,30 +36,56 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
public class JobHistoryUtils {
- private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
+ /**
+ * Permissions for the history staging dir while JobInProgress.
+ */
+ public static final FsPermission HISTORY_STAGING_DIR_PERMISSIONS =
+
+ FsPermission.createImmutable( (short) 0700);
+
+ /**
+ * Permissions for the user directory under the staging directory.
+ */
+ public static final FsPermission HISTORY_STAGING_USER_DIR_PERMISSIONS =
+ FsPermission.createImmutable((short) 0700);
- public static final FsPermission HISTORY_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0750); // rwxr-x---
- public static final FsPermission HISTORY_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0740); // rwxr-----
/**
- * Suffix for configuration files.
+ * Permissions for the history done dir and derivatives.
*/
- public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
+ public static final FsPermission HISTORY_DONE_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0770);
+
+ public static final FsPermission HISTORY_DONE_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0770); // rwxrwx---
/**
- * Suffix for done files.
+ * Permissions for the intermediate done directory.
*/
- public static final String DONE_FILE_NAME_SUFFIX = ".done";
+ public static final FsPermission HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS =
+ FsPermission.createImmutable((short) 01777);
+
+ /**
+ * Permissions for the user directory under the intermediate done directory.
+ */
+ public static final FsPermission HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS =
+ FsPermission.createImmutable((short) 0770);
+
+ public static final FsPermission HISTORY_INTERMEDIATE_FILE_PERMISSIONS =
+ FsPermission.createImmutable((short) 0770); // rwxrwx---
+
+ /**
+ * Suffix for configuration files.
+ */
+ public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
/**
* Job History File extension.
@@ -110,12 +136,13 @@ public class JobHistoryUtils {
return JOB_HISTORY_FILE_FILTER;
}
+ //The version string may need to be removed.
/**
* Returns the current done directory.
* @param doneDirPrefix the prefix for the done directory.
* @return A string representation of the done directory.
*/
- public static String getCurrentDoneDir(String doneDirPrefix) {
+ private static String getCurrentDoneDir(String doneDirPrefix) {
return doneDirPrefix + File.separator + LOG_VERSION_STRING + File.separator;
}
@@ -126,8 +153,9 @@ public class JobHistoryUtils {
*/
public static String getConfiguredHistoryLogDirPrefix(Configuration conf) {
String defaultLogDir = conf.get(
- YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/staging";
- String logDir = conf.get(YarnMRJobConfig.HISTORY_STAGING_DIR_KEY,
+ //TODO Change this to staging directory
+ YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, "/tmp") + "/history/staging";
+ String logDir = conf.get(JHConfig.HISTORY_STAGING_DIR_KEY,
defaultLogDir);
return logDir;
}
@@ -139,9 +167,9 @@ public class JobHistoryUtils {
*/
public static String getConfiguredHistoryIntermediateDoneDirPrefix(Configuration conf) {
String defaultDoneDir = conf.get(
- YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done_intermediate";
+ YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, "/tmp") + "/history/done_intermediate";
String doneDirPrefix =
- conf.get(YarnMRJobConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
+ conf.get(JHConfig.HISTORY_INTERMEDIATE_DONE_DIR_KEY,
defaultDoneDir);
return doneDirPrefix;
}
@@ -153,11 +181,37 @@ public class JobHistoryUtils {
*/
public static String getConfiguredHistoryServerDoneDirPrefix(Configuration conf) {
String defaultDoneDir = conf.get(
- YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY) + "/history/done";
+ YARNApplicationConstants.APPS_HISTORY_STAGING_DIR_KEY, "/tmp") + "/history/done";
String doneDirPrefix =
- conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY,
+ conf.get(JHConfig.HISTORY_DONE_DIR_KEY,
defaultDoneDir);
- return doneDirPrefix;
+ return getCurrentDoneDir(doneDirPrefix);
+ }
+
+ /**
+ * Gets the user directory for in-progress history files.
+ * @param conf
+ * @return
+ */
+ public static String getHistoryLogDirForUser(Configuration conf) {
+ return getConfiguredHistoryLogDirPrefix(conf) + File.separator
+ + conf.get(MRJobConfig.USER_NAME);
+ }
+
+ /**
+ * Gets the user directory for intermediate done history files.
+ * @param conf
+ * @return
+ */
+ public static String getHistoryIntermediateDoneDirForUser(Configuration conf) {
+ return getConfiguredHistoryIntermediateDoneDirPrefix(conf) + File.separator
+ + conf.get(MRJobConfig.USER_NAME);
+ }
+
+ public static boolean shouldCreateNonUserDirectory(Configuration conf) {
+ // Returning true by default to allow non secure single node clusters to work
+ // without any configuration change.
+ return conf.getBoolean(JHConfig.CREATE_HISTORY_INTERMEDIATE_BASE_DIR_KEY, true);
}
/**
@@ -184,10 +238,6 @@ public class JobHistoryUtils {
return TypeConverter.fromYarn(jobId).toString() + CONF_FILE_NAME_SUFFIX;
}
- public static String getIntermediateDoneFileName(JobId jobId) {
- return TypeConverter.fromYarn(jobId).toString() + DONE_FILE_NAME_SUFFIX;
- }
-
/**
* Gets the conf file path for jobs in progress.
*
@@ -240,11 +290,12 @@ public class JobHistoryUtils {
* @return
*/
public static String historyLogSubdirectory(JobId id, String timestampComponent, String serialNumberFormat) {
- String result = LOG_VERSION_STRING;
+// String result = LOG_VERSION_STRING;
+ String result = "";
String serialNumberDirectory = serialNumberDirectoryComponent(id, serialNumberFormat);
result = result
- + File.separator + timestampComponent
+ + timestampComponent
+ File.separator + serialNumberDirectory
+ File.separator;
@@ -273,11 +324,8 @@ public class JobHistoryUtils {
}
public static String doneSubdirsBeforeSerialTail() {
- // Version Info
- String result = ("/" + LOG_VERSION_STRING);
-
// date
- result = result + "/*/*/*"; // YYYY/MM/DD ;
+ String result = "/*/*/*"; // YYYY/MM/DD ;
return result;
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java?rev=1127298&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/security/client/ClientHSSecurityInfo.java Tue May 24 21:27:23 2011
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.security.client;
+
+import java.lang.annotation.Annotation;
+
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+
+public class ClientHSSecurityInfo implements SecurityInfo {
+
+ @Override
+ public KerberosInfo getKerborosInfo(Class<?> protocol) {
+ return new KerberosInfo() {
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return null;
+ }
+
+ @Override
+ public String serverPrincipal() {
+ return JHConfig.HS_SERVER_PRINCIPAL_KEY;
+ }
+
+ @Override
+ public String clientPrincipal() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public TokenInfo getTokenInfo(Class<?> protocol) {
+ return null;
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobACLsManager.java Tue May 24 21:27:23 2011
@@ -100,6 +100,7 @@ public class JobACLsManager {
return true;
}
+ //TODO Shouldn't this be doing some kind of a check to verify JobACL jobOperation?
return false;
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Tue May 24 21:27:23 2011
@@ -27,8 +27,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.JobACL;
@@ -45,7 +43,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.YarnException;
@@ -138,16 +135,6 @@ public class CompletedJob implements org
if (jobInfo != null) {
return; //data already loaded
}
- String user = conf.get(MRJobConfig.USER_NAME);
- if (user == null) {
- LOG.error("user null is not allowed");
- }
- String jobName = TypeConverter.fromYarn(jobId).toString();
-
- String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
-
-
- String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
if (historyFileAbsolute != null) {
try {
@@ -158,26 +145,6 @@ public class CompletedJob implements org
e);
}
}
- else {
- FSDataInputStream in = null;
- Path historyFile = null;
- try {
- Path doneDirPath = FileContext.getFileContext(conf).makeQualified(
- new Path(currentJobHistoryDir));
- FileContext fc =
- FileContext.getFileContext(doneDirPath.toUri(),conf);
- //TODO_JH_There could be multiple instances
- //TODO_JH_FileName
- historyFile =
- fc.makeQualified(new Path(doneDirPath, jobName + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION));
- in = fc.open(historyFile);
- JobHistoryParser parser = new JobHistoryParser(in);
- jobInfo = parser.parse();
- } catch (IOException e) {
- throw new YarnException("Could not load history file " + historyFile,
- e);
- }
- }
if (loadTasks) {
// populate the tasks
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryClientService.java Tue May 24 21:27:23 2011
@@ -18,9 +18,12 @@
package org.apache.hadoop.mapreduce.v2.hs;
+import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
@@ -28,6 +31,8 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -58,9 +63,12 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.hs.webapp.HSWebApp;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.mapreduce.v2.security.client.ClientHSSecurityInfo;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -69,7 +77,6 @@ import org.apache.hadoop.yarn.ipc.YarnRP
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.webapp.WebApp;
import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
/**
* This module is responsible for talking to the
@@ -93,15 +100,18 @@ public class HistoryClientService extend
}
public void start() {
+ YarnRPC rpc = YarnRPC.create(getConfig());
Configuration conf = new Configuration(getConfig());
- YarnRPC rpc = YarnRPC.create(conf);
- initializeWebApp(conf);
- String serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
- YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
+ conf.setClass(
+ CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ ClientHSSecurityInfo.class, SecurityInfo.class);
+ initializeWebApp(getConfig());
+ String serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
+ JHConfig.DEFAULT_HS_BIND_ADDRESS);
InetSocketAddress address = NetUtils.createSocketAddr(serviceAddr);
InetAddress hostNameResolved = null;
try {
- hostNameResolved = address.getAddress().getLocalHost();
+ hostNameResolved = InetAddress.getLocalHost(); //address.getAddress().getLocalHost();
} catch (UnknownHostException e) {
throw new YarnException(e);
}
@@ -122,8 +132,8 @@ public class HistoryClientService extend
private void initializeWebApp(Configuration conf) {
webApp = new HSWebApp(history);
- String bindAddress = conf.get(YarnMRJobConfig.HS_WEBAPP_BIND_ADDRESS,
- YarnMRJobConfig.DEFAULT_HS_WEBAPP_BIND_ADDRESS);
+ String bindAddress = conf.get(JHConfig.HS_WEBAPP_BIND_ADDRESS,
+ JHConfig.DEFAULT_HS_WEBAPP_BIND_ADDRESS);
WebApps.$for("yarn", this).at(bindAddress).start(webApp);
}
@@ -142,18 +152,37 @@ public class HistoryClientService extend
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- private Job getJob(JobId jobId) throws YarnRemoteException {
- Job job = history.getJob(jobId);
+ private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
+ UserGroupInformation loginUgi = null;
+ Job job = null;
+ try {
+ loginUgi = UserGroupInformation.getLoginUser();
+ job = loginUgi.doAs(new PrivilegedExceptionAction<Job>() {
+
+ @Override
+ public Job run() throws Exception {
+ Job job = history.getJob(jobID);
+ return job;
+ }
+ });
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ } catch (InterruptedException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
if (job == null) {
- throw RPCUtil.getRemoteException("Unknown job " + jobId);
+ throw RPCUtil.getRemoteException("Unknown job " + jobID);
}
+ JobACL operation = JobACL.VIEW_JOB;
+ //TODO The access check is currently enabled (see the call below); confirm whether it should be disabled for now.
+ checkAccess(job, operation);
return job;
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
JobId jobId = request.getJobId();
- Job job = getJob(jobId);
+ Job job = verifyAndGetJob(jobId);
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(job.getCounters());
return response;
@@ -162,7 +191,7 @@ public class HistoryClientService extend
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
JobId jobId = request.getJobId();
- Job job = getJob(jobId);
+ Job job = verifyAndGetJob(jobId);
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
response.setJobReport(job.getReport());
return response;
@@ -171,7 +200,7 @@ public class HistoryClientService extend
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
- Job job = getJob(taskAttemptId.getTaskId().getJobId());
+ Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
response.setTaskAttemptReport(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getReport());
return response;
@@ -180,7 +209,7 @@ public class HistoryClientService extend
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
TaskId taskId = request.getTaskId();
- Job job = getJob(taskId.getJobId());
+ Job job = verifyAndGetJob(taskId.getJobId());
GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
response.setTaskReport(job.getTask(taskId).getReport());
return response;
@@ -192,7 +221,7 @@ public class HistoryClientService extend
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
- Job job = getJob(jobId);
+ Job job = verifyAndGetJob(jobId);
GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
return response;
@@ -200,21 +229,16 @@ public class HistoryClientService extend
@Override
public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
- JobId jobId = request.getJobId();
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
@Override
public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
- TaskId taskId = request.getTaskId();
- getJob(taskId.getJobId());
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
@Override
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
- TaskAttemptId taskAttemptId = request.getTaskAttemptId();
- getJob(taskAttemptId.getTaskId().getJobId());
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
@@ -222,7 +246,7 @@ public class HistoryClientService extend
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
- Job job = getJob(taskAttemptId.getTaskId().getJobId());
+ Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
response.addAllDiagnostics(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getDiagnostics());
@@ -231,8 +255,6 @@ public class HistoryClientService extend
@Override
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
- TaskAttemptId taskAttemptId = request.getTaskAttemptId();
- getJob(taskAttemptId.getTaskId().getJobId());
throw RPCUtil.getRemoteException("Invalid operation on completed job");
}
@@ -242,7 +264,7 @@ public class HistoryClientService extend
TaskType taskType = request.getTaskType();
GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
- Job job = getJob(jobId);
+ Job job = verifyAndGetJob(jobId);
Collection<Task> tasks = job.getTasks(taskType).values();
for (Task task : tasks) {
response.addTaskReport(task.getReport());
@@ -250,6 +272,22 @@ public class HistoryClientService extend
return response;
}
+ private void checkAccess(Job job, JobACL jobOperation)
+ throws YarnRemoteException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ UserGroupInformation callerUGI;
+ try {
+ callerUGI = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
+ if (!job.checkAccess(callerUGI, jobOperation)) {
+ throw RPCUtil.getRemoteException(new AccessControlException("User "
+ + callerUGI.getShortUserName() + " cannot perform operation "
+ + jobOperation.name() + " on " + job.getID()));
+ }
+ }
}
-
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue May 24 21:27:23 2011
@@ -45,16 +45,17 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.yarn.Clock;
@@ -127,6 +128,11 @@ public class JobHistory extends Abstract
private final SortedMap<JobId, Job> loadedJobCache = new ConcurrentSkipListMap<JobId, Job>(
JOB_ID_COMPARATOR);
+ /**
+ * Maintains a mapping between intermediate user directories and the last known modification time.
+ */
+ private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+
//The number of jobs to maintain in the job list cache.
private int jobListCacheSize;
@@ -153,17 +159,12 @@ public class JobHistory extends Abstract
private FileContext doneDirFc; // done Dir FileContext
private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
- private FileContext intermediaDoneDirFc; //Intermediate Done Dir FileContext
+ private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
private Thread moveIntermediateToDoneThread = null;
private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
- /*
- * TODO
- * Fix completion time in JobFinishedEvent
- */
-
/**
* Writes out files to the path
* .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
@@ -173,7 +174,7 @@ public class JobHistory extends Abstract
public void init(Configuration conf) throws YarnException {
LOG.info("JobHistory Init");
this.conf = conf;
- debugMode = conf.getBoolean(YarnMRJobConfig.HISTORY_DEBUG_MODE_KEY, false);
+ debugMode = conf.getBoolean(JHConfig.HISTORY_DEBUG_MODE_KEY, false);
serialNumberLowDigits = debugMode ? 1 : 3;
serialNumberFormat = ("%0"
+ (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS + serialNumberLowDigits) + "d");
@@ -185,68 +186,31 @@ public class JobHistory extends Abstract
doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
new Path(doneDirPrefix));
doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
- if (!doneDirFc.util().exists(doneDirPrefixPath)) {
- try {
- doneDirFc.mkdir(doneDirPrefixPath, new FsPermission(
- JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
- } catch (FileAlreadyExistsException e) {
- LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
- + "] already exists.");
- }
- }
- } catch (IOException e) {
- throw new YarnException("error creating done directory on dfs ", e);
- }
-
- String doneDirWithVersion = JobHistoryUtils
- .getCurrentDoneDir(doneDirPrefix);
- try {
- Path doneDirWithVersionPath = FileContext.getFileContext(conf)
- .makeQualified(new Path(doneDirWithVersion));
- if (!doneDirFc.util().exists(doneDirWithVersionPath)) {
- try {
- doneDirFc.mkdir(doneDirWithVersionPath, new FsPermission(
- JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
- } catch (FileAlreadyExistsException e) {
- LOG.info("JobHistory Done Directory: [" + doneDirPrefixPath
- + "] already exists.");
- }
- }
+ mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
} catch (IOException e) {
- throw new YarnException("error creating done_version directory on dfs", e);
+ throw new YarnException("Error creating done directory: [" + doneDirPrefixPath + "]", e);
}
String intermediateDoneDirPrefix = JobHistoryUtils
.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
- String intermediateDoneDir = JobHistoryUtils
- .getCurrentDoneDir(intermediateDoneDirPrefix);
try {
intermediateDoneDirPath = FileContext.getFileContext(conf)
- .makeQualified(new Path(intermediateDoneDir));
- intermediaDoneDirFc = FileContext.getFileContext(
+ .makeQualified(new Path(intermediateDoneDirPrefix));
+ intermediateDoneDirFc = FileContext.getFileContext(
intermediateDoneDirPath.toUri(), conf);
- if (!intermediaDoneDirFc.util().exists(intermediateDoneDirPath)) {
- try {
- intermediaDoneDirFc.mkdir(intermediateDoneDirPath,
- new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
- } catch (FileAlreadyExistsException e) {
- LOG.info("Intermediate JobHistory Done Directory: ["
- + intermediateDoneDirPath + "] already exists.");
- }
- }
-
+ mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
} catch (IOException e) {
LOG.info("error creating done directory on dfs " + e);
- throw new YarnException("error creating done directory on dfs ", e);
+ throw new YarnException("Error creating intermediate done directory: [" + intermediateDoneDirPath + "]", e);
}
- jobListCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY, DEFAULT_JOBLIST_CACHE_SIZE);
- loadedJobCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY, DEFAULT_LOADEDJOB_CACHE_SIZE);
- dateStringCacheSize = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_DATESTRING_CACHE_SIZE);
- moveThreadInterval = conf.getLong(YarnMRJobConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_MOVE_THREAD_INTERVAL);
- numMoveThreads = conf.getInt(YarnMRJobConfig.HISTORY_SERVER_NUM_MOVE_THREADS, DEFAULT_MOVE_THREAD_COUNT);
+ jobListCacheSize = conf.getInt(JHConfig.HISTORY_SERVER_JOBLIST_CACHE_SIZE_KEY, DEFAULT_JOBLIST_CACHE_SIZE);
+ loadedJobCacheSize = conf.getInt(JHConfig.HISTORY_SERVER_LOADED_JOB_CACHE_SIZE_KEY, DEFAULT_LOADEDJOB_CACHE_SIZE);
+ dateStringCacheSize = conf.getInt(JHConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_DATESTRING_CACHE_SIZE);
+ moveThreadInterval = conf.getLong(JHConfig.HISTORY_SERVER_DATESTRING_CACHE_SIZE_KEY, DEFAULT_MOVE_THREAD_INTERVAL);
+ numMoveThreads = conf.getInt(JHConfig.HISTORY_SERVER_NUM_MOVE_THREADS, DEFAULT_MOVE_THREAD_COUNT);
try {
initExisting();
} catch (IOException e) {
@@ -255,6 +219,26 @@ public class JobHistory extends Abstract
super.init(conf);
}
+ private void mkdir(FileContext fc, Path path, FsPermission fsp)
+ throws IOException {
+ if (!fc.util().exists(path)) {
+ try {
+ fc.mkdir(path, fsp, true);
+
+ FileStatus fsStatus = fc.getFileStatus(path);
+ LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ + ", Expected: " + fsp.toShort());
+ if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+ LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ + ", " + fsp);
+ fc.setPermission(path, fsp);
+ }
+ } catch (FileAlreadyExistsException e) {
+ LOG.info("Directory: [" + path + "] already exists.");
+ }
+ }
+ }
+
@Override
public void start() {
//Start moveIntermediateToDoneThread
@@ -264,11 +248,17 @@ public class JobHistory extends Abstract
moveIntermediateToDoneThread.start();
//Start historyCleaner
- long maxAgeOfHistoryFiles = conf.getLong(
- YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+ boolean startCleanerService = conf.getBoolean(JHConfig.RUN_HISTORY_CLEANER_KEY, true);
+ if (startCleanerService) {
+ long maxAgeOfHistoryFiles = conf.getLong(JHConfig.HISTORY_MAXAGE,
+ DEFAULT_HISTORY_MAX_AGE);
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1);
- long runInterval = conf.getLong(YarnMRJobConfig.HISTORY_CLEANER_RUN_INTERVAL, DEFAULT_RUN_INTERVAL);
- cleanerScheduledExecutor.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), 30*1000l, runInterval, TimeUnit.MILLISECONDS);
+ long runInterval = conf.getLong(JHConfig.HISTORY_CLEANER_RUN_INTERVAL,
+ DEFAULT_RUN_INTERVAL);
+ cleanerScheduledExecutor
+ .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
+ 30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
+ }
super.start();
}
@@ -441,40 +431,51 @@ public class JobHistory extends Abstract
}
}
+
/**
- * Populates files from the intermediate directory into the intermediate cache.
+ * Scans the intermediate directory to find user directories. Scans these
+ * for history files if the modification time for the directory has changed.
* @throws IOException
*/
private void scanIntermediateDirectory() throws IOException {
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(intermediateDoneDirPath, intermediaDoneDirFc);
- for (FileStatus fs : fileStatusList) {
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
- String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobIndexInfo.getJobId());
- if (intermediaDoneDirFc.util().exists(intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, doneFileName)))) {
- String confFileName = JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath().getParent(), confFileName), jobIndexInfo);
- if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
- intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+ List<FileStatus> userDirList = JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
+
+ for (FileStatus userDir : userDirList) {
+ String name = userDir.getPath().getName();
+ long newModificationTime = userDir.getModificationTime();
+ boolean shouldScan = false;
+ synchronized (userDirModificationTimeMap) {
+ if (!userDirModificationTimeMap.containsKey(name) || newModificationTime > userDirModificationTimeMap.get(name)) {
+ shouldScan = true;
+ userDirModificationTimeMap.put(name, newModificationTime);
+ }
}
+ if (shouldScan) {
+ scanIntermediateDirectory(userDir.getPath());
}
}
}
/**
- * Checks for the existance of the done file in the intermediate done
- * directory for the specified jobId.
- *
- * @param jobId the jobId.
- * @return true if a done file exists for the specified jobId.
+ * Scans the specified path and populates the intermediate cache.
+ * @param absPath
* @throws IOException
*/
- private boolean doneFileExists(JobId jobId) throws IOException {
- String doneFileName = JobHistoryUtils.getIntermediateDoneFileName(jobId);
- Path qualifiedDoneFilePath = intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, doneFileName));
- if (intermediaDoneDirFc.util().exists(qualifiedDoneFilePath)) {
- return true;
+ private void scanIntermediateDirectory(final Path absPath)
+ throws IOException {
+ List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
+ intermediateDoneDirFc);
+ for (FileStatus fs : fileStatusList) {
+ JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
+ .getName());
+ String confFileName = JobHistoryUtils
+ .getIntermediateConfFileName(jobIndexInfo.getJobId());
+ MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
+ .getParent(), confFileName), jobIndexInfo);
+ if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
+ intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
+ }
}
- return false;
}
/**
@@ -486,15 +487,10 @@ public class JobHistory extends Abstract
* @return A MetaInfo object for the jobId, null if not found.
* @throws IOException
*/
- private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId, boolean checkForDoneFile) throws IOException {
+ private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) throws IOException {
for (FileStatus fs : fileStatusList) {
JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
if (jobIndexInfo.getJobId().equals(jobId)) {
- if (checkForDoneFile) {
- if (!doneFileExists(jobIndexInfo.getJobId())) {
- return null;
- }
- }
String confFileName = JobHistoryUtils
.getIntermediateConfFileName(jobIndexInfo.getJobId());
MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
@@ -524,7 +520,7 @@ public class JobHistory extends Abstract
for (String timestampPart : dateStringSet) {
Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc);
- MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId, false);
+ MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
if (metaInfo != null) {
return metaInfo;
}
@@ -539,21 +535,8 @@ public class JobHistory extends Abstract
* @throws IOException
*/
private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
- MetaInfo matchedMi = null;
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(intermediateDoneDirPath, intermediaDoneDirFc);
-
- MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId, true);
- if (metaInfo == null) {
- return null;
- }
- JobIndexInfo jobIndexInfo = metaInfo.getJobIndexInfo();
- if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
- intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
- matchedMi = metaInfo;
- } else {
- matchedMi = intermediateListCache.get(jobId);
- }
- return matchedMi;
+ scanIntermediateDirectory();
+ return intermediateListCache.get(jobId);
}
@@ -862,7 +845,7 @@ public class JobHistory extends Abstract
try {
moveToDoneNow(historyFile, toPath);
} catch (IOException e) {
- LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId);
+ LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId, e);
return;
}
metaInfo.setHistoryFile(toPath);
@@ -872,29 +855,24 @@ public class JobHistory extends Abstract
try {
moveToDoneNow(confFile, toPath);
} catch (IOException e) {
- LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId);
+ LOG.info("Failed to move file: " + historyFile + " for jobId: " + jobId, e);
return;
}
metaInfo.setConfFile(toPath);
}
}
- //TODO Does this need to be synchronized ?
- Path doneFileToDelete = intermediaDoneDirFc.makeQualified(new Path(intermediateDoneDirPath, JobHistoryUtils.getIntermediateDoneFileName(jobId)));
- try {
- intermediaDoneDirFc.delete(doneFileToDelete, false);
- } catch (IOException e) {
- LOG.info("Unable to remove done file: " + doneFileToDelete);
- }
addToJobListCache(jobId, metaInfo);
intermediateListCache.remove(jobId);
}
- private void moveToDoneNow(Path src, Path target) throws IOException {
+ private void moveToDoneNow(final Path src, final Path target)
+ throws IOException {
LOG.info("Moving " + src.toString() + " to " + target.toString());
- intermediaDoneDirFc.util().copy(src, target);
- intermediaDoneDirFc.delete(src, false);
- doneDirFc.setPermission(target,
- new FsPermission(JobHistoryUtils.HISTORY_FILE_PERMISSION));
+ intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
+ // fc.util().copy(src, target);
+ //fc.delete(src, false);
+ //intermediateDoneDirFc.setPermission(target, new FsPermission(
+ //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
}
private void maybeMakeSubdirectory(Path path) throws IOException {
@@ -913,7 +891,16 @@ public class JobHistory extends Abstract
}
} catch (FileNotFoundException fnfE) {
try {
- doneDirFc.mkdir(path, new FsPermission(JobHistoryUtils.HISTORY_DIR_PERMISSION), true);
+ FsPermission fsp = new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
+ doneDirFc.mkdir(path, fsp, true);
+ FileStatus fsStatus = doneDirFc.getFileStatus(path);
+ LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ + ", Expected: " + fsp.toShort());
+ if (fsStatus.getPermission().toShort() != fsp.toShort()) {
+ LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ + ", " + fsp);
+ doneDirFc.setPermission(path, fsp);
+ }
synchronized(existingDoneSubdirs) {
existingDoneSubdirs.add(path);
}
@@ -931,6 +918,7 @@ public class JobHistory extends Abstract
return new Path(doneDirPrefixPath, JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
}
+
@Override
public synchronized Job getJob(JobId jobId) {
Job job = null;
@@ -974,7 +962,6 @@ public class JobHistory extends Abstract
-
static class MetaInfo {
private Path historyFile;
private Path confFile;
@@ -1065,28 +1052,19 @@ public class JobHistory extends Abstract
deleteFile(metaInfo.getConfFile());
jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
- //TODO Get rid of entries in the cache.
}
- private void deleteFile(Path path) throws IOException {
- delete(path, false);
+ private void deleteFile(final Path path) throws IOException {
+ doneDirFc.delete(doneDirFc.makeQualified(path), false);
filesDeleted++;
}
private void deleteDir(Path path) throws IOException {
- delete(path, true);
+ doneDirFc.delete(doneDirFc.makeQualified(path), true);
dirsDeleted++;
}
-
- private void delete(Path path, boolean recursive) throws IOException {
- doneDirFc.delete(doneDirFc.makeQualified(path), recursive);
}
- }
-
-
-
-
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Tue May 24 21:27:23 2011
@@ -18,11 +18,16 @@
package org.apache.hadoop.mapreduce.v2.hs;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.CompositeService;
@@ -46,8 +51,14 @@ public class JobHistoryServer extends Co
super(JobHistoryServer.class.getName());
}
+ @Override
public synchronized void init(Configuration conf) {
Configuration config = new YarnConfiguration(conf);
+ try {
+ doSecureLogin(conf);
+ } catch(IOException ie) {
+ throw new YarnException("History Server Failed to login", ie);
+ }
jobHistoryService = new JobHistory();
historyContext = (HistoryContext)jobHistoryService;
clientService = new HistoryClientService(historyContext);
@@ -56,6 +67,11 @@ public class JobHistoryServer extends Co
super.init(config);
}
+ protected void doSecureLogin(Configuration conf) throws IOException {
+ SecurityUtil.login(conf, JHConfig.HS_KEYTAB_KEY,
+ JHConfig.HS_SERVER_PRINCIPAL_KEY);
+ }
+
public static void main(String[] args) {
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
JobHistoryServer server = null;
@@ -69,5 +85,4 @@ public class JobHistoryServer extends Co
System.exit(-1);
}
}
-
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue May 24 21:27:23 2011
@@ -60,15 +60,14 @@ public class TestJobHistoryParsing {
//make sure all events are flushed
app.waitForState(Service.STATE.STOPPED);
- String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
+ String jobhistoryDir = JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
JobHistory jobHistory = new JobHistory();
jobHistory.init(conf);
- String currentJobHistoryDir = JobHistoryUtils.getCurrentDoneDir(jobhistoryDir);
JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId).getJobIndexInfo();
String jobhistoryFileName = FileNameIndexUtils.getDoneFileName(jobIndexInfo);
- Path historyFilePath = new Path(currentJobHistoryDir, jobhistoryFileName);
+ Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
FSDataInputStream in = null;
LOG.info("JobHistoryFile is: " + historyFilePath);
try {
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Tue May 24 21:27:23 2011
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobID;
@@ -32,7 +33,6 @@ import org.apache.hadoop.mapreduce.JobSt
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.ipc.RPCUti
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
public class ClientServiceDelegate {
private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
@@ -116,7 +118,7 @@ public class ClientServiceDelegate {
UserGroupInformation.getCurrentUser().addToken(clientToken);
}
LOG.info("Connecting to " + serviceAddr);
- instantiateProxy(serviceAddr);
+ instantiateAMProxy(serviceAddr);
return;
} catch (Exception e) {
//possibly
@@ -132,15 +134,16 @@ public class ClientServiceDelegate {
appMaster = rm.getApplicationMaster(currentAppId);
}
}
+ //TODO Should additional states be checked here?
if (ApplicationState.COMPLETED.equals(appMaster.getState())) {
- serviceAddr = conf.get(YarnMRJobConfig.HS_BIND_ADDRESS,
- YarnMRJobConfig.DEFAULT_HS_BIND_ADDRESS);
+ serviceAddr = conf.get(JHConfig.HS_BIND_ADDRESS,
+ JHConfig.DEFAULT_HS_BIND_ADDRESS);
LOG.info("Application state is completed. " +
"Redirecting to job history server " + serviceAddr);
//TODO:
serviceHttpAddr = "";
try {
- instantiateProxy(serviceAddr);
+ instantiateHistoryProxy(serviceAddr);
return;
} catch (IOException e) {
throw new YarnException(e);
@@ -151,8 +154,9 @@ public class ClientServiceDelegate {
"Cannot connect to Application with state " + appMaster.getState());
}
- private void instantiateProxy(final String serviceAddr) throws IOException {
+ private void instantiateAMProxy(final String serviceAddr) throws IOException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() {
@@ -165,6 +169,20 @@ public class ClientServiceDelegate {
NetUtils.createSocketAddr(serviceAddr), myConf);
}
});
+ LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+ }
+
+ private void instantiateHistoryProxy(final String serviceAddr)
+ throws IOException {
+ LOG.trace("Connecting to HistoryServer at: " + serviceAddr);
+ Configuration myConf = new Configuration(conf);
+ //TODO This should ideally be using its own class (instead of ClientRMSecurityInfo)
+ myConf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ ClientRMSecurityInfo.class, SecurityInfo.class);
+ YarnRPC rpc = YarnRPC.create(myConf);
+ realProxy = (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+ NetUtils.createSocketAddr(serviceAddr), myConf);
+ LOG.trace("Connected to HistoryServer at: " + serviceAddr);
}
public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Tue May 24 21:27:23 2011
@@ -28,7 +28,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
@@ -57,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
@@ -115,7 +115,7 @@ public class TestClientRedirect {
Configuration conf = new YarnConfiguration();
conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
- conf.set(YarnMRJobConfig.HS_BIND_ADDRESS, HSHOSTADDRESS);
+ conf.set(JHConfig.HS_BIND_ADDRESS, HSHOSTADDRESS);
RMService rmService = new RMService("test");
rmService.init(conf);
rmService.start();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java?rev=1127298&r1=1127297&r2=1127298&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/ApplicationsManagerImpl.java Tue May 24 21:27:23 2011
@@ -142,6 +142,7 @@ public class ApplicationsManagerImpl ext
public synchronized ApplicationMaster getApplicationMaster(ApplicationId applicationId) {
ApplicationMaster appMaster =
amTracker.get(applicationId).getMaster();
+ //TODO Possible NPE: after an RM restart, amTracker has no entry for previously running AMs.
return appMaster;
}