You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/04/11 07:47:51 UTC
svn commit: r1324567 [1/2] - in
/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
hadoop-mapreduce-client/hadoop-mapreduce-clie...
Author: todd
Date: Wed Apr 11 05:47:40 2012
New Revision: 1324567
URL: http://svn.apache.org/viewvc?rev=1324567&view=rev
Log:
Merge trunk into auto-failover branch.
Needs a few tweaks to fix compilation - will do in a follow-up commit. This is just a straight merge.
Added:
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
- copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJob.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
- copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
- copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
- copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/util/Hello.java
- copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/util/Hello.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolverScriptBasedMapping.java
- copied unchanged from r1324566, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolverScriptBasedMapping.java
Removed:
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testjar/Hello.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestTTResourceReporting.java
Modified:
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServices.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesAttempts.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobConf.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesTasks.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/util/TestRunJar.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/bin/yarn
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/RackResolver.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/c++/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/block_forensics/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/build-contrib.xml (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/build.xml (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/data_join/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/eclipse-plugin/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/index/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/datanode/RaidBlockSender.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRaidUtil.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/contrib/vaidya/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/examples/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/java/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/ (props changed)
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/src/webapps/job/ (props changed)
Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1310902-1324566
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt Wed Apr 11 05:47:40 2012
@@ -218,6 +218,21 @@ Release 2.0.0 - UNRELEASED
org.apache.hadoop.mapred.TestMiniMRWithDFSWithDistinctUsers (Devaraj K via
bobby)
+ MAPREDUCE-4105. Yarn RackResolver ignores rack configurations.
+ (Ahmed Radwan via tomwhite)
+
+ MAPREDUCE-3869. Fix classpath for DistributedShell application. (Devaraj K
+ via sseth)
+
+ MAPREDUCE-4057. Update RAID for the HA and fsdataset changes. (Devaraj K
+ via szetszwo)
+
+ MAPREDUCE-4076. Stream job fails with ZipException when use yarn jar
+ command (Devaraj K via bobby)
+
+ MAPREDUCE-4108. Fix tests in org.apache.hadoop.util.TestRunJar
+ (Devaraj K via tgraves)
+
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -229,6 +244,9 @@ Release 0.23.3 - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-4059. The history server should have a separate pluggable
+ storage/query interface. (Robert Evans via tgraves)
+
OPTIMIZATIONS
BUG FIXES
@@ -280,6 +298,18 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4051. Remove the empty hadoop-mapreduce-project/assembly/all.xml
file (Ravi Prakash via bobby)
+ MAPREDUCE-4117. mapred job -status throws NullPointerException (Devaraj K
+ via bobby)
+
+ MAPREDUCE-4099. ApplicationMaster may fail to remove staging directory
+ (Jason Lowe via bobby)
+
+ MAPREDUCE-4017. Add jobname to jobsummary log (tgraves and Koji Noguchi
+ via bobby)
+
+ MAPREDUCE-4040. History links should use hostname rather than IP address.
+ (Bhallamudi Venkata Siva Kamesh via sseth)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1310902-1324566
Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1310902-1324566
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Wed Apr 11 05:47:40 2012
@@ -551,6 +551,7 @@ public class JobHistoryEventHandler exte
summary.setUser(jse.getUserName());
summary.setQueue(jse.getJobQueueName());
summary.setJobSubmitTime(jse.getSubmitTime());
+ summary.setJobName(jse.getJobName());
break;
case NORMALIZED_RESOURCE:
NormalizedResourceEvent normalizedResourceEvent =
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSummary.java Wed Apr 11 05:47:40 2012
@@ -45,6 +45,7 @@ public class JobSummary {
private long mapSlotSeconds; // TODO Not generated yet in MRV2
private long reduceSlotSeconds; // TODO Not generated yet MRV2
// private int clusterSlotCapacity;
+ private String jobName;
JobSummary() {
}
@@ -185,6 +186,14 @@ public class JobSummary {
this.reduceSlotSeconds = reduceSlotSeconds;
}
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
public String getJobSummaryString() {
SummaryBuilder summary = new SummaryBuilder()
.add("jobId", jobId)
@@ -201,7 +210,8 @@ public class JobSummary {
.add("queue", queue)
.add("status", jobStatus)
.add("mapSlotSeconds", mapSlotSeconds)
- .add("reduceSlotSeconds", reduceSlotSeconds);
+ .add("reduceSlotSeconds", reduceSlotSeconds)
+ .add("jobName", jobName);
return summary.toString();
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Apr 11 05:47:40 2012
@@ -405,6 +405,14 @@ public class MRAppMaster extends Composi
} catch (InterruptedException e) {
e.printStackTrace();
}
+
+ // Cleanup staging directory
+ try {
+ cleanupStagingDir();
+ } catch(IOException io) {
+ LOG.warn("Failed to delete staging dir", io);
+ }
+
try {
// Stop all services
// This will also send the final report to the ResourceManager
@@ -415,13 +423,6 @@ public class MRAppMaster extends Composi
LOG.warn("Graceful stop failed ", t);
}
- // Cleanup staging directory
- try {
- cleanupStagingDir();
- } catch(IOException io) {
- LOG.warn("Failed to delete staging dir");
- }
-
//Bring the process down by force.
//Not needed after HADOOP-7140
LOG.info("Exiting MR AppMaster..GoodBye!");
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Wed Apr 11 05:47:40 2012
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy;
+import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@@ -35,11 +36,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
import org.junit.Test;
/**
@@ -233,6 +237,71 @@ public class TestMRApp {
}
}
+ private final class MRAppTestCleanup extends MRApp {
+ boolean hasStopped;
+ boolean cleanedBeforeStopped;
+
+ public MRAppTestCleanup(int maps, int reduces, boolean autoComplete,
+ String testName, boolean cleanOnStart) {
+ super(maps, reduces, autoComplete, testName, cleanOnStart);
+ hasStopped = false;
+ cleanedBeforeStopped = false;
+ }
+
+ @Override
+ protected Job createJob(Configuration conf) {
+ UserGroupInformation currentUser = null;
+ try {
+ currentUser = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+ getDispatcher().getEventHandler(),
+ getTaskAttemptListener(), getContext().getClock(),
+ getCommitter(), isNewApiCommitter(),
+ currentUser.getUserName(), getContext());
+ ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+ getDispatcher().register(JobFinishEvent.Type.class,
+ createJobFinishEventHandler());
+
+ return newJob;
+ }
+
+ @Override
+ public void cleanupStagingDir() throws IOException {
+ cleanedBeforeStopped = !hasStopped;
+ }
+
+ @Override
+ public synchronized void stop() {
+ hasStopped = true;
+ super.stop();
+ }
+
+ @Override
+ protected void sysexit() {
+ }
+ }
+
+ @Test
+ public void testStagingCleanupOrder() throws Exception {
+ MRAppTestCleanup app = new MRAppTestCleanup(1, 1, true,
+ this.getClass().getName(), true);
+ JobImpl job = (JobImpl)app.submit(new Configuration());
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+
+ int waitTime = 20 * 1000;
+ while (waitTime > 0 && !app.cleanedBeforeStopped) {
+ Thread.sleep(100);
+ waitTime -= 100;
+ }
+ Assert.assertTrue("Staging directory not cleaned before notifying RM",
+ app.cleanedBeforeStopped);
+ }
+
public static void main(String[] args) throws Exception {
TestMRApp t = new TestMRApp();
t.testMapReduce();
@@ -241,5 +310,6 @@ public class TestMRApp {
t.testCompletedMapsForReduceSlowstart();
t.testJobError();
t.testCountersOnJobFinish();
+ t.testStagingCleanupOrder();
}
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Wed Apr 11 05:47:40 2012
@@ -44,6 +44,9 @@ public class JHAdminConfig {
/** Run the History Cleaner every X ms.*/
public static final String MR_HISTORY_CLEANER_INTERVAL_MS =
MR_HISTORY_PREFIX + "cleaner.interval-ms";
+ public static final long DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS =
+ 1 * 24 * 60 * 60 * 1000l; //1 day
+
/** The number of threads to handle client API requests.*/
public static final String MR_HISTORY_CLIENT_THREAD_COUNT =
@@ -56,7 +59,9 @@ public class JHAdminConfig {
*/
public static final String MR_HISTORY_DATESTRING_CACHE_SIZE =
MR_HISTORY_PREFIX + "datestring.cache.size";
+ public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
+ //TODO REMOVE debug-mode
/** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
public static final String MR_HISTORY_DEBUG_MODE =
MR_HISTORY_PREFIX + "debug-mode";
@@ -75,6 +80,7 @@ public class JHAdminConfig {
/** Size of the job list cache.*/
public static final String MR_HISTORY_JOBLIST_CACHE_SIZE =
MR_HISTORY_PREFIX + "joblist.cache.size";
+ public static final int DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE = 20000;
/** The location of the Kerberos keytab file.*/
public static final String MR_HISTORY_KEYTAB = MR_HISTORY_PREFIX + "keytab";
@@ -82,6 +88,7 @@ public class JHAdminConfig {
/** Size of the loaded job cache.*/
public static final String MR_HISTORY_LOADED_JOB_CACHE_SIZE =
MR_HISTORY_PREFIX + "loadedjobs.cache.size";
+ public static final int DEFAULT_MR_HISTORY_LOADED_JOB_CACHE_SIZE = 5;
/**
* The maximum age of a job history file before it is deleted from the history
@@ -89,6 +96,8 @@ public class JHAdminConfig {
*/
public static final String MR_HISTORY_MAX_AGE_MS =
MR_HISTORY_PREFIX + "max-age-ms";
+ public static final long DEFAULT_MR_HISTORY_MAX_AGE =
+ 7 * 24 * 60 * 60 * 1000L; //1 week
/**
* Scan for history files to more from intermediate done dir to done dir
@@ -96,10 +105,13 @@ public class JHAdminConfig {
*/
public static final String MR_HISTORY_MOVE_INTERVAL_MS =
MR_HISTORY_PREFIX + "move.interval-ms";
+ public static final long DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS =
+ 3 * 60 * 1000l; //3 minutes
/** The number of threads used to move files.*/
public static final String MR_HISTORY_MOVE_THREAD_COUNT =
MR_HISTORY_PREFIX + "move.thread-count";
+ public static final int DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT = 3;
/** The Kerberos principal for the history server.*/
public static final String MR_HISTORY_PRINCIPAL =
@@ -116,4 +128,10 @@ public class JHAdminConfig {
*/
public static final String MR_HS_SECURITY_SERVICE_AUTHORIZATION =
"security.mrhs.client.protocol.acl";
+
+ /**
+ * The HistoryStorage class to use to cache history data.
+ */
+ public static final String MR_HISTORY_STORAGE =
+ MR_HISTORY_PREFIX + ".store.class";
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Wed Apr 11 05:47:40 2012
@@ -31,6 +31,8 @@ import java.util.concurrent.atomic.Atomi
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
@@ -50,6 +52,8 @@ import org.apache.hadoop.yarn.api.record
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
public class JobHistoryUtils {
/**
@@ -503,7 +507,7 @@ public class JobHistoryUtils {
StringBuffer sb = new StringBuffer();
if (address.getAddress().isAnyLocalAddress() ||
address.getAddress().isLoopbackAddress()) {
- sb.append(InetAddress.getLocalHost().getHostAddress());
+ sb.append(InetAddress.getLocalHost().getCanonicalHostName());
} else {
sb.append(address.getHostName());
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Wed Apr 11 05:47:40 2012
@@ -509,6 +509,11 @@ public class Job extends JobContextImpl
lastEvent = event;
}
}
+ if (lastEvent == null) {
+ return "There are no failed tasks for the job. "
+ + "Job is failed due to some other reason and reason "
+ + "can be found in the logs.";
+ }
String[] taskAttemptID = lastEvent.getTaskAttemptId().toString().split("_", 2);
String taskID = taskAttemptID[1].substring(0, taskAttemptID[1].length()-2);
return (" task " + taskID + " failed " +
Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1310902-1324566
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryContext.java Wed Apr 11 05:47:40 2012
@@ -24,8 +24,13 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
public interface HistoryContext extends AppContext {
Map<JobId, Job> getAllJobs(ApplicationId appID);
+
+ JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState);
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Wed Apr 11 05:47:40 2012
@@ -1,36 +1,26 @@
/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.mapreduce.v2.hs;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -41,26 +31,16 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.jobhistory.JobSummary;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
-import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
@@ -69,106 +49,36 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/*
+/**
* Loads and manages the Job history cache.
*/
-public class JobHistory extends AbstractService implements HistoryContext {
-
- private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000;
- private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5;
- private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000;
- private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000l; //3 minutes
- private static final int DEFAULT_MOVE_THREAD_COUNT = 3;
-
- static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week
- static final long DEFAULT_RUN_INTERVAL = 1 * 24 * 60 * 60 * 1000l; //1 day
-
+public class JobHistory extends AbstractService implements HistoryContext {
private static final Log LOG = LogFactory.getLog(JobHistory.class);
- private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class);
- public static final Pattern CONF_FILENAME_REGEX =
- Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
+ public static final Pattern CONF_FILENAME_REGEX = Pattern.compile("("
+ + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
public static final String OLD_SUFFIX = ".old";
- private static String DONE_BEFORE_SERIAL_TAIL =
- JobHistoryUtils.doneSubdirsBeforeSerialTail();
-
- /**
- * Maps between a serial number (generated based on jobId) and the timestamp
- * component(s) to which it belongs.
- * Facilitates jobId based searches.
- * If a jobId is not found in this list - it will not be found.
- */
- private final SortedMap<String, Set<String>> idToDateString =
- new ConcurrentSkipListMap<String, Set<String>>();
-
- //Maintains minimal details for recent jobs (parsed from history file name).
- //Sorted on Job Completion Time.
- private final SortedMap<JobId, MetaInfo> jobListCache =
- new ConcurrentSkipListMap<JobId, MetaInfo>();
-
-
- // Re-use exisiting MetaInfo objects if they exist for the specific JobId. (synchronization on MetaInfo)
- // Check for existance of the object when using iterators.
- private final SortedMap<JobId, MetaInfo> intermediateListCache =
- new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
-
- //Maintains a list of known done subdirectories. Not currently used.
- private final Set<Path> existingDoneSubdirs = new HashSet<Path>();
-
- private Map<JobId, Job> loadedJobCache = null;
-
- /**
- * Maintains a mapping between intermediate user directories and the last
- * known modification time.
- */
- private Map<String, Long> userDirModificationTimeMap =
- new HashMap<String, Long>();
-
- //The number of jobs to maintain in the job list cache.
- private int jobListCacheSize;
-
- private JobACLsManager aclsMgr;
-
- //The number of loaded jobs.
- private int loadedJobCacheSize;
-
- //The number of entries in idToDateString
- private int dateStringCacheSize;
-
- //Time interval for the move thread.
+ // Time interval for the move thread.
private long moveThreadInterval;
-
- //Number of move threads.
+
+ // Number of move threads.
private int numMoveThreads;
-
- private Configuration conf;
- private boolean debugMode;
- private int serialNumberLowDigits;
- private String serialNumberFormat;
-
-
- private Path doneDirPrefixPath = null; // folder for completed jobs
- private FileContext doneDirFc; // done Dir FileContext
-
- private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path
- private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext
+ private Configuration conf;
private Thread moveIntermediateToDoneThread = null;
private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null;
+
private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null;
-
- /**
- * Writes out files to the path
- * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist
- */
- @SuppressWarnings("serial")
+ private HistoryStorage storage = null;
+ private HistoryFileManager hsManager = null;
+
@Override
public void init(Configuration conf) throws YarnException {
LOG.info("JobHistory Init");
@@ -176,121 +86,66 @@ public class JobHistory extends Abstract
this.appID = RecordFactoryProvider.getRecordFactory(conf)
.newRecordInstance(ApplicationId.class);
this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf)
- .newRecordInstance(ApplicationAttemptId.class);
-
- debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false);
- serialNumberLowDigits = debugMode ? 1 : 3;
- serialNumberFormat = ("%0"
- + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS
- + serialNumberLowDigits) + "d");
+ .newRecordInstance(ApplicationAttemptId.class);
- String doneDirPrefix = null;
- doneDirPrefix = JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
- try {
- doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified(
- new Path(doneDirPrefix));
- doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf);
- mkdir(doneDirFc, doneDirPrefixPath, new FsPermission(
- JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION));
- } catch (IOException e) {
- throw new YarnException("Error creating done directory: [" +
- doneDirPrefixPath + "]", e);
- }
-
- String intermediateDoneDirPrefix = null;
- intermediateDoneDirPrefix = JobHistoryUtils
- .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
- try {
- intermediateDoneDirPath = FileContext.getFileContext(conf)
- .makeQualified(new Path(intermediateDoneDirPrefix));
- intermediateDoneDirFc = FileContext.getFileContext(
- intermediateDoneDirPath.toUri(), conf);
- mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission(
- JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort()));
- } catch (IOException e) {
- LOG.info("error creating done directory on dfs " + e);
- throw new YarnException("Error creating intermediate done directory: ["
- + intermediateDoneDirPath + "]", e);
- }
-
- this.aclsMgr = new JobACLsManager(conf);
-
- jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE,
- DEFAULT_JOBLIST_CACHE_SIZE);
- loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE,
- DEFAULT_LOADEDJOB_CACHE_SIZE);
- dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE,
- DEFAULT_DATESTRING_CACHE_SIZE);
- moveThreadInterval =
- conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
- DEFAULT_MOVE_THREAD_INTERVAL);
+ moveThreadInterval = conf.getLong(
+ JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS);
numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
- DEFAULT_MOVE_THREAD_COUNT);
-
- loadedJobCache =
- Collections.synchronizedMap(new LinkedHashMap<JobId, Job>(
- loadedJobCacheSize + 1, 0.75f, true) {
- @Override
- public boolean removeEldestEntry(final Map.Entry<JobId, Job> eldest) {
- return super.size() > loadedJobCacheSize;
- }
- });
-
+ JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+
+ hsManager = new HistoryFileManager();
+ hsManager.init(conf);
try {
- initExisting();
+ hsManager.initExisting();
} catch (IOException e) {
throw new YarnException("Failed to intialize existing directories", e);
}
- super.init(conf);
- }
-
- private void mkdir(FileContext fc, Path path, FsPermission fsp)
- throws IOException {
- if (!fc.util().exists(path)) {
- try {
- fc.mkdir(path, fsp, true);
- FileStatus fsStatus = fc.getFileStatus(path);
- LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
- + ", Expected: " + fsp.toShort());
- if (fsStatus.getPermission().toShort() != fsp.toShort()) {
- LOG.info("Explicitly setting permissions to : " + fsp.toShort()
- + ", " + fsp);
- fc.setPermission(path, fsp);
- }
- } catch (FileAlreadyExistsException e) {
- LOG.info("Directory: [" + path + "] already exists.");
- }
+ storage = ReflectionUtils.newInstance(conf.getClass(
+ JHAdminConfig.MR_HISTORY_STORAGE, CachedHistoryStorage.class,
+ HistoryStorage.class), conf);
+ if (storage instanceof Service) {
+ ((Service) storage).init(conf);
}
+ storage.setHistoryFileManager(hsManager);
+
+ super.init(conf);
}
@Override
public void start() {
- //Start moveIntermediatToDoneThread
- moveIntermediateToDoneRunnable =
- new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads);
+ hsManager.start();
+ if (storage instanceof Service) {
+ ((Service) storage).start();
+ }
+
+ // Start moveIntermediateToDoneThread
+ moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable(
+ moveThreadInterval, numMoveThreads);
moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable);
moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner");
moveIntermediateToDoneThread.start();
-
- //Start historyCleaner
+
+ // Start historyCleaner
boolean startCleanerService = conf.getBoolean(
JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true);
if (startCleanerService) {
long maxAgeOfHistoryFiles = conf.getLong(
- JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE);
+ JHAdminConfig.MR_HISTORY_MAX_AGE_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE);
cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()
- );
+ new ThreadFactoryBuilder().setNameFormat("LogCleaner").build());
long runInterval = conf.getLong(
- JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL);
+ JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS,
+ JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS);
cleanerScheduledExecutor
.scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles),
30 * 1000l, runInterval, TimeUnit.MILLISECONDS);
}
super.start();
}
-
+
@Override
public void stop() {
LOG.info("Stopping JobHistory");
@@ -323,281 +178,16 @@ public class JobHistory extends Abstract
LOG.warn("HistoryCleanerService shutdown may not have succeeded");
}
}
+ if (storage instanceof Service) {
+ ((Service) storage).stop();
+ }
+ hsManager.stop();
super.stop();
}
-
+
public JobHistory() {
super(JobHistory.class.getName());
}
-
- /**
- * Populates index data structures.
- * Should only be called at initialization times.
- */
- @SuppressWarnings("unchecked")
- private void initExisting() throws IOException {
- LOG.info("Initializing Existing Jobs...");
- List<FileStatus> timestampedDirList = findTimestampedDirectories();
- Collections.sort(timestampedDirList);
- for (FileStatus fs : timestampedDirList) {
- //TODO Could verify the correct format for these directories.
- addDirectoryToSerialNumberIndex(fs.getPath());
- addDirectoryToJobListCache(fs.getPath());
- }
- }
-
- private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
- String serialPart = serialDirPath.getName();
- String timeStampPart =
- JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
- if (timeStampPart == null) {
- LOG.warn("Could not find timestamp portion from path: " +
- serialDirPath.toString() +". Continuing with next");
- return;
- }
- if (serialPart == null) {
- LOG.warn("Could not find serial portion from path: " +
- serialDirPath.toString() + ". Continuing with next");
- return;
- }
- if (idToDateString.containsKey(serialPart)) {
- Set<String> set = idToDateString.get(serialPart);
- set.remove(timeStampPart);
- if (set.isEmpty()) {
- idToDateString.remove(serialPart);
- }
- }
-
- }
-
- private void addDirectoryToSerialNumberIndex(Path serialDirPath) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+serialDirPath+" to serial index");
- }
- String serialPart = serialDirPath.getName();
- String timestampPart =
- JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
- if (timestampPart == null) {
- LOG.warn("Could not find timestamp portion from path: " +
- serialDirPath.toString() +". Continuing with next");
- return;
- }
- if (serialPart == null) {
- LOG.warn("Could not find serial portion from path: " +
- serialDirPath.toString() + ". Continuing with next");
- }
- addToSerialNumberIndex(serialPart, timestampPart);
- }
-
- private void addToSerialNumberIndex(String serialPart, String timestampPart) {
- if (!idToDateString.containsKey(serialPart)) {
- idToDateString.put(serialPart, new HashSet<String>());
- if (idToDateString.size() > dateStringCacheSize) {
- idToDateString.remove(idToDateString.firstKey());
- }
- Set<String> datePartSet = idToDateString.get(serialPart);
- datePartSet.add(timestampPart);
- }
- }
-
- private void addDirectoryToJobListCache(Path path) throws IOException {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+path+" to job list cache.");
- }
- List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path,
- doneDirFc);
- for (FileStatus fs : historyFileList) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding in history for "+fs.getPath());
- }
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
- .getName());
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- addToJobListCache(jobIndexInfo.getJobId(), metaInfo);
- }
- }
-
- private static List<FileStatus> scanDirectory(Path path, FileContext fc,
- PathFilter pathFilter) throws IOException {
- path = fc.makeQualified(path);
- List<FileStatus> jhStatusList = new ArrayList<FileStatus>();
- RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path);
- while (fileStatusIter.hasNext()) {
- FileStatus fileStatus = fileStatusIter.next();
- Path filePath = fileStatus.getPath();
- if (fileStatus.isFile() && pathFilter.accept(filePath)) {
- jhStatusList.add(fileStatus);
- }
- }
- return jhStatusList;
- }
-
- private static List<FileStatus> scanDirectoryForHistoryFiles(Path path,
- FileContext fc) throws IOException {
- return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter());
- }
-
- /**
- * Finds all history directories with a timestamp component by scanning
- * the filesystem.
- * Used when the JobHistory server is started.
- * @return
- */
- private List<FileStatus> findTimestampedDirectories() throws IOException {
- List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc,
- doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL);
- return fsList;
- }
-
- /**
- * Adds an entry to the job list cache. Maintains the size.
- */
- private void addToJobListCache(JobId jobId, MetaInfo metaInfo) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+jobId+" to job list cache with "
- +metaInfo.getJobIndexInfo());
- }
- jobListCache.put(jobId, metaInfo);
- if (jobListCache.size() > jobListCacheSize) {
- jobListCache.remove(jobListCache.firstKey());
- }
- }
-
- /**
- * Adds an entry to the loaded job cache. Maintains the size.
- */
- private void addToLoadedJobCache(Job job) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Adding "+job.getID()+" to loaded job cache");
- }
- loadedJobCache.put(job.getID(), job);
- }
-
-
- /**
- * Scans the intermediate directory to find user directories. Scans these
- * for history files if the modification time for the directory has changed.
- * @throws IOException
- */
- private void scanIntermediateDirectory() throws IOException {
- List<FileStatus> userDirList =
- JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, "");
-
- for (FileStatus userDir : userDirList) {
- String name = userDir.getPath().getName();
- long newModificationTime = userDir.getModificationTime();
- boolean shouldScan = false;
- synchronized (userDirModificationTimeMap) {
- if (!userDirModificationTimeMap.containsKey(name) || newModificationTime
- > userDirModificationTimeMap.get(name)) {
- shouldScan = true;
- userDirModificationTimeMap.put(name, newModificationTime);
- }
- }
- if (shouldScan) {
- scanIntermediateDirectory(userDir.getPath());
- }
- }
- }
-
- /**
- * Scans the specified path and populates the intermediate cache.
- * @param absPath
- * @throws IOException
- */
- private void scanIntermediateDirectory(final Path absPath)
- throws IOException {
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath,
- intermediateDoneDirFc);
- for (FileStatus fs : fileStatusList) {
- JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath()
- .getName());
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) {
- intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo);
- }
- }
- }
-
- /**
- * Searches the job history file FileStatus list for the specified JobId.
- *
- * @param fileStatusList fileStatus list of Job History Files.
- * @param jobId The JobId to find.
- * @param checkForDoneFile whether to check for the existance of a done file.
- * @return A MetaInfo object for the jobId, null if not found.
- * @throws IOException
- */
- private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId)
- throws IOException {
- for (FileStatus fs : fileStatusList) {
- JobIndexInfo jobIndexInfo =
- FileNameIndexUtils.getIndexInfo(fs.getPath().getName());
- if (jobIndexInfo.getJobId().equals(jobId)) {
- String confFileName = JobHistoryUtils
- .getIntermediateConfFileName(jobIndexInfo.getJobId());
- String summaryFileName = JobHistoryUtils
- .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath()
- .getParent(), confFileName), new Path(fs.getPath().getParent(),
- summaryFileName), jobIndexInfo);
- return metaInfo;
- }
- }
- return null;
- }
-
- /**
- * Scans old directories known by the idToDateString map for the specified
- * jobId.
- * If the number of directories is higher than the supported size of the
- * idToDateString cache, the jobId will not be found.
- * @param jobId the jobId.
- * @return
- * @throws IOException
- */
- private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException {
- int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId);
- String boxedSerialNumber = String.valueOf(jobSerialNumber);
- Set<String> dateStringSet = idToDateString.get(boxedSerialNumber);
- if (dateStringSet == null) {
- return null;
- }
- for (String timestampPart : dateStringSet) {
- Path logDir = canonicalHistoryLogPath(jobId, timestampPart);
- List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir,
- doneDirFc);
- MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId);
- if (metaInfo != null) {
- return metaInfo;
- }
- }
- return null;
- }
-
- /**
- * Checks for the existence of the job history file in the intermediate
- * directory.
- * @param jobId
- * @return
- * @throws IOException
- */
- private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException {
- scanIntermediateDirectory();
- return intermediateListCache.get(jobId);
- }
@Override
public String getApplicationName() {
@@ -609,486 +199,167 @@ public class JobHistory extends Abstract
private long sleepTime;
private ThreadPoolExecutor moveToDoneExecutor = null;
private boolean running = false;
-
- public void stop() {
+
+ public synchronized void stop() {
running = false;
+ notify();
}
-
+
MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) {
this.sleepTime = sleepTime;
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat("MoveIntermediateToDone Thread #%d")
- .build();
- moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
+ ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+ "MoveIntermediateToDone Thread #%d").build();
+ moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
running = true;
}
-
- @Override
+
+ @Override
public void run() {
Thread.currentThread().setName("IntermediateHistoryScanner");
try {
- while (running) {
+ while (true) {
LOG.info("Starting scan to move intermediate done files");
- scanIntermediateDirectory();
- for (final MetaInfo metaInfo : intermediateListCache.values()) {
+ for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) {
moveToDoneExecutor.execute(new Runnable() {
@Override
public void run() {
try {
- moveToDone(metaInfo);
+ hsManager.moveToDone(metaInfo);
} catch (IOException e) {
- LOG.info("Failed to process metaInfo for job: " +
- metaInfo.jobIndexInfo.getJobId(), e);
+ LOG.info(
+ "Failed to process metaInfo for job: "
+ + metaInfo.getJobId(), e);
}
}
});
-
}
- synchronized (this) { // TODO Is this really required.
+ synchronized (this) {
try {
this.wait(sleepTime);
} catch (InterruptedException e) {
LOG.info("IntermediateHistoryScannerThread interrupted");
}
+ if (!running) {
+ break;
+ }
}
}
} catch (IOException e) {
- LOG.warn("Unable to get a list of intermediate files to be moved from: "
- + intermediateDoneDirPath);
+ LOG.warn("Unable to get a list of intermediate files to be moved");
+ // TODO Shut down the entire process!!!!
}
}
}
-
- private Job loadJob(MetaInfo metaInfo) {
- synchronized(metaInfo) {
- try {
- Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(),
- metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(),
- metaInfo.getConfFile(), this.aclsMgr);
- addToLoadedJobCache(job);
- return job;
- } catch (IOException e) {
- throw new YarnException("Could not find/load job: " +
- metaInfo.getJobIndexInfo().getJobId(), e);
- }
- }
- }
-
- private Map<JobId, Job> getAllJobsInternal() {
- //TODO This should ideally be using getAllJobsMetaInfo
- // or get rid of that method once Job has APIs for user, finishTime etc.
- SortedMap<JobId, Job> result = new TreeMap<JobId, Job>();
- try {
- scanIntermediateDirectory();
- } catch (IOException e) {
- LOG.warn("Failed to scan intermediate directory", e);
- throw new YarnException(e);
- }
- for (JobId jobId : intermediateListCache.keySet()) {
- MetaInfo mi = intermediateListCache.get(jobId);
- if (mi != null) {
- result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
- .getJobIndexInfo().getJobId()));
- }
- }
- for (JobId jobId : jobListCache.keySet()) {
- MetaInfo mi = jobListCache.get(jobId);
- if (mi != null) {
- result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi
- .getJobIndexInfo().getJobId()));
- }
- }
- return result;
- }
/**
* Helper method for test cases.
*/
MetaInfo getJobMetaInfo(JobId jobId) throws IOException {
- //MetaInfo available in cache.
- MetaInfo metaInfo = null;
- if (jobListCache.containsKey(jobId)) {
- metaInfo = jobListCache.get(jobId);
- }
-
- if (metaInfo != null) {
- return metaInfo;
- }
-
- //MetaInfo not available. Check intermediate directory for meta info.
- metaInfo = scanIntermediateForJob(jobId);
- if (metaInfo != null) {
- return metaInfo;
- }
-
- //Intermediate directory does not contain job. Search through older ones.
- metaInfo = scanOldDirsForJob(jobId);
- if (metaInfo != null) {
- return metaInfo;
- }
- return null;
- }
-
- private Job findJob(JobId jobId) throws IOException {
- //Job already loaded.
- if (loadedJobCache.containsKey(jobId)) {
- return loadedJobCache.get(jobId);
- }
-
- //MetaInfo available in cache.
- MetaInfo metaInfo = null;
- if (jobListCache.containsKey(jobId)) {
- metaInfo = jobListCache.get(jobId);
- }
-
- if (metaInfo != null) {
- return loadJob(metaInfo);
- }
-
- //MetaInfo not available. Check intermediate directory for meta info.
- metaInfo = scanIntermediateForJob(jobId);
- if (metaInfo != null) {
- return loadJob(metaInfo);
- }
-
- //Intermediate directory does not contain job. Search through older ones.
- metaInfo = scanOldDirsForJob(jobId);
- if (metaInfo != null) {
- return loadJob(metaInfo);
- }
- return null;
- }
-
- private void moveToDone(MetaInfo metaInfo) throws IOException {
- long completeTime = metaInfo.getJobIndexInfo().getFinishTime();
- if (completeTime == 0) completeTime = System.currentTimeMillis();
- JobId jobId = metaInfo.getJobIndexInfo().getJobId();
-
- List<Path> paths = new ArrayList<Path>();
- Path historyFile = metaInfo.getHistoryFile();
- if (historyFile == null) {
- LOG.info("No file for job-history with " + jobId + " found in cache!");
- } else {
- paths.add(historyFile);
- }
-
- Path confFile = metaInfo.getConfFile();
- if (confFile == null) {
- LOG.info("No file for jobConf with " + jobId + " found in cache!");
- } else {
- paths.add(confFile);
- }
-
- //TODO Check all mi getters and setters for the conf path
- Path summaryFile = metaInfo.getSummaryFile();
- if (summaryFile == null) {
- LOG.info("No summary file for job: " + jobId);
- } else {
- try {
- String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile);
- SUMMARY_LOG.info(jobSummaryString);
- LOG.info("Deleting JobSummary file: [" + summaryFile + "]");
- intermediateDoneDirFc.delete(summaryFile, false);
- metaInfo.setSummaryFile(null);
- } catch (IOException e) {
- LOG.warn("Failed to process summary file: [" + summaryFile + "]");
- throw e;
- }
- }
-
- Path targetDir = canonicalHistoryLogPath(jobId, completeTime);
- addDirectoryToSerialNumberIndex(targetDir);
- try {
- maybeMakeSubdirectory(targetDir);
- } catch (IOException e) {
- LOG.warn("Failed creating subdirectory: " + targetDir +
- " while attempting to move files for jobId: " + jobId);
- throw e;
- }
- synchronized (metaInfo) {
- if (historyFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir,
- historyFile.getName()));
- try {
- moveToDoneNow(historyFile, toPath);
- } catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: "
- + jobId);
- throw e;
- }
- metaInfo.setHistoryFile(toPath);
- }
- if (confFile != null) {
- Path toPath = doneDirFc.makeQualified(new Path(targetDir,
- confFile.getName()));
- try {
- moveToDoneNow(confFile, toPath);
- } catch (IOException e) {
- LOG.warn("Failed to move file: " + historyFile + " for jobId: "
- + jobId);
- throw e;
- }
- metaInfo.setConfFile(toPath);
- }
- }
- addToJobListCache(jobId, metaInfo);
- intermediateListCache.remove(jobId);
+ return hsManager.getMetaInfo(jobId);
}
-
- private void moveToDoneNow(final Path src, final Path target)
- throws IOException {
- LOG.info("Moving " + src.toString() + " to " + target.toString());
- intermediateDoneDirFc.rename(src, target, Options.Rename.NONE);
- // fc.util().copy(src, target);
- //fc.delete(src, false);
- //intermediateDoneDirFc.setPermission(target, new FsPermission(
- //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION));
- }
-
- String getJobSummary(FileContext fc, Path path) throws IOException {
- Path qPath = fc.makeQualified(path);
- FSDataInputStream in = fc.open(qPath);
- String jobSummaryString = in.readUTF();
- in.close();
- return jobSummaryString;
- }
-
- private void maybeMakeSubdirectory(Path path) throws IOException {
- boolean existsInExistingCache = false;
- synchronized(existingDoneSubdirs) {
- if (existingDoneSubdirs.contains(path)) existsInExistingCache = true;
- }
- try {
- doneDirFc.getFileStatus(path);
- if (!existsInExistingCache) {
- existingDoneSubdirs.add(path);
- if (debugMode) {
- LOG.info("JobHistory.maybeMakeSubdirectory -- We believed "
- + path + " already existed, but it didn't.");
- }
- }
- } catch (FileNotFoundException fnfE) {
- try {
- FsPermission fsp =
- new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION);
- doneDirFc.mkdir(path, fsp, true);
- FileStatus fsStatus = doneDirFc.getFileStatus(path);
- LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
- + ", Expected: " + fsp.toShort());
- if (fsStatus.getPermission().toShort() != fsp.toShort()) {
- LOG.info("Explicitly setting permissions to : " + fsp.toShort()
- + ", " + fsp);
- doneDirFc.setPermission(path, fsp);
- }
- synchronized(existingDoneSubdirs) {
- existingDoneSubdirs.add(path);
- }
- } catch (FileAlreadyExistsException faeE) { //Nothing to do.
- }
- }
- }
-
- private Path canonicalHistoryLogPath(JobId id, String timestampComponent) {
- return new Path(doneDirPrefixPath,
- JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
- }
-
- private Path canonicalHistoryLogPath(JobId id, long millisecondTime) {
- String timestampComponent =
- JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode);
- return new Path(doneDirPrefixPath,
- JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat));
- }
-
@Override
- public synchronized Job getJob(JobId jobId) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Looking for Job "+jobId);
- }
- Job job = null;
- try {
- job = findJob(jobId);
- //This could return a null job.
- } catch (IOException e) {
- throw new YarnException(e);
- }
- return job;
+ public Job getJob(JobId jobId) {
+ return storage.getFullJob(jobId);
}
@Override
public Map<JobId, Job> getAllJobs(ApplicationId appID) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Called getAllJobs(AppId): " + appID);
}
-// currently there is 1 to 1 mapping between app and job id
+ // currently there is 1 to 1 mapping between app and job id
org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID);
Map<JobId, Job> jobs = new HashMap<JobId, Job>();
JobId jobID = TypeConverter.toYarn(oldJobID);
jobs.put(jobID, getJob(jobID));
return jobs;
-// return getAllJobs();
}
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
- *
- * Returns a recent list of jobs. This may not be the complete set.
- * If a previous jobId is known - it can be queries via the getJob(JobId)
- * method.
- * Size of this list is determined by the size of the job list cache.
- * This can be fixed when pagination is implemented - return the first set of
- * jobs via the cache, go to DFS only when an attempt is made to navigate
- * past the cached list.
- * This does involve a DFS oepration of scanning the intermediate directory.
- */
+
+ @Override
public Map<JobId, Job> getAllJobs() {
- LOG.debug("Called getAllJobs()");
- return getAllJobsInternal();
+ return storage.getAllPartialJobs();
}
- static class MetaInfo {
- private Path historyFile;
- private Path confFile;
- private Path summaryFile;
- JobIndexInfo jobIndexInfo;
-
- MetaInfo(Path historyFile, Path confFile, Path summaryFile,
- JobIndexInfo jobIndexInfo) {
- this.historyFile = historyFile;
- this.confFile = confFile;
- this.summaryFile = summaryFile;
- this.jobIndexInfo = jobIndexInfo;
- }
-
- Path getHistoryFile() { return historyFile; }
- Path getConfFile() { return confFile; }
- Path getSummaryFile() { return summaryFile; }
- JobIndexInfo getJobIndexInfo() { return jobIndexInfo; }
-
- void setHistoryFile(Path historyFile) { this.historyFile = historyFile; }
- void setConfFile(Path confFile) {this.confFile = confFile; }
- void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; }
+ /**
+ * Look for a set of partial jobs.
+ *
+ * @param offset
+ * the offset into the list of jobs.
+ * @param count
+ * the maximum number of jobs to return.
+ * @param user
+ * only return jobs for the given user.
+ * @param queue
+ * only return jobs in the given queue.
+ * @param sBegin
+ * only return Jobs that started on or after the given time.
+ * @param sEnd
+ * only return Jobs that started on or before the given time.
+ * @param fBegin
+ * only return Jobs that ended on or after the given time.
+ * @param fEnd
+ * only return Jobs that ended on or before the given time.
+ * @param jobState
+ * only return jobs that are in the given job state.
+ * @return The list of filtered jobs.
+ */
+ @Override
+ public JobsInfo getPartialJobs(Long offset, Long count, String user,
+ String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd,
+ JobState jobState) {
+ return storage.getPartialJobs(offset, count, user, queue, sBegin, sEnd,
+ fBegin, fEnd, jobState);
}
-
public class HistoryCleaner implements Runnable {
- private long currentTime;
-
long maxAgeMillis;
- long filesDeleted = 0;
- long dirsDeleted = 0;
-
+
public HistoryCleaner(long maxAge) {
this.maxAgeMillis = maxAge;
}
-
- @SuppressWarnings("unchecked")
+
public void run() {
LOG.info("History Cleaner started");
- currentTime = System.currentTimeMillis();
- boolean halted = false;
- //TODO Delete YYYY/MM/DD directories.
+ long cutoff = System.currentTimeMillis() - maxAgeMillis;
try {
- List<FileStatus> serialDirList = findTimestampedDirectories();
- //Sort in ascending order. Relies on YYYY/MM/DD/Serial
- Collections.sort(serialDirList);
- for (FileStatus serialDir : serialDirList) {
- List<FileStatus> historyFileList =
- scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc);
- for (FileStatus historyFile : historyFileList) {
- JobIndexInfo jobIndexInfo =
- FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName());
- long effectiveTimestamp =
- getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile);
- if (shouldDelete(effectiveTimestamp)) {
- String confFileName =
- JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId());
- MetaInfo metaInfo = new MetaInfo(historyFile.getPath(),
- new Path(historyFile.getPath().getParent(), confFileName),
- null, jobIndexInfo);
- delete(metaInfo);
- } else {
- halted = true;
- break;
- }
- }
- if (!halted) {
- deleteDir(serialDir.getPath());
- removeDirectoryFromSerialNumberIndex(serialDir.getPath());
- synchronized (existingDoneSubdirs) {
- existingDoneSubdirs.remove(serialDir.getPath());
- }
-
- } else {
- break; //Don't scan any more directories.
- }
- }
+ hsManager.clean(cutoff, storage);
} catch (IOException e) {
- LOG.warn("Error in History cleaner run", e);
+ LOG.warn("Error trying to clean up ", e);
}
LOG.info("History Cleaner complete");
- LOG.info("FilesDeleted: " + filesDeleted);
- LOG.info("Directories Deleted: " + dirsDeleted);
- }
-
- private boolean shouldDelete(long ts) {
- return ((ts + maxAgeMillis) <= currentTime);
- }
-
- private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) {
- if (finishTime == 0) {
- return fileStatus.getModificationTime();
- }
- return finishTime;
- }
-
- private void delete(MetaInfo metaInfo) throws IOException {
- deleteFile(metaInfo.getHistoryFile());
- deleteFile(metaInfo.getConfFile());
- jobListCache.remove(metaInfo.getJobIndexInfo().getJobId());
- loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId());
- }
-
- private void deleteFile(final Path path) throws IOException {
- doneDirFc.delete(doneDirFc.makeQualified(path), false);
- filesDeleted++;
- }
-
- private void deleteDir(Path path) throws IOException {
- doneDirFc.delete(doneDirFc.makeQualified(path), true);
- dirsDeleted++;
- }
}
-
-
-
- //TODO AppContext - Not Required
- private ApplicationAttemptId appAttemptID;
+ }
+
+ // TODO AppContext - Not Required
+ private ApplicationAttemptId appAttemptID;
+
@Override
public ApplicationAttemptId getApplicationAttemptId() {
- //TODO fixme - bogus appAttemptID for now
+ // TODO fixme - bogus appAttemptID for now
return appAttemptID;
- }
-
- //TODO AppContext - Not Required
+ }
+
+ // TODO AppContext - Not Required
private ApplicationId appID;
+
@Override
public ApplicationId getApplicationID() {
- //TODO fixme - bogus appID for now
+ // TODO fixme - bogus appID for now
return appID;
}
-
- //TODO AppContext - Not Required
+
+ // TODO AppContext - Not Required
@Override
public EventHandler getEventHandler() {
// TODO Auto-generated method stub
return null;
}
-
- //TODO AppContext - Not Required
+
+ // TODO AppContext - Not Required
private String userName;
+
@Override
public CharSequence getUser() {
if (userName != null) {
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Wed Apr 11 05:47:40 2012
@@ -51,6 +51,7 @@ public class PartialJob implements org.a
jobReport = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobReport.class);
jobReport.setStartTime(jobIndexInfo.getSubmitTime());
jobReport.setFinishTime(jobIndexInfo.getFinishTime());
+ jobReport.setJobState(getState());
}
@Override
Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java?rev=1324567&r1=1324566&r2=1324567&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebApp.java Wed Apr 11 05:47:40 2012
@@ -44,6 +44,7 @@ public class HsWebApp extends WebApp imp
bind(JAXBContextResolver.class);
bind(GenericExceptionHandler.class);
bind(AppContext.class).toInstance(history);
+ bind(HistoryContext.class).toInstance(history);
route("/", HsController.class);
route("/app", HsController.class);
route(pajoin("/job", JOB_ID), HsController.class, "job");