You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2012/04/10 20:11:27 UTC
svn commit: r1311896 [2/2] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/
hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/mai...
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Tue Apr 10 18:11:26 2012
@@ -1,36 +1,26 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.mapreduce.v2.hs;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -41,26 +31,16 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
@@ -69,106 +49,36 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/*
+/**
* Loads and manages the Job history cache.
*/
-public class JobHistory extends AbstractService implements HistoryContext {
-
- private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000;
- private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5;
- private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000;
- private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes
- private static final int DEFAULT_MOVE_THREAD_COUNT = 3;
-
- static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
- static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
-
+public class JobHistory extends AbstractService implements HistoryContext {
private static final Log LOG = LogFactory.getLog(JobHistory.class);
- private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
- public static final Pattern CONF_FILENAME_REGEX =
- Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+ public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("("
+ + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
public static final String OLD_SUFFIX = ".old";
- private static String DONE_BEFORE_SERIAL_TAIL =
- JobHistoryUtils.doneSubdirsBeforeSerialTail();
-
- /**
- * Maps between a serial number (generated based on jobId) and the timestamp
- * component(s) to which it belongs.
- * Facilitates jobId based searches.
- * If a jobId is not found in this list - it will not be found.
- */
- private final SortedMap<String, Set<String>> idToDateString =
- new ConcurrentSkipListMap<String, Set<String>>();
-
- //Maintains minimal details for recent jobs (parsed from history file name).
- //Sorted on Job Completion Time.
- private final SortedMap<JobId, MetaInfo> jobListCache =
- new ConcurrentSkipListMap<JobId, MetaInfo>();
-
-
- // Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
- // Check for existance of the object when using iterators.
- private final SortedMap<JobId, MetaInfo> intermediateListCache =
- new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
-
- //Maintains a list of known done subdirectories. Not currently used.
- private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-
- private Map<JobId, Job> loadedJobCache = null;
-
- /**
- * Maintains a mapping between intermediate user directories and the last
- * known modification time.
- */
- private Map<String, Long> userDirModificationTimeMap =
- new HashMap<String, Long>();
-
- //The number of jobs to maintain in the job list cache.
- private int jobListCacheSize;
-
- private JobACLsManager aclsMgr;
-
- //The number of loaded jobs.
- private int loadedJobCacheSize;
-
- //The number of entries in idToDateString
- private int dateStringCacheSize;
-
- //Time interval for the move thread.
+ // Time interval for the move thread.
private long moveThreadInterval;
-
- //Number of move threads.
+
+ // Number of move threads.
private int numMoveThreads;
-
- private Configuration conf;
- private boolean debugMode;
- private int serialNumberLowDigits;
- private String serialNumberFormat;
-
-
- private Path doneDirPrefixPath = null; // folder for completed jobs
- private FileContext doneDirFc; // done Dir FileContext
-
- private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
- private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
+ private Configuration conf;
private Thread moveIntermediateToDoneThread = null;
private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
+
private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
-
- /**
- * Writes out files to the path
- * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
- */
- @SuppressWarnings("serial")
+ private HistoryStorage storage = null;
+ private HistoryFileManager hsManager = null;
+
@Override
public void init(Configuration conf) throws YarnException {
LOG.info("JobHistory Init");
@@ -176,121 +86,66 @@ public class JobHistory extends Abstract
this.appID = RecordFactoryProvider.getRecordFactory(conf)
.newRecordInstance(ApplicationId.class);
this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
- .newRecordInstance(ApplicationAttemptId.class);
-
- debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
- serialNumberLowDigits = debugMode ? 1 : 3;
- serialNumberFormat = ("%0"
- + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
- + serialNumberLowDigits) + "d");
+ .newRecordInstance(ApplicationAttemptId.class);
- String doneDirPrefix = null;
- doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
- try {
- doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
- new Path(doneDirPrefix));
- doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
- mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
- JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
- } catch (IOException e) {
- throw new YarnException("Error creating done directory: [" +
- doneDirPrefixPath + "]", e);
- }
-
- String intermediateDoneDirPrefix = null;
- intermediateDoneDirPrefix = JobHistoryUtils
- .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
- try {
- intermediateDoneDirPath = FileContext.getFileContext(conf)
- .makeQualified(new Path(intermediateDoneDirPrefix));
- intermediateDoneDirFc = FileContext.getFileContext(
- intermediateDoneDirPath.toUri(), conf);
- mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
- JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
- } catch (IOException e) {
- LOG.info("error creating done directory on dfs " + e);
- throw new YarnException("Error creating intermediate done directory: ["
- + intermediateDoneDirPath + "]", e);
- }
-
- this.aclsMgr = new JobACLsManager(conf);
-
- jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
- DEFAULT_JOBLIST_CACHE_SIZE);
- loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
- DEFAULT_LOADEDJOB_CACHE_SIZE);
- dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
- DEFAULT_DATESTRING_CACHE_SIZE);
- moveThreadInterval =
- conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
- DEFAULT_MOVE_THREAD_INTERVAL);
+ moveThreadInterval = conf.getLong(
+ JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
- DEFAULT_MOVE_THREAD_COUNT);
-
- loadedJobCache =
- Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
- loadedJobCacheSize + 1, 0.75f, true) {
- @Override
- public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
- return super.size() > loadedJobCacheSize;
- }
- });
-
+ JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+
+ hsManager = new HistoryFileManager();
+ hsManager.init(conf);
try {
- initExisting();
+ hsManager.initExisting();
} catch (IOException e) {
throw new YarnException("Failed to intialize existing directories", e);
}
- super.init(conf);
- }
-
- private void mkdir(FileContext fc, Path path, FsPermission fsp)
- throws IOException {
- if (!fc.util().exists(path)) {
- try {
- fc.mkdir(path, fsp, true);
- FileStatus fsStatus = fc.getFileStatus(path);
- LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
- + ", Expected: " + fsp.toShort());
- if (fsStatus.getPermission().toShort() != fsp.toShort()) {
- LOG.info("Explicitly setting permissions to : " + fsp.toShort()
- + ", " + fsp);
- fc.setPermission(path, fsp);
- }
- } catch (FileAlreadyExistsException e) {
- LOG.info("Directory: [" + path + "] already exists.");
- }
+ storage = ReflectionUtils.newInstance(conf.getClass(
+ JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+ HistoryStorage.class), conf);
+ if (storage instanceof Service) {
+ ((Service) storage).init(conf);
}
+ storage.setHistoryFileManager(hsManager);
+
+ super.init(conf);
}
@Override
public void start() {
- //Start moveIntermediatToDoneThread
- moveIntermediateToDoneRunnable =
- new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+ hsManager.start();
+ if (storage instanceof Service) {
+ ((Service) storage).start();
+ }
+
+ // Start moveIntermediateToDoneThread
+ moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
+ moveThreadInterval, numMoveThreads);
moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
moveIntermediateToDoneThread.start();
-
- //Start historyCleaner
+
+ // Start historyCleaner
boolean startCleanerService = conf.getBoolean(
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
long maxAgeOfHistoryFiles = conf.getLong(
- JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
+ JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()
- );
+ new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
long runInterval = conf.getLong(
- JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
+ JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
cleanerScheduledExecutor
.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
}
super.start();
}
-
+
@Override
public void stop() {
LOG.info("Stopping JobHistory");
@@ -323,281 +178,16 @@ public class JobHistory extends Abstract
LOG.warn("HistoryCleanerService shutdown may not have succeeded");
}
}
+ if (storage instanceof Service) {
+ ((Service) storage).stop();
+ }
+ hsManager.stop();
super.stop();
}
-
+
public JobHistory() {
super(JobHistory.class.getName());
}
-
- /**
- * Populates index data structures.
- * Should only be called at initialization times.
- */
- @SuppressWarnings("unchecked")
- private void initExisting() throws IOException {
- LOG.info("Initializing Existing Jobs...");
- List<FileStatus> timestampedDirList = findTimestampedDirectories();
- Collections.sort(timestampedDirList);
- for (FileStatus fs : timestampedDirList) {
- //TODO Could verify the correct format for these directories.
- addDirectoryToSerialNumberIndex(fs.getPath());
- addDirectoryToJobListCache(fs.getPath());
- }
- }
-
- private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
- String serialPart = serialDirPath.getName();
- String timeStampPart =
- JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
- if (timeStampPart == null) {
- LOG.warn("Could not find timestamp portion from path: " +
- serialDirPath.toString() +". Continuing with next");
- return;
- }
- if (serialPart == null) {
- LOG.warn("Could not find serial portion from path: " +
- serialDirPath.toString() + ". Continuing with next");
- return;
- }
- if (idToDateString.containsKey(serialPart)) {
- Set<String> set = idToDateString.get(serialPart);
- set.remove(timeStampPart);
- if (set.isEmpty()) {
- idToDateString.remove(serialPart);
- }
- }
-
- }
-
- private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+serialDirPath+" to serial index");
- }
- String serialPart = serialDirPath.getName();
- String timestampPart =
- JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
- if (timestampPart == null) {
- LOG.warn("Could not find timestamp portion from path: " +
- serialDirPath.toString() +". Continuing with next");
- return;
- }
- if (serialPart == null) {
- LOG.warn("Could not find serial portion from path: " +
- serialDirPath.toString() + ". Continuing with next");
- }
- addToSerialNumberIndex(serialPart, timestampPart);
- }
-
- private void addToSerialNumberIndex(String serialPart, String timestampPart) {
- if (!idToDateString.containsKey(serialPart)) {
- idToDateString.put(serialPart, new HashSet<String>());
- if (idToDateString.size() > dateStringCacheSize) {
- idToDateString.remove(idToDateString.firstKey());
- }
- Set<String> datePartSet = idToDateString.get(serialPart);
- datePartSet.add(timestampPart);
- }
- }
-
- private void addDirectoryToJobListCache(Path path) throws IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+path+" to job list cache.");
- }
- List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
- doneDirFc);
- for (FileStatus fs : historyFileList) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding in history for "+fs.getPath());
- }
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
- .getName());
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
- }
- }
-
- private static List<FileStatus> scanDirectory(Path path, FileContext fc,
- PathFilter pathFilter) throws IOException {
- path = fc.makeQualified(path);
- List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
- RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
- while (fileStatusIter.hasNext()) {
- FileStatus fileStatus = fileStatusIter.next();
- Path filePath = fileStatus.getPath();
- if (fileStatus.isFile() && pathFilter.accept(filePath)) {
- jhStatusList.add(fileStatus);
- }
- }
- return jhStatusList;
- }
-
- private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
- FileContext fc) throws IOException {
- return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
- }
-
- /**
- * Finds all history directories with a timestamp component by scanning
- * the filesystem.
- * Used when the JobHistory server is started.
- * @return
- */
- private List<FileStatus> findTimestampedDirectories() throws IOException {
- List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
- doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
- return fsList;
- }
-
- /**
- * Adds an entry to the job list cache. Maintains the size.
- */
- private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+jobId+" to job list cache with "
- +metaInfo.getJobIndexInfo());
- }
- jobListCache.put(jobId, metaInfo);
- if (jobListCache.size() > jobListCacheSize) {
- jobListCache.remove(jobListCache.firstKey());
- }
- }
-
- /**
- * Adds an entry to the loaded job cache. Maintains the size.
- */
- private void addToLoadedJobCache(Job job) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+job.getID()+" to loaded job cache");
- }
- loadedJobCache.put(job.getID(), job);
- }
-
-
- /**
- * Scans the intermediate directory to find user directories. Scans these
- * for history files if the modification time for the directory has changed.
- * @throws IOException
- */
- private void scanIntermediateDirectory() throws IOException {
- List<FileStatus> userDirList =
- JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
-
- for (FileStatus userDir : userDirList) {
- String name = userDir.getPath().getName();
- long newModificationTime = userDir.getModificationTime();
- boolean shouldScan = false;
- synchronized (userDirModificationTimeMap) {
- if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
- > userDirModificationTimeMap.get(name)) {
- shouldScan = true;
- userDirModificationTimeMap.put(name, newModificationTime);
- }
- }
- if (shouldScan) {
- scanIntermediateDirectory(userDir.getPath());
- }
- }
- }
-
- /**
- * Scans the specified path and populates the intermediate cache.
- * @param absPath
- * @throws IOException
- */
- private void scanIntermediateDirectory(final Path absPath)
- throws IOException {
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
- intermediateDoneDirFc);
- for (FileStatus fs : fileStatusList) {
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
- .getName());
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
- intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
- }
- }
- }
-
- /**
- * Searches the job history file FileStatus list for the specified JobId.
- *
- * @param fileStatusList fileStatus list of Job History Files.
- * @param jobId The JobId to find.
- * @param checkForDoneFile whether to check for the existance of a done file.
- * @return A MetaInfo object for the jobId, null if not found.
- * @throws IOException
- */
- private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
- throws IOException {
- for (FileStatus fs : fileStatusList) {
- JobIndexInfo jobIndexInfo =
- FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
- if (jobIndexInfo.getJobId().equals(jobId)) {
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- return metaInfo;
- }
- }
- return null;
- }
-
- /**
- * Scans old directories known by the idToDateString map for the specified
- * jobId.
- * If the number of directories is higher than the supported size of the
- * idToDateString cache, the jobId will not be found.
- * @param jobId the jobId.
- * @return
- * @throws IOException
- */
- private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
- int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
- String boxedSerialNumber = String.valueOf(jobSerialNumber);
- Set<String> dateStringSet = idToDateString.get(boxedSerialNumber);
- if (dateStringSet == null) {
- return null;
- }
- for (String timestampPart : dateStringSet) {
- Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
- doneDirFc);
- MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
- if (metaInfo != null) {
- return metaInfo;
- }
- }
- return null;
- }
-
- /**
- * Checks for the existence of the job history file in the intermediate
- * directory.
- * @param jobId
- * @return
- * @throws IOException
- */
- private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
- scanIntermediateDirectory();
- return intermediateListCache.get(jobId);
- }
@Override
public String getApplicationName() {
@@ -609,486 +199,167 @@ public class JobHistory extends Abstract
private long sleepTime;
private ThreadPoolExecutor moveToDoneExecutor = null;
private boolean running = false;
-
- public void stop() {
+
+ public synchronized void stop() {
running = false;
+ notify();
}
-
+
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
this.sleepTime = sleepTime;
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat("MoveIntermediateToDone Thread #%d")
- .build();
- moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
+ ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+ "MoveIntermediateToDone Thread #%d").build();
+ moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
running = true;
}
-
- @Override
+
+ @Override
public void run() {
Thread.currentThread().setName("IntermediateHistoryScanner");
try {
- while (running) {
+ while (true) {
LOG.info("Starting scan to move intermediate done files");
- scanIntermediateDirectory();
- for (final MetaInfo metaInfo : intermediateListCache.values()) {
+ for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
moveToDoneExecutor.execute(new Runnable() {
@Override
public void run() {
try {
- moveToDone(metaInfo);
+ hsManager.moveToDone(metaInfo);
} catch (IOException e) {
- LOG.info("Failed to process metaInfo for job: " +
- metaInfo.jobIndexInfo.getJobId(), e);
+ LOG.info(
+ "Failed to process metaInfo for job: "
+ + metaInfo.getJobId(), e);
}
}
});
-
}
- synchronized (this) { // TODO Is this really required.
+ synchronized (this) {
try {
this.wait(sleepTime);
} catch (InterruptedException e) {
LOG.info("IntermediateHistoryScannerThread interrupted");
}
+ if (!running) {
+ break;
+ }
}
}
} catch (IOException e) {
- LOG.warn("Unable to get a list of intermediate files to be moved from: "
- + intermediateDoneDirPath);
+ LOG.warn("Unable to get a list of intermediate files to be moved");
+ // TODO Shut down the entire process!!!!
}
}
}
-
- private Job loadJob(MetaInfo metaInfo) {
- synchronized(metaInfo) {
- try {
- Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
- metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
- metaInfo.getConfFile(), this.aclsMgr);
- addToLoadedJobCache(job);
- return job;
- } catch (IOException e) {
- throw new YarnException("Could not find/load job: " +
- metaInfo.getJobIndexInfo().getJobId(), e);
- }
- }
- }
-
- private Map<JobId, Job> getAllJobsInternal() {
- //TODO This should ideally be using getAllJobsMetaInfo
- // or get rid of that method once Job has APIs for user, finishTime etc.
- SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
- try {
- scanIntermediateDirectory();
- } catch (IOException e) {
- LOG.warn("Failed to scan intermediate directory", e);
- throw new YarnException(e);
- }
- for (JobId jobId : intermediateListCache.keySet()) {
- MetaInfo mi = intermediateListCache.get(jobId);
- if (mi != null) {
- result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
- .getJobIndexInfo().getJobId()));
- }
- }
- for (JobId jobId : jobListCache.keySet()) {
- MetaInfo mi = jobListCache.get(jobId);
- if (mi != null) {
- result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
- .getJobIndexInfo().getJobId()));
- }
- }
- return result;
- }
/**
* Helper method for test cases.
*/
MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
- //MetaInfo available in cache.
- MetaInfo metaInfo = null;
- if (jobListCache.containsKey(jobId)) {
- metaInfo = jobListCache.get(jobId);
- }
-
- if (metaInfo != null) {
- return metaInfo;
- }
-
- //MetaInfo not available. Check intermediate directory for meta info.
- metaInfo = scanIntermediateForJob(jobId);
- if (metaInfo != null) {
- return metaInfo;
- }
-
- //Intermediate directory does not contain job. Search through older ones.
- metaInfo = scanOldDirsForJob(jobId);
- if (metaInfo != null) {
- return metaInfo;
- }
- return null;
- }
-
- private Job findJob(JobId jobId) throws IOException {
- //Job already loaded.
- if (loadedJobCache.containsKey(jobId)) {
- return loadedJobCache.get(jobId);
- }
-
- //MetaInfo available in cache.
- MetaInfo metaInfo = null;
- if (jobListCache.containsKey(jobId)) {
- metaInfo = jobListCache.get(jobId);
- }
-
- if (metaInfo != null) {
- return loadJob(metaInfo);
- }
-
- //MetaInfo not available. Check intermediate directory for meta info.
- metaInfo = scanIntermediateForJob(jobId);
- if (metaInfo != null) {
- return loadJob(metaInfo);
- }
-
- //Intermediate directory does not contain job. Search through older ones.
- metaInfo = scanOldDirsForJob(jobId);
- if (metaInfo != null) {
- return loadJob(metaInfo);
- }
- return null;
- }
-
- private void moveToDone(MetaInfo metaInfo) throws IOException {
- long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
- if (completeTime == 0) completeTime = System.currentTimeMillis();
- JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-
- List<Path> paths = new ArrayList<Path>();
- Path historyFile = metaInfo.getHistoryFile();
- if (historyFile == null) {
- LOG.info("No file for job-history with " + jobId + " found in cache!");
- } else {
- paths.add(historyFile);
- }
-
- Path confFile = metaInfo.getConfFile();
- if (confFile == null) {
- LOG.info("No file for jobConf with " + jobId + " found in cache!");
- } else {
- paths.add(confFile);
- }
-
- //TODO Check all mi getters and setters for the conf path
- Path summaryFile = metaInfo.getSummaryFile();
- if (summaryFile == null) {
- LOG.info("No summary file for job: " + jobId);
- } else {
- try {
- String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile);
- SUMMARY_LOG.info(jobSummaryString);
- LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
- intermediateDoneDirFc.delete(summaryFile, false);
- metaInfo.setSummaryFile(null);
- } catch (IOException e) {
- LOG.warn("Failed to process summary file: [" + summaryFile + "]");
- throw e;
- }
- }
-
- Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
- addDirectoryToSerialNumberIndex(targetDir);
- try {
- maybeMakeSubdirectory(targetDir);
- } catch (IOException e) {
- LOG.warn("Failed creating subdirectory: " + targetDir +
- " while attempting to move files for jobId: " + jobId);
- throw e;
- }
- synchronized (metaInfo) {
- if (historyFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir,
- historyFile.getName()));
- try {
- moveToDoneNow(historyFile, toPath);
- } catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: "
- + jobId);
- throw e;
- }
- metaInfo.setHistoryFile(toPath);
- }
- if (confFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir,
- confFile.getName()));
- try {
- moveToDoneNow(confFile, toPath);
- } catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: "
- + jobId);
- throw e;
- }
- metaInfo.setConfFile(toPath);
- }
- }
- addToJobListCache(jobId, metaInfo);
- intermediateListCache.remove(jobId);
+ return hsManager.getMetaInfo(jobId);
}
-
- private void moveToDoneNow(final Path src, final Path target)
- throws IOException {
- LOG.info("Moving " + src.toString() + " to " + target.toString());
- intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
- // fc.util().copy(src, target);
- //fc.delete(src, false);
- //intermediateDoneDirFc.setPermission(target, new FsPermission(
- //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
- }
-
- String getJobSummary(FileContext fc, Path path) throws IOException {
- Path qPath = fc.makeQualified(path);
- FSDataInputStream in = fc.open(qPath);
- String jobSummaryString = in.readUTF();
- in.close();
- return jobSummaryString;
- }
-
- private void maybeMakeSubdirectory(Path path) throws IOException {
- boolean existsInExistingCache = false;
- synchronized(existingDoneSubdirs) {
- if (existingDoneSubdirs.contains(path)) existsInExistingCache = true;
- }
- try {
- doneDirFc.getFileStatus(path);
- if (!existsInExistingCache) {
- existingDoneSubdirs.add(path);
- if (debugMode) {
- LOG.info("JobHistory.maybeMakeSubdirectory -- We believed "
- + path + " already existed, but it didn't.");
- }
- }
- } catch (FileNotFoundException fnfE) {
- try {
- FsPermission fsp =
- new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
- doneDirFc.mkdir(path, fsp, true);
- FileStatus fsStatus = doneDirFc.getFileStatus(path);
- LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
- + ", Expected: " + fsp.toShort());
- if (fsStatus.getPermission().toShort() != fsp.toShort()) {
- LOG.info("Explicitly setting permissions to : " + fsp.toShort()
- + ", " + fsp);
- doneDirFc.setPermission(path, fsp);
- }
- synchronized(existingDoneSubdirs) {
- existingDoneSubdirs.add(path);
- }
- } catch (FileAlreadyExistsException faeE) { //Nothing to do.
- }
- }
- }
-
- private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
- return new Path(doneDirPrefixPath,
- JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
- }
-
- private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
- String timestampComponent =
- JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
- return new Path(doneDirPrefixPath,
- JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
- }
-
@Override
- public synchronized Job getJob(JobId jobId) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Looking for Job "+jobId);
- }
- Job job = null;
- try {
- job = findJob(jobId);
- //This could return a null job.
- } catch (IOException e) {
- throw new YarnException(e);
- }
- return job;
+ public Job getJob(JobId jobId) {
+ return storage.getFullJob(jobId);
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Called getAllJobs(AppId): " + appID);
}
-// currently there is 1 to 1 mapping between app and job id
+ // currently there is 1 to 1 mapping between app and job id
org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
Map<JobId, Job> jobs = new HashMap<JobId, Job>();
JobId jobID = TypeConverter.toYarn(oldJobID);
jobs.put(jobID, getJob(jobID));
return jobs;
-// return getAllJobs();
}
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
- *
- * Returns a recent list of jobs. This may not be the complete set.
- * If a previous jobId is known - it can be queries via the getJob(JobId)
- * method.
- * Size of this list is determined by the size of the job list cache.
- * This can be fixed when pagination is implemented - return the first set of
- * jobs via the cache, go to DFS only when an attempt is made to navigate
- * past the cached list.
- * This does involve a DFS oepration of scanning the intermediate directory.
- */
+
+ @Override
public Map<JobId, Job> getAllJobs() {
- LOG.debug("Called getAllJobs()");
- return getAllJobsInternal();
+ return storage.getAllPartialJobs();
}
- static class MetaInfo {
- private Path historyFile;
- private Path confFile;
- private Path summaryFile;
- JobIndexInfo jobIndexInfo;
-
- MetaInfo(Path historyFile, Path confFile, Path summaryFile,
- JobIndexInfo jobIndexInfo) {
- this.historyFile = historyFile;
- this.confFile = confFile;
- this.summaryFile = summaryFile;
- this.jobIndexInfo = jobIndexInfo;
- }
-
- Path getHistoryFile() { return historyFile; }
- Path getConfFile() { return confFile; }
- Path getSummaryFile() { return summaryFile; }
- JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
-
- void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
- void setConfFile(Path confFile) {this.confFile = confFile; }
- void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; }
+ /**
+ * Look for a set of partial jobs.
+ *
+ * @param offset
+ * the offset into the list of jobs.
+ * @param count
+ * the maximum number of jobs to return.
+ * @param user
+ * only return jobs for the given user.
+ * @param queue
+ * only return jobs in the given queue.
+ * @param sBegin
+ * only return Jobs that started on or after the given time.
+ * @param sEnd
+ * only return Jobs that started on or before the given time.
+ * @param fBegin
+ * only return Jobs that ended on or after the given time.
+ * @param fEnd
+ * only return Jobs that ended on or before the given time.
+ * @param jobState
+ * only return jobs that are in the given job state.
+ * @return The list of filtered jobs.
+ */
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd,
+ fBegin, fEnd, jobState);
}
-
public class HistoryCleaner implements Runnable {
- private long currentTime;
-
long maxAgeMillis;
- long filesDeleted = 0;
- long dirsDeleted = 0;
-
+
public HistoryCleaner(long maxAge) {
this.maxAgeMillis = maxAge;
}
-
- @SuppressWarnings("unchecked")
+
public void run() {
LOG.info("History Cleaner started");
- currentTime = System.currentTimeMillis();
- boolean halted = false;
- //TODO Delete YYYY/MM/DD directories.
+ long cutoff = System.currentTimeMillis() - maxAgeMillis;
try {
- List<FileStatus> serialDirList = findTimestampedDirectories();
- //Sort in ascending order. Relies on YYYY/MM/DD/Serial
- Collections.sort(serialDirList);
- for (FileStatus serialDir : serialDirList) {
- List<FileStatus> historyFileList =
- scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
- for (FileStatus historyFile : historyFileList) {
- JobIndexInfo jobIndexInfo =
- FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
- long effectiveTimestamp =
- getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
- if (shouldDelete(effectiveTimestamp)) {
- String confFileName =
- JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
- new Path(historyFile.getPath().getParent(), confFileName),
- null, jobIndexInfo);
- delete(metaInfo);
- } else {
- halted = true;
- break;
- }
- }
- if (!halted) {
- deleteDir(serialDir.getPath());
- removeDirectoryFromSerialNumberIndex(serialDir.getPath());
- synchronized (existingDoneSubdirs) {
- existingDoneSubdirs.remove(serialDir.getPath());
- }
-
- } else {
- break; //Don't scan any more directories.
- }
- }
+ hsManager.clean(cutoff, storage);
} catch (IOException e) {
- LOG.warn("Error in History cleaner run", e);
+ LOG.warn("Error trying to clean up ", e);
}
LOG.info("History Cleaner complete");
- LOG.info("FilesDeleted: " + filesDeleted);
- LOG.info("Directories Deleted: " + dirsDeleted);
- }
-
- private boolean shouldDelete(long ts) {
- return ((ts + maxAgeMillis) <= currentTime);
- }
-
- private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
- if (finishTime == 0) {
- return fileStatus.getModificationTime();
- }
- return finishTime;
- }
-
- private void delete(MetaInfo metaInfo) throws IOException {
- deleteFile(metaInfo.getHistoryFile());
- deleteFile(metaInfo.getConfFile());
- jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
- loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
- }
-
- private void deleteFile(final Path path) throws IOException {
- doneDirFc.delete(doneDirFc.makeQualified(path), false);
- filesDeleted++;
- }
-
- private void deleteDir(Path path) throws IOException {
- doneDirFc.delete(doneDirFc.makeQualified(path), true);
- dirsDeleted++;
- }
}
-
-
-
- //TODO AppContext - Not Required
- private ApplicationAttemptId appAttemptID;
+ }
+
+ // TODO AppContext - Not Required
+ private ApplicationAttemptId appAttemptID;
+
@Override
public ApplicationAttemptId getApplicationAttemptId() {
- //TODO fixme - bogus appAttemptID for now
+ // TODO fixme - bogus appAttemptID for now
return appAttemptID;
- }
-
- //TODO AppContext - Not Required
+ }
+
+ // TODO AppContext - Not Required
private ApplicationId appID;
+
@Override
public ApplicationId getApplicationID() {
- //TODO fixme - bogus appID for now
+ // TODO fixme - bogus appID for now
return appID;
}
-
- //TODO AppContext - Not Required
+
+ // TODO AppContext - Not Required
@Override
public EventHandler getEventHandler() {
// TODO Auto-generated method stub
return null;
}
-
- //TODO AppContext - Not Required
+
+ // TODO AppContext - Not Required
private String userName;
+
@Override
public CharSequence getUser() {
if (userName != null) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Tue Apr 10 18:11:26 2012
@@ -51,6 +51,7 @@ public class PartialJob implements org.a
jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
jobReport.setStartTime(jobIndexInfo.getSubmitTime());
jobReport.setFinishTime(jobIndexInfo.getFinishTime());
+ jobReport.setJobState(getState());
}
@Override
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Tue Apr 10 18:11:26 2012
@@ -44,6 +44,7 @@ public class HsWebApp extends WebApp imp
bind(JAXBContextResolver.class);
bind(GenericExceptionHandler.class);
bind(AppContext.class).toInstance(history);
+ bind(HistoryContext.class).toInstance(history);
route("/", HsController.class);
route("/app", HsController.class);
route(pajoin("/job", JOB_ID), HsController.class, "job");
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java Tue Apr 10 18:11:26 2012
@@ -32,10 +32,8 @@ import javax.ws.rs.core.UriInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
@@ -49,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.AMAttemptsInfo;
import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.HistoryInfo;
@@ -64,7 +63,7 @@ import com.google.inject.Inject;
@Path("/ws/v1/history")
public class HsWebServices {
- private final AppContext appCtx;
+ private final HistoryContext ctx;
private WebApp webapp;
private final Configuration conf;
@@ -72,9 +71,9 @@ public class HsWebServices {
UriInfo uriInfo;
@Inject
- public HsWebServices(final AppContext appCtx, final Configuration conf,
+ public HsWebServices(final HistoryContext ctx, final Configuration conf,
final WebApp webapp) {
- this.appCtx = appCtx;
+ this.ctx = ctx;
this.conf = conf;
this.webapp = webapp;
}
@@ -103,33 +102,22 @@ public class HsWebServices {
@QueryParam("startedTimeEnd") String startedEnd,
@QueryParam("finishedTimeBegin") String finishBegin,
@QueryParam("finishedTimeEnd") String finishEnd) {
- JobsInfo allJobs = new JobsInfo();
- long num = 0;
- boolean checkCount = false;
- boolean checkStart = false;
- boolean checkEnd = false;
- long countNum = 0;
-
- // set values suitable in case both of begin/end not specified
- long sBegin = 0;
- long sEnd = Long.MAX_VALUE;
- long fBegin = 0;
- long fEnd = Long.MAX_VALUE;
+ Long countParam = null;
+
if (count != null && !count.isEmpty()) {
- checkCount = true;
try {
- countNum = Long.parseLong(count);
+ countParam = Long.parseLong(count);
} catch (NumberFormatException e) {
throw new BadRequestException(e.getMessage());
}
- if (countNum <= 0) {
+ if (countParam <= 0) {
throw new BadRequestException("limit value must be greater then 0");
}
}
+ Long sBegin = null;
if (startedBegin != null && !startedBegin.isEmpty()) {
- checkStart = true;
try {
sBegin = Long.parseLong(startedBegin);
} catch (NumberFormatException e) {
@@ -139,8 +127,9 @@ public class HsWebServices {
throw new BadRequestException("startedTimeBegin must be greater than 0");
}
}
+
+ Long sEnd = null;
if (startedEnd != null && !startedEnd.isEmpty()) {
- checkStart = true;
try {
sEnd = Long.parseLong(startedEnd);
} catch (NumberFormatException e) {
@@ -150,13 +139,13 @@ public class HsWebServices {
throw new BadRequestException("startedTimeEnd must be greater than 0");
}
}
- if (sBegin > sEnd) {
+ if (sBegin != null && sEnd != null && sBegin > sEnd) {
throw new BadRequestException(
"startedTimeEnd must be greater than startTimeBegin");
}
+ Long fBegin = null;
if (finishBegin != null && !finishBegin.isEmpty()) {
- checkEnd = true;
try {
fBegin = Long.parseLong(finishBegin);
} catch (NumberFormatException e) {
@@ -166,8 +155,8 @@ public class HsWebServices {
throw new BadRequestException("finishedTimeBegin must be greater than 0");
}
}
+ Long fEnd = null;
if (finishEnd != null && !finishEnd.isEmpty()) {
- checkEnd = true;
try {
fEnd = Long.parseLong(finishEnd);
} catch (NumberFormatException e) {
@@ -177,53 +166,18 @@ public class HsWebServices {
throw new BadRequestException("finishedTimeEnd must be greater than 0");
}
}
- if (fBegin > fEnd) {
+ if (fBegin != null && fEnd != null && fBegin > fEnd) {
throw new BadRequestException(
"finishedTimeEnd must be greater than finishedTimeBegin");
}
-
- for (Job job : appCtx.getAllJobs().values()) {
- if (checkCount && num == countNum) {
- break;
- }
-
- if (stateQuery != null && !stateQuery.isEmpty()) {
- JobState.valueOf(stateQuery);
- if (!job.getState().toString().equalsIgnoreCase(stateQuery)) {
- continue;
- }
- }
-
- // can't really validate queue is a valid one since queues could change
- if (queueQuery != null && !queueQuery.isEmpty()) {
- if (!job.getQueueName().equals(queueQuery)) {
- continue;
- }
- }
-
- if (userQuery != null && !userQuery.isEmpty()) {
- if (!job.getUserName().equals(userQuery)) {
- continue;
- }
- }
-
- JobReport report = job.getReport();
-
- if (checkStart
- && (report.getStartTime() < sBegin || report.getStartTime() > sEnd)) {
- continue;
- }
- if (checkEnd
- && (report.getFinishTime() < fBegin || report.getFinishTime() > fEnd)) {
- continue;
- }
-
- JobInfo jobInfo = new JobInfo(job);
-
- allJobs.add(jobInfo);
- num++;
+
+ JobState jobState = null;
+ if (stateQuery != null) {
+ jobState = JobState.valueOf(stateQuery);
}
- return allJobs;
+
+ return ctx.getPartialJobs(0l, countParam, userQuery, queueQuery,
+ sBegin, sEnd, fBegin, fEnd, jobState);
}
@GET
@@ -231,7 +185,7 @@ public class HsWebServices {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobInfo getJob(@PathParam("jobid") String jid) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
return new JobInfo(job);
}
@@ -240,7 +194,7 @@ public class HsWebServices {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
AMAttemptsInfo amAttempts = new AMAttemptsInfo();
for (AMInfo amInfo : job.getAMInfos()) {
AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(job
@@ -256,8 +210,8 @@ public class HsWebServices {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public JobCounterInfo getJobCounters(@PathParam("jobid") String jid) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
- return new JobCounterInfo(this.appCtx, job);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
+ return new JobCounterInfo(this.ctx, job);
}
@GET
@@ -265,7 +219,7 @@ public class HsWebServices {
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ConfInfo getJobConf(@PathParam("jobid") String jid) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
ConfInfo info;
try {
info = new ConfInfo(job, this.conf);
@@ -282,7 +236,7 @@ public class HsWebServices {
public TasksInfo getJobTasks(@PathParam("jobid") String jid,
@QueryParam("type") String type) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
TasksInfo allTasks = new TasksInfo();
for (Task task : job.getTasks().values()) {
TaskType ttype = null;
@@ -307,7 +261,7 @@ public class HsWebServices {
public TaskInfo getJobTask(@PathParam("jobid") String jid,
@PathParam("taskid") String tid) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
return new TaskInfo(task);
@@ -319,7 +273,7 @@ public class HsWebServices {
public JobTaskCounterInfo getSingleTaskCounters(
@PathParam("jobid") String jid, @PathParam("taskid") String tid) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
TaskId taskID = MRApps.toTaskID(tid);
if (taskID == null) {
throw new NotFoundException("taskid " + tid + " not found or invalid");
@@ -338,7 +292,7 @@ public class HsWebServices {
@PathParam("taskid") String tid) {
TaskAttemptsInfo attempts = new TaskAttemptsInfo();
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
for (TaskAttempt ta : task.getAttempts().values()) {
if (ta != null) {
@@ -358,7 +312,7 @@ public class HsWebServices {
public TaskAttemptInfo getJobTaskAttemptId(@PathParam("jobid") String jid,
@PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
@@ -376,7 +330,7 @@ public class HsWebServices {
@PathParam("jobid") String jid, @PathParam("taskid") String tid,
@PathParam("attemptid") String attId) {
- Job job = AMWebServices.getJobFromJobIdString(jid, appCtx);
+ Job job = AMWebServices.getJobFromJobIdString(jid, ctx);
Task task = AMWebServices.getTaskFromTaskIdString(tid, job);
TaskAttempt ta = AMWebServices.getTaskAttemptFromTaskAttemptString(attId,
task);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Tue Apr 10 18:11:26 2012
@@ -92,6 +92,14 @@ public class TestJobHistoryParsing {
checkHistoryParsing(3, 0, 2);
}
+ private static String getJobSummary(FileContext fc, Path path) throws IOException {
+ Path qPath = fc.makeQualified(path);
+ FSDataInputStream in = fc.open(qPath);
+ String jobSummaryString = in.readUTF();
+ in.close();
+ return jobSummaryString;
+ }
+
private void checkHistoryParsing(final int numMaps, final int numReduces,
final int numSuccessfulMaps)
throws Exception {
@@ -244,7 +252,7 @@ public class TestJobHistoryParsing {
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobId);
Path summaryFile = new Path(jobhistoryDir, summaryFileName);
- String jobSummaryString = jobHistory.getJobSummary(fc, summaryFile);
+ String jobSummaryString = getJobSummary(fc, summaryFile);
Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100"));
Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100"));
Assert.assertNotNull(jobSummaryString);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java Tue Apr 10 18:11:26 2012
@@ -30,11 +30,13 @@ import javax.xml.parsers.DocumentBuilder
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.JobHistory;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@@ -77,7 +79,7 @@ public class TestHsWebServices extends J
private static TestAppContext appContext;
private static HsWebApp webApp;
- static class TestAppContext implements AppContext {
+ static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@@ -144,6 +146,20 @@ public class TestHsWebServices extends J
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
@@ -160,6 +176,7 @@ public class TestHsWebServices extends J
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
+ bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java Tue Apr 10 18:11:26 2012
@@ -35,6 +35,7 @@ import javax.xml.parsers.DocumentBuilder
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@@ -89,7 +92,7 @@ public class TestHsWebServicesAttempts e
private static TestAppContext appContext;
private static HsWebApp webApp;
- static class TestAppContext implements AppContext {
+ static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@@ -156,6 +159,20 @@ public class TestHsWebServicesAttempts e
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
@@ -171,6 +188,7 @@ public class TestHsWebServicesAttempts e
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
+ bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java Tue Apr 10 18:11:26 2012
@@ -41,9 +41,12 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@@ -90,7 +93,7 @@ public class TestHsWebServicesJobConf ex
private static File testConfDir = new File("target",
TestHsWebServicesJobConf.class.getSimpleName() + "confDir");
- static class TestAppContext implements AppContext {
+ static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@@ -156,6 +159,20 @@ public class TestHsWebServicesJobConf ex
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
@@ -195,6 +212,7 @@ public class TestHsWebServicesJobConf ex
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
+ bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java Tue Apr 10 18:11:26 2012
@@ -38,11 +38,15 @@ import javax.xml.parsers.DocumentBuilder
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@@ -90,7 +94,7 @@ public class TestHsWebServicesJobs exten
private static TestAppContext appContext;
private static HsWebApp webApp;
- static class TestAppContext implements AppContext {
+ static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@@ -169,6 +173,20 @@ public class TestHsWebServicesJobs exten
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+ // TODO: unimplemented stub in this test context - appID is ignored and null is always returned.
+ return null;
+ }
+
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), // static helper, fed the values of this context's partialJobs map
+ offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState); // every argument is forwarded unchanged
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
@@ -184,6 +202,7 @@ public class TestHsWebServicesJobs exten
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
+ bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java Tue Apr 10 18:11:26 2012
@@ -36,8 +36,11 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs;
import org.apache.hadoop.mapreduce.v2.hs.MockHistoryJobs.JobsPair;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@@ -77,7 +80,7 @@ public class TestHsWebServicesJobsQuery
private static TestAppContext appContext;
private static HsWebApp webApp;
- static class TestAppContext implements AppContext {
+ static class TestAppContext implements HistoryContext {
final String user = MockJobs.newUserName();
final Map<JobId, Job> fullJobs;
final Map<JobId, Job> partialJobs;
@@ -152,6 +155,20 @@ public class TestHsWebServicesJobsQuery
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+ // TODO: unimplemented stub in this test context - appID is ignored and null is always returned.
+ return null;
+ }
+
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ return CachedHistoryStorage.getPartialJobs(this.partialJobs.values(), // static helper, fed the values of this context's partialJobs map
+ offset, count, user, queue, sBegin, sEnd, fBegin, fEnd, jobState); // every argument is forwarded unchanged
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
@@ -167,6 +184,7 @@ public class TestHsWebServicesJobsQuery
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
+ bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java?rev=1311896&r1=1311895&r2=1311896&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java Tue Apr 10 18:11:26 2012
@@ -34,12 +34,15 @@ import javax.xml.parsers.DocumentBuilder
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MockJobs;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryContext;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
@@ -85,7 +88,7 @@ public class TestHsWebServicesTasks exte
private static TestAppContext appContext;
private static HsWebApp webApp;
- static class TestAppContext implements AppContext {
+ static class TestAppContext implements HistoryContext {
final ApplicationAttemptId appAttemptID;
final ApplicationId appID;
final String user = MockJobs.newUserName();
@@ -152,6 +155,20 @@ public class TestHsWebServicesTasks exte
public ClusterInfo getClusterInfo() {
return null;
}
+
+ @Override
+ public Map<JobId, Job> getAllJobs(ApplicationId appID) {
+ // TODO: unimplemented stub in this test context - appID is ignored and null is always returned.
+ return null;
+ }
+
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ // TODO: unimplemented stub in this test context - all filter arguments are ignored and null is always returned.
+ return null;
+ }
}
private Injector injector = Guice.createInjector(new ServletModule() {
@@ -167,6 +184,7 @@ public class TestHsWebServicesTasks exte
bind(GenericExceptionHandler.class);
bind(WebApp.class).toInstance(webApp);
bind(AppContext.class).toInstance(appContext);
+ bind(HistoryContext.class).toInstance(appContext);
bind(Configuration.class).toInstance(conf);
serve("/*").with(GuiceContainer.class);