Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [16/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Added: hadoop/mapreduce/branches/MR-279/mr-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,161 @@
+<?xml version="1.0"?>
+<project>
+
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client</artifactId>
+ <name>hadoop-mapreduce-client</name>
+ <version>${yarn.version}</version>
+ <packaging>pom</packaging>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <test.logs>true</test.logs>
+ <test.timeout>600000</test.timeout>
+ <hadoop-common.version>0.22.0-SNAPSHOT</hadoop-common.version>
+ <hadoop-hdfs.version>0.22.0-SNAPSHOT</hadoop-hdfs.version>
+ <hadoop-mapred.version>0.22.0-SNAPSHOT</hadoop-mapred.version>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.3.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop-common.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!--dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapred</artifactId>
+ <version>${hadoop-mapred.version}</version>
+ </dependency-->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common-test</artifactId>
+ <version>${hadoop-common.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop-hdfs.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.3.1</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <!-- pre 2.1 ignores project.build.sourceEncoding -->
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ <descriptors>
+ <descriptor>assembly/all.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.4.1</version>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <!-- requires 2.5+ to make system properties work -->
+ <!-- requires 2.7+ to avoid SUREFIRE-640 -->
+ <version>2.7.2</version>
+ <configuration>
+ <failIfNoTests>false</failIfNoTests>
+ <redirectTestOutputToFile>${test.logs}</redirectTestOutputToFile>
+ <forkedProcessTimeoutInSeconds>${test.timeout}</forkedProcessTimeoutInSeconds>
+ <environmentVariables>
+ <JAVA_HOME>${java.home}</JAVA_HOME>
+ </environmentVariables>
+ <systemPropertyVariables>
+ <build.dir>${project.build.directory}</build.dir>
+ <build.output.dir>${project.build.outputDirectory}</build.output.dir>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.3.1</version>
+ </plugin>
+ <plugin>
+ <groupId>com.atlassian.maven.plugins</groupId>
+ <artifactId>maven-clover2-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <licenseLocation>/home/y/conf/clover/clover.license</licenseLocation>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>instrument</goal>
+ <goal>aggregate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <modules>
+ <module>hadoop-mapreduce-client-core</module>
+ <module>hadoop-mapreduce-client-common</module>
+ <module>hadoop-mapreduce-client-shuffle</module>
+ <module>hadoop-mapreduce-client-app</module>
+ <module>hadoop-mapreduce-client-jobclient</module>
+ <module>hadoop-mapreduce-client-hs</module>
+ </modules>
+</project>
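The surefire configuration above forwards the build directories to forked test JVMs as system properties (and JAVA_HOME as an environment variable). A minimal sketch of a test consuming them — the class name is hypothetical and not part of this commit:

    import org.junit.Assert;
    import org.junit.Test;

    public class BuildDirSmokeTest {
      // build.dir and build.output.dir are injected by the surefire
      // <systemPropertyVariables> block above; they are unset when the
      // test runs outside Maven.
      @Test
      public void buildPropertiesArePresent() {
        Assert.assertNotNull("build.dir not set by surefire",
            System.getProperty("build.dir"));
        Assert.assertNotNull("build.output.dir not set by surefire",
            System.getProperty("build.output.dir"));
      }
    }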
Added: hadoop/mapreduce/branches/MR-279/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,193 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.hadoop.mapreduce</groupId>
+ <artifactId>hadoop-mapreduce</artifactId>
+ <version>${hadoop-mapreduce.version}</version>
+ <packaging>pom</packaging>
+
+ <name>hadoop-mapreduce</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <test.logs>true</test.logs>
+ <test.timeout>600000</test.timeout>
+ <hadoop-common.version>0.22.0-SNAPSHOT</hadoop-common.version>
+ <hadoop-hdfs.version>0.22.0-SNAPSHOT</hadoop-hdfs.version>
+ <hadoop-mapreduce.version>1.0-SNAPSHOT</hadoop-mapreduce.version>
+ </properties>
+
+ <dependencies>
+ <!-- The Avro dependency must be defined before hadoop-common to override
+ the transitive dependency on avro in hadoop-common. -->
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.4.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.ant</groupId>
+ <artifactId>ant</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.velocity</groupId>
+ <artifactId>velocity</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>paranamer-ant</artifactId>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop-common.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common-test</artifactId>
+ <version>${hadoop-common.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop-hdfs.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-servlet</artifactId>
+ <version>2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.3.Final</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.3.1</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <!-- pre 2.1 ignores project.build.sourceEncoding -->
+ <version>2.3.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <tarLongFileMode>gnu</tarLongFileMode>
+ <descriptors>
+ <descriptor>assembly/all.xml</descriptor>
+ </descriptors>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.4.1</version>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <!-- requires 2.5+ to make system properties work -->
+ <!-- requires 2.7+ to avoid SUREFIRE-640 -->
+ <version>2.7.2</version>
+ <configuration>
+ <failIfNoTests>false</failIfNoTests>
+ <redirectTestOutputToFile>${test.logs}</redirectTestOutputToFile>
+ <forkedProcessTimeoutInSeconds>${test.timeout}</forkedProcessTimeoutInSeconds>
+ <environmentVariables>
+ <JAVA_HOME>${java.home}</JAVA_HOME>
+ </environmentVariables>
+ <systemPropertyVariables>
+ <build.dir>${project.build.directory}</build.dir>
+ <build.output.dir>${project.build.outputDirectory}</build.output.dir>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.3.1</version>
+ </plugin>
+ <plugin>
+ <groupId>com.atlassian.maven.plugins</groupId>
+ <artifactId>maven-clover2-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <licenseLocation>/home/y/conf/clover/clover.license</licenseLocation>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>instrument</goal>
+ <goal>aggregate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <modules>
+ <module>yarn</module>
+ <module>mr-client</module>
+ </modules>
+
+</project>
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Thu Mar 17 20:21:13 2011
@@ -366,7 +366,7 @@ class DistributedCacheEmulator {
* @return true if the path provided is of a local file system based
* distributed cache file
*/
- static boolean isLocalDistCacheFile(String filePath, String user,
+ private boolean isLocalDistCacheFile(String filePath, String user,
boolean visibility) {
return (!visibility && filePath.contains(user + "/.staging"));
}
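The helper is narrowed from a package-visible static method to a private instance method (its external caller in GridmixSystemTestCase is removed below). For reference, a standalone sketch of the predicate with hypothetical inputs — a distributed cache file counts as local when it is not publicly visible and lives under the submitting user's .staging directory:

    public class LocalDistCacheCheckDemo {
      // Same predicate as isLocalDistCacheFile above.
      static boolean isLocalDistCacheFile(String filePath, String user,
          boolean visibility) {
        return (!visibility && filePath.contains(user + "/.staging"));
      }

      public static void main(String[] args) {
        // Private (non-visible) file under the user's staging dir: true.
        System.out.println(isLocalDistCacheFile(
            "hdfs://nn:8020/user/alice/.staging/job_1/files/f1", "alice", false));
        // Publicly visible file elsewhere: false.
        System.out.println(isLocalDistCacheFile(
            "hdfs://nn:8020/tmp/shared/f2", "alice", true));
      }
    }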
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java Thu Mar 17 20:21:13 2011
@@ -74,11 +74,10 @@ public class GridmixSystemTestCase {
@AfterClass
public static void after() throws Exception {
UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
- org.apache.hadoop.fs.FileUtil.fullyDelete(new java.io.File(System.
- getProperty("java.io.tmpdir") + "/gridmix-st/"));
+ org.apache.hadoop.fs.FileUtil.fullyDelete(new java.io.File("/tmp/gridmix-st"));
cluster.tearDown();
- if (gridmixJS != null && gridmixJS.getJobConf().
- get("gridmix.user.resolve.class"). contains("RoundRobin")) {
+ if (gridmixJS.getJobConf().get("gridmix.user.resolve.class").
+ contains("RoundRobin")) {
List<String> proxyUsers = UtilsForGridmix.
listProxyUsers(gridmixJS.getJobConf(),
UserGroupInformation.getLoginUser().getShortUserName());
@@ -98,19 +97,19 @@ public class GridmixSystemTestCase {
public static void runGridmixAndVerify(String [] runtimeValues,
String [] otherValues, String tracePath, int mode) throws Exception {
- List<JobID> jobids = runGridmix(runtimeValues, otherValues, mode);
+ jobids = runGridmix(runtimeValues, otherValues, mode);
gridmixJV = new GridmixJobVerification(
new Path(tracePath), gridmixJS.getJobConf(), jtClient);
gridmixJV.verifyGridmixJobsWithJobStories(jobids);
}
-
+
public static List<JobID> runGridmix(String[] runtimeValues,
String[] otherValues, int mode) throws Exception {
gridmixJS = new GridmixJobSubmission(rtClient.getDaemonConf(),
jtClient, gridmixDir);
gridmixJS.submitJobs(runtimeValues, otherValues, mode);
- List<JobID> jobids = UtilsForGridmix.listGridmixJobIDs(
- jtClient.getClient(), gridmixJS.getGridmixJobCount());
+ jobids = UtilsForGridmix.listGridmixJobIDs(jtClient.getClient(),
+ gridmixJS.getGridmixJobCount());
return jobids;
}
@@ -127,10 +126,4 @@ public class GridmixSystemTestCase {
}
return null;
}
-
- public static boolean isLocalDistCache(String fileName, String userName,
- boolean visibility) {
- return DistributedCacheEmulator.isLocalDistCacheFile(fileName,
- userName, visibility);
- }
}
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java Thu Mar 17 20:21:13 2011
@@ -177,10 +177,9 @@ public class TestGridMixDataGeneration {
dataSize + 0.1 > inputSize || dataSize - 0.1 < inputSize);
JobClient jobClient = jtClient.getClient();
- int len = jobClient.getAllJobs().length;
LOG.info("Verify the job status after completion of job.");
Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED,
- jobClient.getAllJobs()[len-1].getRunState());
+ jobClient.getAllJobs()[0].getRunState());
}
private void verifyEachNodeSize(Path inputDir) throws IOException {
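The assertion now reads index 0 instead of the last element. The assumption behind the change, sketched below with a hypothetical helper that is not part of this commit, is that JobClient.getAllJobs() lists the most recently submitted job first:

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobStatus;

    class LatestJobHelper {
      // Assumes getAllJobs() orders jobs newest-first, so element 0 is
      // the job that just completed.
      static JobStatus latestJob(JobClient jobClient) throws IOException {
        return jobClient.getAllJobs()[0];
      }
    }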
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java Thu Mar 17 20:21:13 2011
@@ -108,29 +108,4 @@ public class GridMixConfig {
*/
public static final String GRIDMIX_DISTCACHE_ENABLE =
"gridmix.distributed-cache-emulation.enable";
-
- /**
- * Gridmix distributed cache visibilities.
- */
- public static final String GRIDMIX_DISTCACHE_VISIBILITIES =
- "mapreduce.job.cache.files.visibilities";
-
- /**
- * Gridmix distributed cache files.
- */
- public static final String GRIDMIX_DISTCACHE_FILES =
- "mapreduce.job.cache.files";
-
- /**
- * Gridmix distributed cache files size.
- */
- public static final String GRIDMIX_DISTCACHE_FILESSIZE =
- "mapreduce.job.cache.files.filesizes";
-
- /**
- * Gridmix distributed cache files time stamp.
- */
- public static final String GRIDMIX_DISTCACHE_TIMESTAMP =
- "mapreduce.job.cache.files.timestamps";
}
-
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java Thu Mar 17 20:21:13 2011
@@ -18,25 +18,17 @@
package org.apache.hadoop.mapred.gridmix.test.system;
import java.io.IOException;
-import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.Set;
-import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Counter;
@@ -53,7 +45,6 @@ import org.apache.hadoop.tools.rumen.Tas
import org.junit.Assert;
import java.text.ParseException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.mapred.gridmix.GridmixSystemTestCase;
/**
* Verifying each Gridmix job with corresponding job story in a trace file.
*/
@@ -63,8 +54,6 @@ public class GridmixJobVerification {
private Path path;
private Configuration conf;
private JTClient jtClient;
- private Map<String, List<JobConf>> simuAndOrigJobsInfo =
- new HashMap<String, List<JobConf>>();
/**
* Gridmix job verification constructor
* @param path - path of the gridmix output directory.
@@ -87,12 +76,11 @@ public class GridmixJobVerification {
public void verifyGridmixJobsWithJobStories(List<JobID> jobids)
throws IOException, ParseException {
- SortedMap <Long, String> origSubmissionTime = new TreeMap <Long, String>();
- SortedMap <Long, String> simuSubmissionTime = new TreeMap<Long, String>();
+ List<Long> origSubmissionTime = new ArrayList<Long>();
+ List<Long> simuSubmissionTime = new ArrayList<Long>();
GridmixJobStory gjs = new GridmixJobStory(path, conf);
final Iterator<JobID> ite = jobids.iterator();
- File destFolder = new File(System.getProperty("java.io.tmpdir") +
- "/gridmix-st/");
+ java.io.File destFolder = new java.io.File("/tmp/gridmix-st/");
destFolder.mkdir();
while (ite.hasNext()) {
@@ -105,491 +93,129 @@ public class GridmixJobVerification {
long expReduceInputRecs = 0;
long expReduceOutputRecs = 0;
- JobID simuJobId = ite.next();
- JobHistoryParser.JobInfo jhInfo = getSimulatedJobHistory(simuJobId);
- Assert.assertNotNull("Job history not found.", jhInfo);
+ JobID currJobId = ite.next();
+ String historyFilePath = jtClient.getProxy().
+ getJobHistoryLocationForRetiredJob(currJobId);
+ Path jhpath = new Path(historyFilePath);
+ FileSystem fs = jhpath.getFileSystem(conf);
+ JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
+ JobHistoryParser.JobInfo jhInfo = jhparser.parse();
Counters counters = jhInfo.getTotalCounters();
- JobConf simuJobConf = getSimulatedJobConf(simuJobId,destFolder);
- String origJobId = simuJobConf.get("gridmix.job.original-job-id");
+
+ fs.copyToLocalFile(jhpath,new Path(destFolder.toString()));
+ fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"),
+ new Path(destFolder.toString()));
+ JobConf jobConf = new JobConf(conf);
+ jobConf.addResource(new Path("/tmp/gridmix-st/" +
+ currJobId + "_conf.xml"));
+ String origJobId = jobConf.get("gridmix.job.original-job-id");
LOG.info("OriginalJobID<->CurrentJobID:" +
- origJobId + "<->" + simuJobId);
+ origJobId + "<->" + currJobId);
ZombieJob zombieJob = gjs.getZombieJob(JobID.forName(origJobId));
- Map<String, Long> mapJobCounters = getJobMapCounters(zombieJob);
- Map<String, Long> reduceJobCounters = getJobReduceCounters(zombieJob);
-
- if (simuJobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
- origSubmissionTime.put(zombieJob.getSubmissionTime(),
- origJobId.toString() + "^" + simuJobId);
- simuSubmissionTime.put(jhInfo.getSubmitTime() ,
- origJobId.toString() + "^" + simuJobId); ;
+ LoggedJob loggedJob = zombieJob.getLoggedJob();
+
+ for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
+ TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
+ expMapInputBytes += mapTask.getInputBytes();
+ expMapOutputBytes += mapTask.getOutputBytes();
+ expMapInputRecs += mapTask.getInputRecords();
+ expMapOutputRecs += mapTask.getOutputRecords();
}
- LOG.info("Verifying the job <" + simuJobId + "> and wait for a while...");
- verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf);
- verifyJobMapCounters(counters,mapJobCounters,simuJobConf);
- verifyJobReduceCounters(counters,reduceJobCounters,simuJobConf);
- verifyDistributeCache(zombieJob,simuJobConf);
- setJobDistributedCacheInfo(simuJobId.toString(), simuJobConf,
- zombieJob.getJobConf());
- LOG.info("Done.");
- }
- verifyDistributedCacheBetweenJobs(simuAndOrigJobsInfo);
- }
-
- /**
- * Verify the distributed cache files between the jobs in a gridmix run.
- * @param jobsInfo - jobConfs of simulated and original jobs as a map.
- */
- public void verifyDistributedCacheBetweenJobs(Map<String,
- List<JobConf>> jobsInfo) {
- if (jobsInfo.size() > 1) {
- Map<String, Integer> simJobfilesOccurBtnJobs =
- getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 0);
- Map<String, Integer> origJobfilesOccurBtnJobs =
- getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 1);
- List<Integer> simuOccurList =
- getMapValuesAsList(simJobfilesOccurBtnJobs);
- Collections.sort(simuOccurList);
- List<Integer> origOccurList =
- getMapValuesAsList(origJobfilesOccurBtnJobs);
- Collections.sort(origOccurList);
- Assert.assertTrue("The unique count of distibuted cache files in " +
- "simulated jobs have not matched with the unique count of " +
- "original jobs distributed files ",
- simuOccurList.size() == origOccurList.size());
- int index = 0;
- for(Integer origDistFileCount : origOccurList) {
- Assert.assertTrue("Distributed cache file reused in simulated " +
- "jobs has not matched with reused of distributed cache file " +
- "in original jobs.",origDistFileCount == simuOccurList.get(index));
- index ++;
- }
- }
- }
-
- private List<Integer> getMapValuesAsList(Map<String,Integer> jobOccurs) {
- List<Integer> occursList = new ArrayList<Integer>();
- Set<String> files = jobOccurs.keySet();
- Iterator<String > ite = files.iterator();
- while(ite.hasNext()) {
- String file = ite.next();
- occursList.add(jobOccurs.get(file));
- }
- return occursList;
- }
-
-
- /**
- * Get the unique distributed cache files and occurrence between the jobs.
- * @param jobsInfo - job's configurations as a map.
- * @param jobConfIndex - 0 for simulated job configuration and
- * 1 for original jobs configuration.
- * @return - unique distributed cache files and occurrences as map.
- */
- private Map<String, Integer> getDistcacheFilesOccurenceBetweenJobs(
- Map<String, List<JobConf>> jobsInfo, int jobConfIndex) {
- Map<String,Integer> filesOccurBtnJobs = new HashMap <String,Integer>();
- Set<String> jobIds = jobsInfo.keySet();
- Iterator<String > ite = jobIds.iterator();
- while(ite.hasNext()){
- String jobId = ite.next();
- List<JobConf> jobconfs = jobsInfo.get(jobId);
- String [] distCacheFiles = jobconfs.get(jobConfIndex).
- get(GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
- String [] distCacheFileTimeStamps = jobconfs.get(jobConfIndex).
- get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
- String [] distCacheFileVisib = jobconfs.get(jobConfIndex).
- get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
- int indx = 0;
- for (String distCacheFile : distCacheFiles) {
- String fileAndSize = distCacheFile + "^" +
- distCacheFileTimeStamps[indx] + "^" +
- jobconfs.get(jobConfIndex).getUser();
- if (filesOccurBtnJobs.get(fileAndSize)!= null) {
- int count = filesOccurBtnJobs.get(fileAndSize);
- count ++;
- filesOccurBtnJobs.put(fileAndSize,count);
- } else {
- filesOccurBtnJobs.put(fileAndSize,1);
- }
- }
- }
- return filesOccurBtnJobs;
- }
-
- private void setJobDistributedCacheInfo(String jobId, JobConf simuJobConf,
- JobConf origJobConf) {
- if (simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES)!= null) {
- List<JobConf> jobConfs = new ArrayList<JobConf>();
- jobConfs.add(simuJobConf);
- jobConfs.add(origJobConf);
- simuAndOrigJobsInfo.put(jobId,jobConfs);
- }
- }
-
- /**
- * Verify the job subimssion order between the jobs in replay mode.
- * @param origSubmissionTime - sorted map of original jobs submission times.
- * @param simuSubmissionTime - sorted map of simulated jobs submission times.
- */
- public void verifyJobSumissionTime(SortedMap<Long, String> origSubmissionTime,
- SortedMap<Long, String> simuSubmissionTime){
- Assert.assertTrue("Simulated job's submission time count has " +
- "not match with Original job's submission time count.",
- origSubmissionTime.size() == simuSubmissionTime.size());
- for ( int index = 0; index < origSubmissionTime.size(); index ++) {
- String origAndSimuJobID = origSubmissionTime.get(index);
- String simuAndorigJobID = simuSubmissionTime.get(index);
- Assert.assertEquals("Simulated jobs have not submitted in same " +
- "order as original jobs submitted inREPLAY mode.",
- origAndSimuJobID, simuAndorigJobID);
- }
- }
-
- /**
- * It verifies the distributed cache emulation of a job.
- * @param zombieJob - Original job story.
- * @param simuJobConf - Simulated job configuration.
- */
- public void verifyDistributeCache(ZombieJob zombieJob,
- JobConf simuJobConf) throws IOException {
-
- if (simuJobConf.getBoolean(GridMixConfig.GRIDMIX_DISTCACHE_ENABLE, false)) {
- JobConf origJobConf = zombieJob.getJobConf();
- checkFileVisibility(simuJobConf);
- checkDistcacheFiles(simuJobConf,origJobConf);
- checkFileSizes(simuJobConf,origJobConf);
- checkFileStamps(simuJobConf,origJobConf);
- } else {
- Assert.assertNull("Configuration has distributed cache visibilites" +
- "without enabled distributed cache emulation.", simuJobConf.
- get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES));
- Assert.assertNull("Configuration has distributed cache files time " +
- "stamps without enabled distributed cache emulation.",simuJobConf.
- get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP));
- Assert.assertNull("Configuration has distributed cache files paths" +
- "without enabled distributed cache emulation.",simuJobConf.
- get(GridMixConfig.GRIDMIX_DISTCACHE_FILES));
- Assert.assertNull("Configuration has distributed cache files sizes" +
- "without enabled distributed cache emulation.",simuJobConf.
- get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE));
- }
- }
-
- private void checkFileStamps(JobConf simuJobConf, JobConf origJobConf) {
- //Verify simulated jobs against distributed cache files time stamps.
- String [] origDCFTS = origJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
- String [] simuDCFTS = simuJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
- for (int index = 0; index < origDCFTS.length; index++) {
- Assert.assertTrue("Invalid time stamps between original " +
- "and simulated job",Long.parseLong(origDCFTS[index]) <
- Long.parseLong(simuDCFTS[index]));
- }
- }
+ for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
+ TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
+ expReduceInputBytes += reduceTask.getInputBytes();
+ expReduceOutputBytes += reduceTask.getOutputBytes();
+ expReduceInputRecs += reduceTask.getInputRecords();
+ expReduceOutputRecs += reduceTask.getOutputRecords();
+ }
- private void checkFileVisibility(JobConf simuJobConf ) {
- // Verify simulated jobs against distributed cache files visibilities.
- String [] distFiles = simuJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_FILES).split(",");
- String [] simuDistVisibilities = simuJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
- List<Boolean> expFileVisibility = new ArrayList<Boolean >();
- int index = 0;
- for (String distFile : distFiles) {
- if (!GridmixSystemTestCase.isLocalDistCache(distFile,
- simuJobConf.getUser(),Boolean.valueOf(simuDistVisibilities[index]))) {
- expFileVisibility.add(true);
+ LOG.info("Verifying the job <" + currJobId + "> and wait for a while...");
+ Assert.assertEquals("Job id has not matched",
+ zombieJob.getJobID(), JobID.forName(origJobId));
+
+ Assert.assertEquals("Job maps have not matched",
+ zombieJob.getNumberMaps(),
+ jhInfo.getTotalMaps());
+
+ if (!jobConf.getBoolean("gridmix.sleep.maptask-only",false)) {
+ Assert.assertEquals("Job reducers have not matched",
+ zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
} else {
- expFileVisibility.add(false);
- }
- index ++;
- }
- index = 0;
- for (String actFileVisibility : simuDistVisibilities) {
- Assert.assertEquals("Simulated job distributed cache file " +
- "visibilities has not matched.",
- expFileVisibility.get(index),Boolean.valueOf(actFileVisibility));
- index ++;
- }
- }
-
- private void checkDistcacheFiles(JobConf simuJobConf, JobConf origJobConf)
- throws IOException {
- //Verify simulated jobs against distributed cache files.
- String [] origDistFiles = origJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_FILES).split(",");
- String [] simuDistFiles = simuJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_FILES).split(",");
- String [] simuDistVisibilities = simuJobConf.get(GridMixConfig.
- GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
- Assert.assertEquals("No. of simulatued job's distcache files " +
- "haven't matched with no.of original job's distcache files",
- origDistFiles.length, simuDistFiles.length);
-
- int index = 0;
- for (String simDistFile : simuDistFiles) {
- Path distPath = new Path(simDistFile);
- if (!GridmixSystemTestCase.isLocalDistCache(simDistFile,
- simuJobConf.getUser(),Boolean.valueOf(simuDistVisibilities[index]))) {
- FileSystem fs = distPath.getFileSystem(conf);
- FileStatus fstat = fs.getFileStatus(distPath);
- FsPermission permission = fstat.getPermission();
- Assert.assertTrue("HDFS distributed cache file has wrong " +
- "permissions for users.", FsAction.READ_WRITE.SYMBOL ==
- permission.getUserAction().SYMBOL);
- Assert.assertTrue("HDFS distributed cache file has wrong " +
- "permissions for groups.",FsAction.READ.SYMBOL ==
- permission.getGroupAction().SYMBOL);
- Assert.assertTrue("HDSFS distributed cache file has wrong " +
- "permissions for others.",FsAction.READ.SYMBOL ==
- permission.getOtherAction().SYMBOL);
+ Assert.assertEquals("Job reducers have not matched",
+ 0, jhInfo.getTotalReduces());
}
- }
- index ++;
- }
- private void checkFileSizes(JobConf simuJobConf, JobConf origJobConf) {
- // Verify simulated jobs against distributed cache files size.
- List<String> origDistFilesSize = Arrays.asList(origJobConf.
- get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
- Collections.sort(origDistFilesSize);
- List<String> simuDistFilesSize = Arrays.asList(simuJobConf.
- get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
- Collections.sort(simuDistFilesSize);
- Assert.assertEquals("Simulated job's file size list has not " +
- "matched with the Original job's file size list.",
- origDistFilesSize.size(),
- simuDistFilesSize.size());
- for ( int index = 0; index < origDistFilesSize.size(); index ++ ) {
- Assert.assertEquals("Simulated job distcache file size has not " +
- "matched with original job distcache file size.",
- origDistFilesSize.get(index), simuDistFilesSize.get(index));
- }
- }
-
- /**
- * It verifies the simulated job map counters.
- * @param counters - Original job map counters.
- * @param mapJobCounters - Simulated job map counters.
- * @param jobConf - Simulated job configuration.
- * @throws ParseException - If an parser error occurs.
- */
- public void verifyJobMapCounters(Counters counters,
- Map<String,Long> mapCounters, JobConf jobConf) throws ParseException {
- if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
- //The below statements have commented due to a bug(MAPREDUCE-2135).
- /*Assert.assertTrue("Map input bytes have not matched.<exp:[" +
- convertBytes(mapCounters.get("MAP_INPUT_BYTES").longValue()) +"]><act:[" +
- convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>",
- convertBytes(mapCounters.get("MAP_INPUT_BYTES").longValue()).equals(
- convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
-
- Assert.assertTrue("Map output bytes has not matched.<exp:[" +
- convertBytes(mapCounters.get("MAP_OUTPUT_BYTES").longValue()) + "]><act:[" +
- convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>",
- convertBytes(mapCounters.get("MAP_OUTPUT_BYTES").longValue()).equals(
- convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
-
- Assert.assertEquals("Map input records have not matched.<exp:[" +
- mapCounters.get("MAP_INPUT_RECS").longValue() + "]><act:[" +
- getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>",
- mapCounters.get("MAP_INPUT_RECS").longValue(),
- getCounterValue(counters, "MAP_INPUT_RECORDS"));
-
- // The below statements have commented due to a bug(MAPREDUCE-2154).
- /*Assert.assertEquals("Map output records have not matched.<exp:[" +
- mapCounters.get("MAP_OUTPUT_RECS").longValue() + "]><act:[" +
- getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>",
- mapCounters.get("MAP_OUTPUT_RECS").longValue(),
- getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
- } else {
- Assert.assertTrue("Map Input Bytes are zero",
- getCounterValue(counters,"HDFS_BYTES_READ") != 0);
- Assert.assertNotNull("Map Input Records are zero",
- getCounterValue(counters, "MAP_INPUT_RECORDS")!=0);
- }
- }
-
- /**
- * It verifies the simulated job reduce counters.
- * @param counters - Original job reduce counters.
- * @param reduceCounters - Simulated job reduce counters.
- * @param jobConf - simulated job configuration.
- * @throws ParseException - if an parser error occurs.
- */
- public void verifyJobReduceCounters(Counters counters,
- Map<String,Long> reduceCounters, JobConf jobConf) throws ParseException {
- if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
- /*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
- convertBytes(reduceCounters.get("REDUCE_INPUT_BYTES").longValue()) + "]><act:[" +
- convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>",
- convertBytes(reduceCounters.get("REDUCE_INPUT_BYTES").longValue()).equals(
- convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/
-
- /*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" +
- convertBytes(reduceCounters.get("REDUCE_OUTPUT_BYTES").longValue()) + "]><act:[" +
- convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>",
- convertBytes(reduceCounters.get("REDUCE_OUTPUT_BYTES").longValue()).equals(
- convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
-
- /*Assert.assertEquals("Reduce output records have not matched.<exp:[" +
- reduceCounters.get("REDUCE_OUTPUT_RECS").longValue() + "]><act:[" + getCounterValue(counters,
- "REDUCE_OUTPUT_RECORDS") + "]>",
- reduceCounters.get("REDUCE_OUTPUT_RECS").longValue(), getCounterValue(counters,
- "REDUCE_OUTPUT_RECORDS"));*/
-
- /*Assert.assertEquals("Reduce input records have not matched.<exp:[" +
- reduceCounters.get("REDUCE_INPUT_RECS").longValue() + "]><act:[" + getCounterValue(counters,
- "REDUCE_INPUT_RECORDS") + "]>",
- reduceCounters.get("REDUCE_INPUT_RECS").longValue(),
- getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
- } else {
- Assert.assertTrue("Reduce output records are not zero for sleep job.",
- getCounterValue(counters, "REDUCE_OUTPUT_RECORDS") == 0);
- Assert.assertTrue("Reduce output bytes are not zero for sleep job.",
- getCounterValue(counters,"HDFS_BYTES_WRITTEN") == 0);
- }
- }
-
- /**
- * It verifies the gridmix simulated job summary.
- * @param zombieJob - Original job summary.
- * @param jhInfo - Simulated job history info.
- * @param jobConf - simulated job configuration.
- * @throws IOException - if an I/O error occurs.
- */
- public void verifySimulatedJobSummary(ZombieJob zombieJob,
- JobHistoryParser.JobInfo jhInfo, JobConf jobConf) throws IOException {
- Assert.assertEquals("Job id has not matched",
- zombieJob.getJobID(), JobID.forName(
- jobConf.get("gridmix.job.original-job-id")));
-
- Assert.assertEquals("Job maps have not matched",
- zombieJob.getNumberMaps(), jhInfo.getTotalMaps());
-
- if (!jobConf.getBoolean("gridmix.sleep.maptask-only",false)) {
- Assert.assertEquals("Job reducers have not matched",
- zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
- } else {
- Assert.assertEquals("Job reducers have not matched",
- 0, jhInfo.getTotalReduces());
- }
-
- Assert.assertEquals("Job status has not matched.",
- zombieJob.getOutcome().name(),
- convertJobStatus(jhInfo.getJobStatus()));
-
- LoggedJob loggedJob = zombieJob.getLoggedJob();
- Assert.assertEquals("Job priority has not matched.",
- loggedJob.getPriority().toString(), jhInfo.getPriority());
-
- if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
- Assert.assertTrue(jhInfo.getJobId().toString() +
- " has not impersonate with other user.", !jhInfo.getUsername()
- .equals(UserGroupInformation.getLoginUser().getShortUserName()));
- }
- }
-
- /**
- * Get the original job map counters from a trace.
- * @param zombieJob - Original job story.
- * @return - map counters as a map.
- */
- public Map<String, Long> getJobMapCounters(ZombieJob zombieJob) {
- long expMapInputBytes = 0;
- long expMapOutputBytes = 0;
- long expMapInputRecs = 0;
- long expMapOutputRecs = 0;
- Map<String,Long> mapCounters = new HashMap<String,Long>();
- for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
- TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
- expMapInputBytes += mapTask.getInputBytes();
- expMapOutputBytes += mapTask.getOutputBytes();
- expMapInputRecs += mapTask.getInputRecords();
- expMapOutputRecs += mapTask.getOutputRecords();
- }
- mapCounters.put("MAP_INPUT_BYTES", expMapInputBytes);
- mapCounters.put("MAP_OUTPUT_BYTES", expMapOutputBytes);
- mapCounters.put("MAP_INPUT_RECS", expMapInputRecs);
- mapCounters.put("MAP_OUTPUT_RECS", expMapOutputRecs);
- return mapCounters;
- }
-
- /**
- * Get the original job reduce counters from a trace.
- * @param zombieJob - Original job story.
- * @return - reduce counters as a map.
- */
- public Map<String,Long> getJobReduceCounters(ZombieJob zombieJob) {
- long expReduceInputBytes = 0;
- long expReduceOutputBytes = 0;
- long expReduceInputRecs = 0;
- long expReduceOutputRecs = 0;
- Map<String,Long> reduceCounters = new HashMap<String,Long>();
- for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
- TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
- expReduceInputBytes += reduceTask.getInputBytes();
- expReduceOutputBytes += reduceTask.getOutputBytes();
- expReduceInputRecs += reduceTask.getInputRecords();
- expReduceOutputRecs += reduceTask.getOutputRecords();
- }
- reduceCounters.put("REDUCE_INPUT_BYTES", expReduceInputBytes);
- reduceCounters.put("REDUCE_OUTPUT_BYTES", expReduceOutputBytes);
- reduceCounters.put("REDUCE_INPUT_RECS", expReduceInputRecs);
- reduceCounters.put("REDUCE_OUTPUT_RECS", expReduceOutputRecs);
- return reduceCounters;
- }
+ Assert.assertEquals("Job status has not matched.",
+ zombieJob.getOutcome().name(),
+ convertJobStatus(jhInfo.getJobStatus()));
+
+ Assert.assertEquals("Job priority has not matched.",
+ loggedJob.getPriority().toString(), jhInfo.getPriority());
+
+ if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
+ Assert.assertTrue(currJobId + " has not impersonated another user.",
+ !jhInfo.getUsername().equals(UserGroupInformation.
+ getLoginUser().getShortUserName()));
+ }
- /**
- * Get the simulated job configuration of a job.
- * @param simulatedJobID - Simulated job id.
- * @param tmpJHFolder - temporary job history folder location.
- * @return - simulated job configuration.
- * @throws IOException - If an I/O error occurs.
- */
- public JobConf getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder)
- throws IOException{
- FileSystem fs = null;
- try {
- String historyFilePath = jtClient.getProxy().
- getJobHistoryLocationForRetiredJob(simulatedJobID);
- Path jhpath = new Path(historyFilePath);
- fs = jhpath.getFileSystem(conf);
- fs.copyToLocalFile(jhpath,new Path(tmpJHFolder.toString()));
- fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"),
- new Path(tmpJHFolder.toString()));
- JobConf jobConf = new JobConf();
- jobConf.addResource(new Path(tmpJHFolder.toString() + "/" +
- simulatedJobID + "_conf.xml"));
- jobConf.reloadConfiguration();
- return jobConf;
- }finally {
- fs.close();
- }
- }
+ if (jobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
+ origSubmissionTime.add(zombieJob.getSubmissionTime());
+ simuSubmissionTime.add(jhInfo.getSubmitTime());
+ }
- /**
- * Get the simulated job history of a job.
- * @param simulatedJobID - simulated job id.
- * @return - simulated job information.
- * @throws IOException - if an I/O error occurs.
- */
- public JobHistoryParser.JobInfo getSimulatedJobHistory(JobID simulatedJobID)
- throws IOException {
- FileSystem fs = null;
- try {
- String historyFilePath = jtClient.getProxy().
- getJobHistoryLocationForRetiredJob(simulatedJobID);
- Path jhpath = new Path(historyFilePath);
- fs = jhpath.getFileSystem(conf);
- JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
- JobHistoryParser.JobInfo jhInfo = jhparser.parse();
- return jhInfo;
- } finally {
- fs.close();
+ if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
+
+ // The statements below are commented out due to a bug (MAPREDUCE-2135).
+ /* Assert.assertTrue("Map input bytes have not matched.<exp:[" +
+ convertBytes(expMapInputBytes) +"]><act:[" +
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>",
+ convertBytes(expMapInputBytes).equals(
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
+
+ Assert.assertTrue("Map output bytes has not matched.<exp:[" +
+ convertBytes(expMapOutputBytes) + "]><act:[" +
+ convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>",
+ convertBytes(expMapOutputBytes).equals(
+ convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
+
+ Assert.assertEquals("Map input records have not matched.<exp:[" +
+ expMapInputRecs + "]><act:[" +
+ getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>",
+ expMapInputRecs, getCounterValue(counters, "MAP_INPUT_RECORDS"));
+
+ // The statements below are commented out due to a bug (MAPREDUCE-2154).
+ /*Assert.assertEquals("Map output records have not matched.<exp:[" +
+ expMapOutputRecs + "]><act:[" +
+ getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>",
+ expMapOutputRecs, getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
+
+ /*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
+ convertBytes(expReduceInputBytes) + "]><act:[" +
+ convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>",
+ convertBytes(expReduceInputBytes).equals(
+ convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/
+
+ /*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" +
+ convertBytes(expReduceOutputBytes) + "]><act:[" +
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>",
+ convertBytes(expReduceOutputBytes).equals(
+ convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
+
+ /*Assert.assertEquals("Reduce output records have not matched.<exp:[" +
+ expReduceOutputRecs + "]><act:[" + getCounterValue(counters,
+ "REDUCE_OUTPUT_RECORDS") + "]>",
+ expReduceOutputRecs, getCounterValue(counters,
+ "REDUCE_OUTPUT_RECORDS"));*/
+
+ /*Assert.assertEquals("Reduce input records have not matched.<exp:[" +
+ expReduceInputRecs + "]><act:[" + getCounterValue(counters,
+ "REDUCE_INPUT_RECORDS") + "]>",
+ expReduceInputRecs,
+ getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
+ LOG.info("Done.");
+ }
}
}
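The inlined verification loop above fetches each retired simulated job's history file and its _conf.xml, parses the history for counters and task totals, and reads gridmix.job.original-job-id from the copied conf to find the matching trace job. A condensed sketch of that retrieval sequence (not part of this commit), assuming the surrounding class's jtClient and conf fields and letting IO errors propagate:

    private String originalJobIdFor(JobID currJobId) throws IOException {
      String historyFilePath =
          jtClient.getProxy().getJobHistoryLocationForRetiredJob(currJobId);
      Path jhpath = new Path(historyFilePath);
      FileSystem fs = jhpath.getFileSystem(conf);
      // Counters and map/reduce totals come from the parsed history file
      // (jhInfo feeds the counter assertions, omitted here).
      JobHistoryParser.JobInfo jhInfo = new JobHistoryParser(fs, jhpath).parse();
      // The simulated job's conf records which trace job it replays.
      fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"),
          new Path("/tmp/gridmix-st"));
      JobConf jobConf = new JobConf(conf);
      jobConf.addResource(new Path("/tmp/gridmix-st/" + currJobId + "_conf.xml"));
      return jobConf.get("gridmix.job.original-job-id");
    }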
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java Thu Mar 17 20:21:13 2011
@@ -266,8 +266,7 @@ public class UtilsForGridmix {
JobStatus js = jobStatus[numJobs - index];
JobID jobid = js.getJobID();
String jobName = js.getJobName();
- if (!jobName.equals("GRIDMIX_GENERATE_INPUT_DATA") &&
- !jobName.equals("GRIDMIX_GENERATE_DISTCACHE_DATA")) {
+ if (!jobName.equals("GRIDMIX_GENERATE_INPUT_DATA")) {
jobids.add(jobid);
}
}
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java Thu Mar 17 20:21:13 2011
@@ -48,6 +48,6 @@ class SimulatorLaunchTaskAction extends
@Override
public String toString() {
return this.getClass().getName() + "[taskID=" +
- this.getTask().getTaskID() + "]";
+ this.getTask().getTask().getTaskID() + "]";
}
}
Modified: hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java Thu Mar 17 20:21:13 2011
@@ -357,7 +357,8 @@ public class SimulatorTaskTracker implem
}
// First, create statuses and update used slots for map and reduce
// task separately
- Task task = action.getTask();
+ TTTask ttTask = action.getTask();
+ Task task = ttTask.getTask();
TaskAttemptID taskId = task.getTaskID();
if (tasks.containsKey(taskId)) {
throw new IllegalArgumentException("Multiple launch of task id =" + taskId);
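Both mumak changes follow the same MR-279 adjustment: LaunchTaskAction.getTask() now returns a TTTask wrapper rather than a bare Task, so the underlying task and its attempt id are one extra hop away. A one-method sketch of the unwrapping (hypothetical helper name):

    static TaskAttemptID attemptIdOf(SimulatorLaunchTaskAction action) {
      // getTask() yields the TTTask wrapper; getTask().getTask() yields
      // the wrapped Task carrying the attempt id.
      return action.getTask().getTask().getTaskID();
    }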
Modified: hadoop/mapreduce/branches/MR-279/src/examples/org/apache/hadoop/examples/pi/DistSum.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/examples/org/apache/hadoop/examples/pi/DistSum.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/examples/org/apache/hadoop/examples/pi/DistSum.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/examples/org/apache/hadoop/examples/pi/DistSum.java Thu Mar 17 20:21:13 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobTracker;
+import org.apache.hadoop.mapred.Master;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -387,7 +388,7 @@ public final class DistSum extends Confi
public synchronized void init(Job job) throws IOException {
final Configuration conf = job.getConfiguration();
if (cluster == null)
- cluster = new Cluster(JobTracker.getAddress(conf), conf);
+ cluster = new Cluster(Master.getMasterAddress(conf), conf);
chooseMachine(conf).init(job);
}
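The client no longer asks the JobTracker class for the master address; it resolves it through the new framework-neutral Master helper. A minimal sketch of the updated initialization (hypothetical method name; imports as in DistSum above):

    static Cluster connect(Configuration conf) throws IOException {
      // Master.getMasterAddress resolves the configured cluster master,
      // decoupling DistSum from the JobTracker class.
      return new Cluster(Master.getMasterAddress(conf), conf);
    }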
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java Thu Mar 17 20:21:13 2011
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.KerberosInfo;
/**
@@ -31,7 +31,7 @@ import org.apache.hadoop.security.Kerber
* NOT_TO_BE_USED_BY_USERS_DIRECTLY.
*/
@KerberosInfo(
- serverPrincipal = JTConfig.JT_USER_NAME)
+ serverPrincipal = MRConfig.MASTER_USER_NAME)
@InterfaceAudience.Private
@InterfaceStability.Stable
public interface AdminOperationsProtocol extends VersionedProtocol {
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/Child.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/Child.java Thu Mar 17 20:21:13 2011
@@ -80,10 +80,10 @@ class Child {
DefaultMetricsSystem.initialize(
StringUtils.camelize(firstTaskid.getTaskType().name()) +"Task");
- cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);
+ cwd = System.getenv().get(Constants.HADOOP_WORK_DIR);
if (cwd == null) {
throw new IOException("Environment variable " +
- TaskRunner.HADOOP_WORK_DIR + " is not set");
+ Constants.HADOOP_WORK_DIR + " is not set");
}
//load token cache storage
String jobTokenFile =
@@ -306,7 +306,7 @@ class Child {
LocalDirAllocator lDirAlloc =
new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
Path localTaskFile =
- lDirAlloc.getLocalPathForWrite(TaskTracker.JOBFILE, jobConf);
+ lDirAlloc.getLocalPathForWrite(Constants.JOBFILE, jobConf);
JobLocalizer.writeLocalJobFile(localTaskFile, jobConf);
task.setJobFile(localTaskFile.toString());
task.setConf(jobConf);
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Thu Mar 17 20:21:13 2011
@@ -52,45 +52,6 @@ public class DefaultTaskController exten
private static final Log LOG =
LogFactory.getLog(DefaultTaskController.class);
private FileSystem fs;
- /**
- * Launch a new JVM for the task.
- *
- * This method launches the new JVM for the task by executing the
- * the JVM command using the {@link Shell.ShellCommandExecutor}
- */
- void launchTaskJVM(TaskController.TaskControllerContext context)
- throws IOException {
- initializeTask(context);
-
- JvmEnv env = context.env;
- List<String> wrappedCommand =
- TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
- env.logSize, true);
- ShellCommandExecutor shexec =
- new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
- env.workDir, env.env);
- // set the ShellCommandExecutor for later use.
- context.shExec = shexec;
- shexec.execute();
- }
-
- /**
- * Initialize the task environment.
- *
- * Since tasks are launched as the tasktracker user itself, this
- * method has no action to perform.
- */
- void initializeTask(TaskController.TaskControllerContext context) {
- // The default task controller does not need to set up
- // any permissions for proper execution.
- // So this is a dummy method.
- return;
- }
-
- /*
- * No need to do anything as we don't need to do as we dont need anything
- * extra from what TaskTracker has done.
- */
@Override
public void setConf(Configuration conf) {
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Mar 17 20:21:13 2011
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.KerberosInfo;
@@ -32,7 +32,7 @@ import org.apache.hadoop.security.Kerber
* The JobTracker is the Server, which implements this protocol.
*/
@KerberosInfo(
- serverPrincipal = JTConfig.JT_USER_NAME,
+ serverPrincipal = MRConfig.MASTER_USER_NAME,
clientPrincipal = TTConfig.TT_USER_NAME)
@InterfaceAudience.Private
@InterfaceStability.Stable
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu Mar 17 20:21:13 2011
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.split
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.util.HostUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
@@ -1849,16 +1850,6 @@ public class JobInProgress {
}
}
- public static String convertTrackerNameToHostName(String trackerName) {
- // Ugly!
- // Convert the trackerName to its host name
- int indexOfColon = trackerName.indexOf(":");
- String trackerHostName = (indexOfColon == -1) ?
- trackerName :
- trackerName.substring(0, indexOfColon);
- return trackerHostName.substring("tracker_".length());
- }
-
/**
* Note that a task has failed on a given tracker and add the tracker
* to the blacklist iff too many trackers in the cluster i.e.
@@ -1869,7 +1860,7 @@ public class JobInProgress {
synchronized void addTrackerTaskFailure(String trackerName,
TaskTracker taskTracker) {
if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) {
- String trackerHostName = convertTrackerNameToHostName(trackerName);
+ String trackerHostName = HostUtil.convertTrackerNameToHostName(trackerName);
Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
if (trackerFailures == null) {
@@ -1975,7 +1966,7 @@ public class JobInProgress {
}
private int getTrackerTaskFailures(String trackerName) {
- String trackerHostName = convertTrackerNameToHostName(trackerName);
+ String trackerHostName = HostUtil.convertTrackerNameToHostName(trackerName);
Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
return (failedTasks != null) ? failedTasks.intValue() : 0;
}
@@ -2730,7 +2721,7 @@ public class JobInProgress {
if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
if (LOG.isDebugEnabled()) {
- String flakyTracker = convertTrackerNameToHostName(taskTracker);
+ String flakyTracker = HostUtil.convertTrackerNameToHostName(taskTracker);
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
+ "' for assigning a new task");
@@ -3311,7 +3302,7 @@ public class JobInProgress {
// get taskStatus from tip
TaskStatus taskStatus = tip.getTaskStatus(taskid);
String taskTrackerName = taskStatus.getTaskTracker();
- String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
+ String taskTrackerHostName = HostUtil.convertTrackerNameToHostName(taskTrackerName);
int taskTrackerPort = -1;
TaskTrackerStatus taskTrackerStatus =
(taskTracker == null) ? null : taskTracker.getStatus();
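convertTrackerNameToHostName moves out of JobInProgress into the shared HostUtil; judging from the method removed above, the relocated helper presumably keeps the same logic: drop everything after the first colon, then strip the "tracker_" prefix.

// Presumed shape of HostUtil.convertTrackerNameToHostName, mirroring the
// method removed from JobInProgress above.
public static String convertTrackerNameToHostName(String trackerName) {
  int indexOfColon = trackerName.indexOf(":");
  String trackerHostName = (indexOfColon == -1)
      ? trackerName
      : trackerName.substring(0, indexOfColon);
  return trackerHostName.substring("tracker_".length());
}
// e.g. convertTrackerNameToHostName("tracker_host1.example.com:50060")
//      returns "host1.example.com"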
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobLocalizer.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobLocalizer.java Thu Mar 17 20:21:13 2011
@@ -117,7 +117,7 @@ public class JobLocalizer {
DISTDIR = JOBDIR + "/" + TaskTracker.DISTCACHEDIR;
WORKDIR = JOBDIR + "/work";
JARDST = JOBDIR + "/" + TaskTracker.JARSDIR + "/job.jar";
- JOBCONF = JOBDIR + "/" + TaskTracker.JOBFILE;
+ JOBCONF = JOBDIR + "/" + Constants.JOBFILE;
JOBTOKEN = JOBDIR + "/" + TaskTracker.JOB_TOKEN_FILE;
}
@@ -346,7 +346,7 @@ public class JobLocalizer {
DistributedCache.getCacheFiles(conf),
DistributedCache.getLocalCacheFiles(conf),
DistributedCache.getFileTimestamps(conf),
- TrackerDistributedCacheManager.
+ DistributedCache.
getFileVisibilities(conf),
false);
return
@@ -354,7 +354,7 @@ public class JobLocalizer {
DistributedCache.getCacheArchives(conf),
DistributedCache.getLocalCacheArchives(conf),
DistributedCache.getArchiveTimestamps(conf),
- TrackerDistributedCacheManager.
+ DistributedCache.
getArchiveVisibilities(conf),
true);
}
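The visibility lookups now go through DistributedCache rather than TrackerDistributedCacheManager. The moved methods are not in this diff; presumably they still decode the per-file visibility flags recorded in the job configuration, roughly as sketched below. The configuration key is an assumption.

// Assumed behaviour of the relocated getFileVisibilities: decode a
// comma-separated list of booleans from the job conf (key is assumed).
public static boolean[] getFileVisibilities(Configuration conf) {
  String[] flags = conf.getStrings("mapreduce.job.cache.files.visibilities");
  if (flags == null) {
    return null;
  }
  boolean[] visibilities = new boolean[flags.length];
  for (int i = 0; i < flags.length; i++) {
    visibilities[i] = Boolean.parseBoolean(flags[i]);
  }
  return visibilities;
}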
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Mar 17 20:21:13 2011
@@ -92,6 +92,7 @@ import org.apache.hadoop.mapreduce.serve
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.mapreduce.util.ServerConfigUtil;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -128,7 +129,7 @@ public class JobTracker implements MRCon
RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JTConfig {
static{
- ConfigUtil.loadResources();
+ ServerConfigUtil.loadResources();
}
private final long tasktrackerExpiryInterval;
@@ -163,7 +164,7 @@ public class JobTracker implements MRCon
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static enum State { INITIALIZING, RUNNING }
- State state = State.INITIALIZING;
+ volatile State state = State.INITIALIZING;
private static final int FS_ACCESS_RETRY_PERIOD = 10000;
static final String JOB_INFO_FILE = "job-info";
@@ -1372,11 +1373,11 @@ public class JobTracker implements MRCon
JobTracker(final JobConf conf, Clock newClock, String jobtrackerIndentifier)
throws IOException, InterruptedException {
// Set ports, start RPC servers, setup security policy etc.
- InetSocketAddress addr = getAddress(conf);
+ InetSocketAddress addr = Master.getMasterAddress(conf);
this.localMachine = addr.getHostName();
this.port = addr.getPort();
UserGroupInformation.setConfiguration(conf);
- SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
+ SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, MRConfig.MASTER_USER_NAME,
localMachine);
clock = newClock;
@@ -1510,7 +1511,8 @@ public class JobTracker implements MRCon
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
this.port = interTrackerServer.getListenerAddress().getPort();
- this.conf.set(JT_IPC_ADDRESS, (this.localMachine + ":" + this.port));
+ this.conf.set(MRConfig.MASTER_ADDRESS,
+ (this.localMachine + ":" + this.port));
this.localFs = FileSystem.getLocal(conf);
LOG.info("JobTracker up at: " + this.port);
this.infoPort = this.infoServer.getPort();
@@ -1628,6 +1630,10 @@ public class JobTracker implements MRCon
completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
}
+ State getState() {
+ return state;
+ }
+
private static SimpleDateFormat getDateFormat() {
return new SimpleDateFormat("yyyyMMddHHmm");
}
@@ -1701,10 +1707,12 @@ public class JobTracker implements MRCon
return myInstrumentation;
}
+ /**
+ * @deprecated Use {@link Master#getMasterAddress(Configuration)} instead.
+ */
+ @Deprecated
public static InetSocketAddress getAddress(Configuration conf) {
- String jobTrackerStr =
- conf.get(JT_IPC_ADDRESS, "localhost:8012");
- return NetUtils.createSocketAddr(jobTrackerStr);
+ return Master.getMasterAddress(conf);
}
void startExpireTrackersThread() {
@@ -3157,7 +3165,7 @@ public class JobTracker implements MRCon
totalReduces,
totalMapTaskCapacity,
totalReduceTaskCapacity,
- state, getExcludedNodes().size()
+ getExcludedNodes().size()
);
} else {
return new ClusterStatus(taskTrackers.size() -
@@ -3168,7 +3176,7 @@ public class JobTracker implements MRCon
totalReduces,
totalMapTaskCapacity,
totalReduceTaskCapacity,
- state, getExcludedNodes().size());
+ getExcludedNodes().size());
}
}
}
@@ -4623,11 +4631,11 @@ public class JobTracker implements MRCon
conf.getInt("mapred.heartbeats.in.second", 100);
// Set ports, start RPC servers, setup security policy etc.
- InetSocketAddress addr = getAddress(conf);
+ InetSocketAddress addr = Master.getMasterAddress(conf);
this.localMachine = addr.getHostName();
this.port = addr.getPort();
UserGroupInformation.setConfiguration(conf);
- SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, JTConfig.JT_USER_NAME,
+ SecurityUtil.login(conf, JTConfig.JT_KEYTAB_FILE, MRConfig.MASTER_USER_NAME,
localMachine);
secretManager = null;
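Both JobTracker constructors now resolve the RPC address through Master.getMasterAddress, and the old static getAddress is kept only as a deprecated shim. Master.java is not part of this diff; based on the body it replaces, the new helper presumably amounts to:

// Presumed shape of Master.getMasterAddress; the default address is
// carried over from the removed JobTracker.getAddress body.
public static InetSocketAddress getMasterAddress(Configuration conf) {
  String masterAddress = conf.get(MRConfig.MASTER_ADDRESS, "localhost:8012");
  return NetUtils.createSocketAddr(masterAddress);
}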
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java Thu Mar 17 20:21:13 2011
@@ -28,7 +28,7 @@ import java.io.IOException;
*
*/
class LaunchTaskAction extends TaskTrackerAction {
- private Task task;
+ private TTTask task;
public LaunchTaskAction() {
super(ActionType.LAUNCH_TASK);
@@ -36,35 +36,47 @@ class LaunchTaskAction extends TaskTrack
public LaunchTaskAction(Task task) {
super(ActionType.LAUNCH_TASK);
- this.task = task;
+ if (task.isMapTask()) {
+ this.task = new TTMapTask((MapTask)task);
+ } else {
+ if (task.isUberTask()) {
+ this.task = new TTUberTask((UberTask)task);
+ } else {
+ this.task = new TTReduceTask((ReduceTask)task);
+ }
+ }
}
- public Task getTask() {
+ public TTTask getTask() {
return task;
}
-
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(task.isMapTask());
- if (!task.isMapTask()) {
- // which flavor of ReduceTask, uber or regular?
- out.writeBoolean(task.isUberTask());
- }
- task.write(out);
- }
+ @Override
public void readFields(DataInput in) throws IOException {
boolean isMapTask = in.readBoolean();
if (isMapTask) {
- task = new MapTask();
+ this.task = new TTMapTask(new MapTask());
} else {
boolean isUberTask = in.readBoolean();
if (isUberTask) {
- task = new UberTask();
+ task = new TTUberTask(new UberTask());
} else {
- task = new ReduceTask();
+ task = new TTReduceTask(new ReduceTask());
}
}
- task.readFields(in);
+ task.getTask().readFields(in);
}
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Task task = this.task.getTask();
+ out.writeBoolean(task.isMapTask());
+ if (!task.isMapTask()) {
+ // which flavor of ReduceTask, uber or regular?
+ out.writeBoolean(task.isUberTask());
+ }
+ task.write(out);
+ }
}
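Note the wire format is unchanged: write() still emits the isMapTask flag, the optional isUberTask flag, and then the task body, so the TTTask wrapping is invisible on the wire. A round-trip sketch using org.apache.hadoop.io.DataOutputBuffer/DataInputBuffer; it must live in org.apache.hadoop.mapred, since LaunchTaskAction is package-private.

// Serialize a LaunchTaskAction and read it back.
LaunchTaskAction action = new LaunchTaskAction(new MapTask());
DataOutputBuffer outBuf = new DataOutputBuffer();
action.write(outBuf);

DataInputBuffer inBuf = new DataInputBuffer();
inBuf.reset(outBuf.getData(), outBuf.getLength());
LaunchTaskAction copy = new LaunchTaskAction();
copy.readFields(inBuf);
// copy.getTask() is now a TTMapTask wrapping the deserialized MapTask.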
Added: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalClientFactory.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalClientFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/LocalClientFactory.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,20 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.ClientFactory;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+/**
+ * Factory responsible for local job runner clients.
+ *
+ */
+public class LocalClientFactory extends ClientFactory {
+
+ @Override
+ protected ClientProtocol createClient(Configuration conf)
+ throws IOException {
+ return new LocalJobRunner(conf);
+ }
+}
\ No newline at end of file
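A short usage sketch; createClient is protected, so the caller below is assumed to sit in org.apache.hadoop.mapred as well, inside a method that throws IOException. How the framework itself selects this factory is not shown in this diff.

Configuration conf = new Configuration();
ClientProtocol client = new LocalClientFactory().createClient(conf);
// client is an in-process LocalJobRunner; jobs submitted through it
// run locally instead of against a cluster.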
Added: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTMapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTMapTask.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTMapTask.java (added)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTMapTask.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+public class TTMapTask extends TTTask {
+
+ public TTMapTask(MapTask mapTask) {
+ super(mapTask);
+ }
+
+ @Override
+ public TaskRunner createRunner(TaskTracker tracker,
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new MapTaskRunner(tip, tracker, task.conf, rjob);
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTReduceTask.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTReduceTask.java (added)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTReduceTask.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,20 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
+public class TTReduceTask extends TTTask {
+
+ public TTReduceTask(ReduceTask reduceTask) {
+ super(reduceTask);
+ }
+
+ @Override
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException {
+ return new ReduceTaskRunner(tip, tracker, task.conf, rjob);
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTTask.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTTask.java (added)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTTask.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,62 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public abstract class TTTask implements Writable {
+
+ protected Task task;
+
+ public TTTask(Task task) {
+ this.task = task;
+ }
+
+ /**
+ * Return the task to be run.
+ * @return the task to be run
+ */
+ public Task getTask() {
+ return task;
+ }
+
+ /**
+ * Return an appropriate {@link TaskRunner} for this task.
+ * @param tracker the TaskTracker launching the task
+ * @param tip the TaskTracker-side bookkeeping entry for the task
+ * @param rjob the running job the task belongs to
+ * @return a runner that executes the task
+ * @throws IOException
+ */
+ public abstract TaskRunner createRunner(TaskTracker tracker,
+ TaskTracker.TaskInProgress tip,
+ TaskTracker.RunningJob rjob
+ ) throws IOException;
+
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(task.isMapTask());
+ if (!task.isMapTask()) {
+ // which flavor of ReduceTask, uber or regular?
+ out.writeBoolean(task.isUberTask());
+ }
+ task.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ boolean isMapTask = in.readBoolean();
+ if (isMapTask) {
+ task = new MapTask();
+ } else {
+ boolean isUberTask = in.readBoolean();
+ if (isUberTask) {
+ task = new UberTask();
+ } else {
+ task = new ReduceTask();
+ }
+ }
+ task.readFields(in);
+ }
+
+}
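createRunner is the point of the hierarchy: the tracker gets one polymorphic call instead of switching on the task type. A sketch of the presumed launch path, where tracker, tip, and rjob stand for objects the TaskTracker already holds:

// Presumed call site in the TaskTracker's launch path.
TTTask ttTask = action.getTask();
TaskRunner runner = ttTask.createRunner(tracker, tip, rjob);
runner.start(); // TaskRunner extends Thread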
Added: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTUberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTUberTask.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTUberTask.java (added)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TTUberTask.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,19 @@
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
+
+public class TTUberTask extends TTTask {
+
+ public TTUberTask(UberTask task) {
+ super(task);
+ }
+
+ @Override
+ public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+ TaskTracker.RunningJob rjob) throws IOException {
+ return new UberTaskRunner(tip, tracker, task.conf, rjob);
+ }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Mar 17 20:21:13 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.util.HostUtil;
import org.apache.hadoop.net.Node;
@@ -810,7 +811,7 @@ class TaskInProgress {
if (status != null) {
trackerName = status.getTaskTracker();
trackerHostName =
- JobInProgress.convertTrackerNameToHostName(trackerName);
+ HostUtil.convertTrackerNameToHostName(trackerName);
// Check if the user manually KILLED/FAILED this task-attempt...
Boolean shouldFail = tasksToKill.remove(taskid);
if (shouldFail != null) {
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Thu Mar 17 20:21:13 2011
@@ -53,19 +53,6 @@ public class TaskLogServlet extends Http
return f.canRead();
}
- /**
- * Construct the taskLogUrl
- * @param taskTrackerHostName
- * @param httpPort
- * @param taskAttemptID
- * @return the taskLogUrl
- */
- public static String getTaskLogUrl(String taskTrackerHostName,
- String httpPort, String taskAttemptID) {
- return ("http://" + taskTrackerHostName + ":" + httpPort
- + "/tasklog?attemptid=" + taskAttemptID);
- }
-
private void printTaskLog(HttpServletResponse response,
OutputStream out, TaskAttemptID taskId,
long start, long end, boolean plainText,
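getTaskLogUrl disappears from the servlet; given the HostUtil import added to JobInProgress and TaskInProgress in this commit, the URL builder has presumably been relocated alongside convertTrackerNameToHostName. The removed body implies:

// Presumed relocated helper, mirroring the method removed above.
public static String getTaskLogUrl(String taskTrackerHostName,
    String httpPort, String taskAttemptID) {
  return "http://" + taskTrackerHostName + ":" + httpPort
      + "/tasklog?attemptid=" + taskAttemptID;
}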
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Mar 17 20:21:13 2011
@@ -67,9 +67,6 @@ abstract class TaskRunner extends Thread
private boolean exitCodeSet = false;
private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
- static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
-
-
private TaskTracker tracker;
private TaskDistributedCacheManager taskDistributedCacheManager;
private String[] localdirs;
@@ -522,7 +519,7 @@ abstract class TaskRunner extends Thread
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
- env.put(HADOOP_WORK_DIR, workDir.toString());
+ env.put(Constants.HADOOP_WORK_DIR, workDir.toString());
// put jobTokenFile name into env
String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
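HADOOP_WORK_DIR moves off TaskRunner into a shared Constants class, the same class JobLocalizer now uses for JOBFILE. Constants.java is not in this diff; presumably it is a plain holder along these lines (the HADOOP_WORK_DIR value comes from the field removed above, the JOBFILE value is assumed):

package org.apache.hadoop.mapred;

// Presumed shape of the new Constants holder.
public class Constants {
  public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
  static final String JOBFILE = "job.xml"; // assumed value
}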
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskScheduler.java?rev=1082677&r1=1082676&r2=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskScheduler.java Thu Mar 17 20:21:13 2011
@@ -93,52 +93,6 @@ abstract class TaskScheduler implements
public abstract Collection<JobInProgress> getJobs(String queueName);
/**
- * Abstract QueueRefresher class. Schedulers can extend this and return an
- * instance of this in the {@link #getQueueRefresher()} method. The
- * {@link #refreshQueues(List)} method of this instance will be invoked by the
- * {@link QueueManager} whenever it gets a request from an administrator to
- * refresh its own queue-configuration. This method has a documented contract
- * between the {@link QueueManager} and the {@link TaskScheduler}.
- *
- * Before calling QueueRefresher, the caller must hold the lock to the
- * corresponding {@link TaskScheduler} (generally in the {@link JobTracker}).
- */
- abstract class QueueRefresher {
-
- /**
- * Refresh the queue-configuration in the scheduler. This method has the
- * following contract.
- * <ol>
- * <li>Before this method is called, {@link QueueManager} validates the new
- * queue-configuration. For example, addition of new queues, or removal of
- * queues at any level in the hierarchy, is currently not supported by
- * {@link QueueManager} and so is not supported for schedulers either.</li>
- * <li>Schedulers will be passed a list of {@link JobQueueInfo}s of the root
- * queues i.e. the queues at the top level. All the descendants are properly
- * linked from these top-level queues.</li>
- * <li>Schedulers should use the scheduler specific queue properties from
- * the newRootQueues, validate the properties themselves and apply them
- * internally.</li>
- * <li>
- * Once the method returns successfully from the scheduler, the refresh of
- * queue properties is assumed to have succeeded throughout and is
- * 'committed' internally to {@link QueueManager} too. After a successful
- * return from the scheduler, the queue refresh must never fail inside
- * QueueManager; if such an abnormality ever happens, the queue framework
- * will be inconsistent and will need a JT restart.</li>
- * <li>If the scheduler throws an exception during {@link #refreshQueues(List)},
- * {@link QueueManager} throws away the newly read configuration, retains
- * the old (consistent) configuration and informs the request issuer about
- * the error appropriately.</li>
- * </ol>
- *
- * @param newRootQueues
- */
- abstract void refreshQueues(List<JobQueueInfo> newRootQueues)
- throws Throwable;
- }
-
- /**
* Get the {@link QueueRefresher} for this scheduler. By default, no
* {@link QueueRefresher} exists for a scheduler and is set to null.
* Schedulers need to return an instance of {@link QueueRefresher} if they