Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/12/21 18:36:48 UTC
svn commit: r892893 [2/3] - in /hadoop/mapreduce/trunk: ./
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/ src...
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+/**
+ * A utility to manage job submission files.
+ */
+@InterfaceAudience.Private
+public class JobSubmissionFiles {
+
+ // job submission directory is private!
+ final public static FsPermission JOB_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx------
+ // job files are world-readable and owner-writable
+ final public static FsPermission JOB_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ public static Path getJobSplitFile(Path jobSubmissionDir) {
+ return new Path(jobSubmissionDir, "job.split");
+ }
+
+ public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
+ return new Path(jobSubmissionDir, "job.splitmetainfo");
+ }
+
+ /**
+ * Get the job conf path.
+ */
+ public static Path getJobConfPath(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "job.xml");
+ }
+
+ /**
+ * Get the job jar path.
+ */
+ public static Path getJobJar(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "job.jar");
+ }
+
+ /**
+ * Get the job distributed cache files path.
+ * @param jobSubmitDir the job submission directory
+ */
+ public static Path getJobDistCacheFiles(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "files");
+ }
+ /**
+ * Get the job distributed cache archives path.
+ * @param jobSubmitDir the job submission directory
+ */
+ public static Path getJobDistCacheArchives(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "archives");
+ }
+ /**
+ * Get the job distributed cache libjars path.
+ * @param jobSubmitDir the job submission directory
+ */
+ public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "libjars");
+ }
+
+ /**
+ * Initializes the staging directory and returns the path. It also
+ * enforces the required ownership and permissions on the directory.
+ * @param cluster the handle to the Cluster
+ * @param conf the job Configuration
+ */
+ public static Path getStagingDir(Cluster cluster, Configuration conf)
+ throws IOException, InterruptedException {
+ Path stagingArea = cluster.getStagingAreaDir();
+ FileSystem fs = stagingArea.getFileSystem(conf);
+ String realUser;
+ String currentUser;
+ try {
+ UserGroupInformation ugi = UnixUserGroupInformation.login();
+ realUser = ugi.getUserName();
+ ugi = UnixUserGroupInformation.login(conf);
+ currentUser = ugi.getUserName();
+ } catch (LoginException le) {
+ throw new IOException(le);
+ }
+ if (fs.exists(stagingArea)) {
+ FileStatus fsStatus = fs.getFileStatus(stagingArea);
+ String owner = fsStatus.getOwner();
+ if (!(owner.equals(currentUser) || owner.equals(realUser)) ||
+ !fsStatus.getPermission().
+ equals(JOB_DIR_PERMISSION)) {
+ throw new IOException("The ownership/permissions on the staging " +
+ "directory " + stagingArea + " is not as expected. " +
+ "It is owned by " + owner + " and permissions are "+
+ fsStatus.getPermission() + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser + " and permissions must be rwx------");
+ }
+ } else {
+ fs.mkdirs(stagingArea,
+ new FsPermission(JOB_DIR_PERMISSION));
+ }
+ return stagingArea;
+ }
+
+}
\ No newline at end of file
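
For context, the helpers above define a fixed per-job submission layout. A minimal sketch of how a client might resolve those paths follows; the staging path and job id are hypothetical, chosen only for illustration:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobSubmissionFiles;

    public class SubmissionLayoutSketch {
      public static void main(String[] args) {
        // Hypothetical staging dir and job id, for illustration only.
        Path submitJobDir = new Path("/user/alice/.staging/job_200912211736_0001");
        System.out.println(JobSubmissionFiles.getJobConfPath(submitJobDir));      // .../job.xml
        System.out.println(JobSubmissionFiles.getJobJar(submitJobDir));           // .../job.jar
        System.out.println(JobSubmissionFiles.getJobSplitFile(submitJobDir));     // .../job.split
        System.out.println(JobSubmissionFiles.getJobSplitMetaFile(submitJobDir)); // .../job.splitmetainfo
      }
    }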
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobSubmitter.java Mon Dec 21 17:36:44 2009
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.mapreduce;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
@@ -35,14 +34,11 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.util.ReflectionUtils;
class JobSubmitter {
@@ -128,12 +124,7 @@
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
-
- /*
- * set this user's id in job configuration, so later job files can be
- * accessed using this user's id
- */
- job.setUGIAndUserGroupNames();
+ String jobJar = job.getJar();
//
// Figure out what fs the JobTracker is using. Copy the
@@ -145,14 +136,18 @@
// Create a number of filenames in the JobTracker's fs namespace
LOG.debug("default FileSystem: " + jtFs.getUri());
- jtFs.delete(submitJobDir, true);
+ if (jtFs.exists(submitJobDir)) {
+ throw new IOException("Not submitting job. Job directory " + submitJobDir
+ +" already exists!! This is unexpected.Please check what's there in" +
+ " that directory");
+ }
submitJobDir = jtFs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
- FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
+ FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
- Path filesDir = new Path(submitJobDir, "files");
- Path archivesDir = new Path(submitJobDir, "archives");
- Path libjarsDir = new Path(submitJobDir, "libjars");
+ Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
+ Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
+ Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
@@ -185,7 +180,8 @@
for (String tmpjars: libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
- DistributedCache.addFileToClassPath(newPath, conf);
+ DistributedCache.addFileToClassPath(
+ new Path(newPath.toUri().getPath()), conf);
}
}
@@ -212,11 +208,24 @@
DistributedCache.createSymlink(conf);
}
}
-
+
+ if (jobJar != null) { // copy jar to JobTracker's fs
+ // use jar name if job is not named.
+ if ("".equals(job.getJobName())){
+ job.setJobName(new Path(jobJar).getName());
+ }
+ copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir),
+ replication);
+ job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+ } else {
+ LOG.warn("No job jar file set. User classes may not be found. "+
+ "See Job or Job#setJar(String).");
+ }
+
// set the timestamps of the archives and files
TrackerDistributedCacheManager.determineTimestamps(conf);
}
-
+
private URI getPathURI(Path destPath, String fragment)
throws URISyntaxException {
URI pathURI = destPath.toUri();
@@ -234,36 +243,20 @@
short replication) throws IOException {
jtFs.copyFromLocalFile(originalJarPath, submitJarFile);
jtFs.setReplication(submitJarFile, replication);
- jtFs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
+ jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
}
+
/**
* Configures the user's JobConf with the command-line options
* -libjars, -files and -archives.
* @param conf
* @throws IOException
*/
- private void configureCommandLineOptions(Job job, Path submitJobDir,
- Path submitJarFile) throws IOException {
+ private void copyAndConfigureFiles(Job job, Path jobSubmitDir)
+ throws IOException {
Configuration conf = job.getConfiguration();
short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
- copyAndConfigureFiles(job, submitJobDir, replication);
-
- /* set this user's id in job configuration, so later job files can be
- * accessed using this user's id
- */
- String originalJarPath = job.getJar();
-
- if (originalJarPath != null) { // copy jar to JobTracker's fs
- // use jar name if job is not named.
- if ("".equals(job.getJobName())){
- job.setJobName(new Path(originalJarPath).getName());
- }
- job.setJar(submitJarFile.toString());
- copyJar(new Path(originalJarPath), submitJarFile, replication);
- } else {
- LOG.warn("No job jar file set. User classes may not be found. "+
- "See Job or Job#setJar(String).");
- }
+ copyAndConfigureFiles(job, jobSubmitDir, replication);
// Set the working directory
if (job.getWorkingDirectory() == null) {
@@ -271,15 +264,6 @@
}
}
-
- // job files are world-wide readable and owner writable
- final private static FsPermission JOB_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- // job submission directory is world readable/writable/executable
- final static FsPermission JOB_DIR_PERMISSION =
- FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
-
/**
* Internal method for submitting jobs to the system.
*
@@ -305,45 +289,60 @@
* </li>
* </ol></p>
* @param job the configuration to submit
+ * @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
- JobStatus submitJobInternal(Job job) throws ClassNotFoundException,
- InterruptedException, IOException {
-
+ JobStatus submitJobInternal(Job job, Cluster cluster)
+ throws ClassNotFoundException, InterruptedException, IOException {
+ /*
+ * set this user's id in job configuration, so later job files can be
+ * accessed using this user's id
+ */
+ job.setUGIAndUserGroupNames();
+
+ Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster,
+ job.getConfiguration());
//configure the command line options correctly on the submitting dfs
Configuration conf = job.getConfiguration();
JobID jobId = submitClient.getNewJobID();
- Path submitJobDir = new Path(submitClient.getSystemDir(), jobId.toString());
- Path submitJarFile = new Path(submitJobDir, "job.jar");
- Path submitSplitFile = new Path(submitJobDir, "job.split");
- configureCommandLineOptions(job, submitJobDir, submitJarFile);
- Path submitJobFile = new Path(submitJobDir, "job.xml");
-
- checkSpecs(job);
-
- // Create the splits for the job
- LOG.info("Creating splits at " + jtFs.makeQualified(submitSplitFile));
- int maps = writeSplits(job, submitSplitFile);
- conf.set("mapred.job.split.file", submitSplitFile.toString());
- conf.setInt("mapred.map.tasks", maps);
- LOG.info("number of splits:" + maps);
-
- // Write job file to JobTracker's fs
- writeConf(conf, submitJobFile);
-
- //
- // Now, actually submit the job (using the submit name)
- //
- JobStatus status = submitClient.submitJob(jobId);
- if (status != null) {
- return status;
- } else {
- throw new IOException("Could not launch job");
+ Path submitJobDir = new Path(jobStagingArea, jobId.toString());
+ JobStatus status = null;
+ try {
+ conf.set("mapreduce.job.dir", submitJobDir.toString());
+ LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ + " as the submit dir");
+ copyAndConfigureFiles(job, submitJobDir);
+ Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
+
+ checkSpecs(job);
+
+ // Create the splits for the job
+ LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
+ int maps = writeSplits(job, submitJobDir);
+ conf.setInt("mapred.map.tasks", maps);
+ LOG.info("number of splits:" + maps);
+
+ // Write job file to submit dir
+ writeConf(conf, submitJobFile);
+ //
+ // Now, actually submit the job (using the submit name)
+ //
+ status = submitClient.submitJob(jobId, submitJobDir.toString());
+ if (status != null) {
+ return status;
+ } else {
+ throw new IOException("Could not launch job");
+ }
+ } finally {
+ if (status == null) {
+ LOG.info("Cleaning up the staging area " + submitJobDir);
+ jtFs.delete(submitJobDir, true);
+ }
}
}
-
+
private void checkSpecs(Job job) throws ClassNotFoundException,
InterruptedException, IOException {
JobConf jConf = (JobConf)job.getConfiguration();
@@ -364,7 +363,7 @@
// Write job file to JobTracker's fs
FSDataOutputStream out =
FileSystem.create(jtFs, jobFile,
- new FsPermission(JOB_FILE_PERMISSION));
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
conf.writeXml(out);
} finally {
@@ -372,81 +371,42 @@
}
}
+
@SuppressWarnings("unchecked")
- private <T extends InputSplit>
- int writeNewSplits(JobContext job, Path submitSplitFile) throws IOException,
+ private <T extends InputSplit>
+ int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
-
+
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
- DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
- array.length);
- try {
- if (array.length != 0) {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Job.RawSplit rawSplit = new Job.RawSplit();
- SerializationFactory factory = new SerializationFactory(conf);
- Serializer<T> serializer =
- factory.getSerializer((Class<T>) array[0].getClass());
- serializer.open(buffer);
- for (T split: array) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- serializer.serialize(split);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- serializer.close();
- }
- } finally {
- out.close();
- }
+ JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
return array.length;
}
-
- static final int CURRENT_SPLIT_FILE_VERSION = 0;
- static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
-
- private DataOutputStream writeSplitsFileHeader(Configuration conf,
- Path filename, int length) throws IOException {
- // write the splits to a file for the job tracker
- FileSystem fs = filename.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
- out.write(SPLIT_FILE_HEADER);
- WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
- WritableUtils.writeVInt(out, length);
- return out;
- }
-
+
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
- Path submitSplitFile) throws IOException,
+ Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
- // Create the splits for the job
- LOG.debug("Creating splits at " + jtFs.makeQualified(submitSplitFile));
int maps;
if (jConf.getUseNewMapper()) {
- maps = writeNewSplits(job, submitSplitFile);
+ maps = writeNewSplits(job, jobSubmitDir);
} else {
- maps = writeOldSplits(jConf, submitSplitFile);
+ maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
-
- // method to write splits for old api mapper.
- private int writeOldSplits(JobConf job,
- Path submitSplitFile) throws IOException {
- org.apache.hadoop.mapred.InputSplit[] splits =
+
+ //method to write splits for old api mapper.
+ private int writeOldSplits(JobConf job, Path jobSubmitDir)
+ throws IOException {
+ org.apache.hadoop.mapred.InputSplit[] splits =
job.getInputFormat().getSplits(job, job.getNumMapTasks());
// sort the splits into order based on size, so that the biggest
// go first
@@ -468,24 +428,7 @@
}
}
});
- DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile,
- splits.length);
-
- try {
- DataOutputBuffer buffer = new DataOutputBuffer();
- Job.RawSplit rawSplit = new Job.RawSplit();
- for (org.apache.hadoop.mapred.InputSplit split: splits) {
- rawSplit.setClassName(split.getClass().getName());
- buffer.reset();
- split.write(buffer);
- rawSplit.setDataLength(split.getLength());
- rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
- rawSplit.setLocations(split.getLocations());
- rawSplit.write(out);
- }
- } finally {
- out.close();
- }
+ JobSplitWriter.createSplitFiles(jobSubmitDir, job, splits);
return splits.length;
}
@@ -505,7 +448,7 @@
} catch (IOException ie) {
throw new RuntimeException("exception in compare", ie);
} catch (InterruptedException ie) {
- throw new RuntimeException("exception in compare", ie);
+ throw new RuntimeException("exception in compare", ie);
}
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskCounter.java Mon Dec 21 17:36:44 2009
@@ -24,6 +24,7 @@
MAP_OUTPUT_RECORDS,
MAP_SKIPPED_RECORDS,
MAP_OUTPUT_BYTES,
+ SPLIT_RAW_BYTES,
COMBINE_INPUT_RECORDS,
COMBINE_OUTPUT_RECORDS,
REDUCE_INPUT_GROUPS,
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Mon Dec 21 17:36:44 2009
@@ -125,7 +125,6 @@
Deserializer deserializer = factory.getDeserializer(inputSplitClass);
deserializer.open((DataInputStream)in);
inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
- deserializer.close();
}
private Class<?> readClass(DataInput in) throws IOException {
@@ -147,7 +146,6 @@
factory.getSerializer(inputSplitClass);
serializer.open((DataOutputStream)out);
serializer.serialize(inputSplit);
- serializer.close();
}
public Configuration getConf() {
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/CompositeInputSplit.java Mon Dec 21 17:36:44 2009
@@ -128,7 +128,6 @@
factory.getSerializer(s.getClass());
serializer.open((DataOutputStream)out);
serializer.serialize(s);
- serializer.close();
}
}
@@ -155,7 +154,6 @@
Deserializer deserializer = factory.getDeserializer(cls[i]);
deserializer.open((DataInputStream)in);
splits[i] = (InputSplit)deserializer.deserialize(splits[i]);
- deserializer.close();
}
} catch (ClassNotFoundException e) {
throw new IOException("Failed split init", e);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java Mon Dec 21 17:36:44 2009
@@ -20,7 +20,6 @@
import java.io.IOException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
@@ -87,8 +86,11 @@
* Version 28: Added getJobHistoryDir() as part of MAPREDUCE-975.
* Version 29: Added reservedSlots, runningTasks and totalJobSubmissions
* to ClusterMetrics as part of MAPREDUCE-1048.
+ * Version 30: Job submission files are uploaded to a staging area under
+ * user home dir. JobTracker reads the required files from the
+ * staging area using user credentials passed via the rpc.
*/
- public static final long versionID = 29L;
+ public static final long versionID = 30L;
/**
* Allocate a name for the job.
@@ -100,9 +102,8 @@
/**
* Submit a Job for execution. Returns the latest profile for
* that job.
- * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
*/
- public JobStatus submitJob(JobID jobName)
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir)
throws IOException, InterruptedException;
/**
@@ -219,7 +220,15 @@
*
* @return the system directory where job-specific files are to be placed.
*/
- public String getSystemDir() throws IOException, InterruptedException;
+ public String getSystemDir() throws IOException, InterruptedException;
+
+ /**
+ * Get a hint from the JobTracker
+ * where job-specific files are to be placed.
+ *
+ * @return the directory where job-specific files are to be placed.
+ */
+ public String getStagingAreaDir() throws IOException, InterruptedException;
/**
* Gets the directory location of the completed job history files.
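
Taken together with the JobSubmitter changes above, version 30 of the protocol implies roughly this client-side sequence. This is a sketch only: `client` is assumed to be an RPC proxy implementing ClientProtocol, and the actual file uploads and error handling are elided:

    // Hedged sketch; 'client' is an assumed ClientProtocol proxy.
    Path staging = new Path(client.getStagingAreaDir()); // hint from the JobTracker
    JobID jobId = client.getNewJobID();
    Path submitJobDir = new Path(staging, jobId.toString());
    // ... upload job.xml, job.jar, job.split and job.splitmetainfo under submitJobDir ...
    JobStatus status = client.submitJob(jobId, submitJobDir.toString());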
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/server/jobtracker/JTConfig.java Mon Dec 21 17:36:44 2009
@@ -80,6 +80,8 @@
public static final String JT_AVG_BLACKLIST_THRESHOLD =
"mapreduce.jobtracker.blacklist.average.threshold";
public static final String JT_SYSTEM_DIR = "mapreduce.jobtracker.system.dir";
+ public static final String JT_STAGING_AREA_ROOT =
+ "mapreduce.jobtracker.staging.root.dir";
public static final String JT_MAX_TRACKER_BLACKLISTS =
"mapreduce.jobtracker.tasktracker.maxblacklists";
public static final String JT_JOBHISTORY_MAXAGE =
@@ -88,4 +90,6 @@
"mapreduce.jobtracker.maxmapmemory.mb";
public static final String JT_MAX_REDUCEMEMORY_MB =
"mapreduce.jobtracker.maxreducememory.mb";
+ public static final String MAX_JOB_SPLIT_METAINFO_SIZE =
+ "mapreduce.job.split.metainfo.maxsize";
}
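
For illustration, the two new keys could also be set programmatically; a minimal sketch using a plain Configuration (the values are examples, though 10000000 matches the fallback used in SplitMetaInfoReader below):

    Configuration conf = new Configuration();
    // Root directory under which per-user staging directories are created.
    conf.set(JTConfig.JT_STAGING_AREA_ROOT, "/tmp/hadoop/mapred/staging");
    // Upper bound on the size of the split meta-info file; a value <= 0 disables the check.
    conf.setLong(JTConfig.MAX_JOB_SPLIT_METAINFO_SIZE, 10000000L);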
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplit.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class groups the fundamental classes associated with
+ * reading/writing splits. The split information is divided into
+ * two parts based on the consumer of the information: the split
+ * meta information and the raw split information. The first part
+ * is consumed by the JobTracker to create the tasks' locality
+ * data structures. The second part is read by the map tasks at
+ * runtime to determine what data to process.
+ * The two parts are written to two separate files. The
+ * meta-information file is read in full by the JobTracker during
+ * job initialization, while a map task receives its meta
+ * information at launch and reads the raw split bytes directly
+ * from the split file.
+ */
+@InterfaceAudience.Private
+public class JobSplit {
+ static final int META_SPLIT_VERSION = 1;
+ static final byte[] META_SPLIT_FILE_HEADER;
+ static {
+ try {
+ META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
+ } catch (UnsupportedEncodingException u) {
+ throw new RuntimeException(u);
+ }
+ }
+ public static final TaskSplitMetaInfo EMPTY_TASK_SPLIT =
+ new TaskSplitMetaInfo();
+
+ /**
+ * This represents the meta information about the task split.
+ * The main fields are
+ * - start offset in actual split
+ * - data length that will be processed in this split
+ * - hosts on which this split is local
+ */
+ public static class SplitMetaInfo implements Writable {
+ private long startOffset;
+ private long inputDataLength;
+ private String[] locations;
+
+ public SplitMetaInfo() {}
+
+ public SplitMetaInfo(String[] locations, long startOffset,
+ long inputDataLength) {
+ this.locations = locations;
+ this.startOffset = startOffset;
+ this.inputDataLength = inputDataLength;
+ }
+
+ public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
+ try {
+ this.locations = split.getLocations();
+ this.inputDataLength = split.getLength();
+ this.startOffset = startOffset;
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ public String[] getLocations() {
+ return locations;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getInputDataLength() {
+ return inputDataLength;
+ }
+
+ public void setInputDataLocations(String[] locations) {
+ this.locations = locations;
+ }
+
+ public void setInputDataLength(long length) {
+ this.inputDataLength = length;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ int len = WritableUtils.readVInt(in);
+ locations = new String[len];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = Text.readString(in);
+ }
+ startOffset = WritableUtils.readVLong(in);
+ inputDataLength = WritableUtils.readVLong(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ Text.writeString(out, locations[i]);
+ }
+ WritableUtils.writeVLong(out, startOffset);
+ WritableUtils.writeVLong(out, inputDataLength);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("data-size : " + inputDataLength + "\n");
+ buf.append("start-offset : " + startOffset + "\n");
+ buf.append("locations : " + "\n");
+ for (String loc : locations) {
+ buf.append(" " + loc + "\n");
+ }
+ return buf.toString();
+ }
+ }
+ /**
+ * This represents the meta information about the task split that the
+ * JobTracker creates
+ */
+ public static class TaskSplitMetaInfo {
+ private TaskSplitIndex splitIndex;
+ private long inputDataLength;
+ private String[] locations;
+ public TaskSplitMetaInfo(){
+ this.splitIndex = new TaskSplitIndex();
+ this.locations = new String[0];
+ }
+ public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations,
+ long inputDataLength) {
+ this.splitIndex = splitIndex;
+ this.locations = locations;
+ this.inputDataLength = inputDataLength;
+ }
+ public TaskSplitMetaInfo(InputSplit split, long startOffset)
+ throws InterruptedException, IOException {
+ this(new TaskSplitIndex("", startOffset), split.getLocations(),
+ split.getLength());
+ }
+
+ public TaskSplitMetaInfo(String[] locations, long startOffset,
+ long inputDataLength) {
+ this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
+ }
+
+ public TaskSplitIndex getSplitIndex() {
+ return splitIndex;
+ }
+
+ public String getSplitLocation() {
+ return splitIndex.getSplitLocation();
+ }
+ public long getInputDataLength() {
+ return inputDataLength;
+ }
+ public String[] getLocations() {
+ return locations;
+ }
+ public long getStartOffset() {
+ return splitIndex.getStartOffset();
+ }
+ }
+
+ /**
+ * This represents the meta information about the task split that the
+ * task gets
+ */
+ public static class TaskSplitIndex {
+ private String splitLocation;
+ private long startOffset;
+ public TaskSplitIndex(){
+ this("", 0);
+ }
+ public TaskSplitIndex(String splitLocation, long startOffset) {
+ this.splitLocation = splitLocation;
+ this.startOffset = startOffset;
+ }
+ public long getStartOffset() {
+ return startOffset;
+ }
+ public String getSplitLocation() {
+ return splitLocation;
+ }
+ public void readFields(DataInput in) throws IOException {
+ splitLocation = Text.readString(in);
+ startOffset = WritableUtils.readVLong(in);
+ }
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, splitLocation);
+ WritableUtils.writeVLong(out, startOffset);
+ }
+ }
+}
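
As a quick illustration of the on-disk record layout (locations, then start offset, then input data length), here is a hedged round-trip of SplitMetaInfo through its Writable methods; the host names and lengths are made up:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;

    // Serialize a meta record, then read it back.
    SplitMetaInfo meta = new SplitMetaInfo(new String[] {"host1", "host2"}, 0L, 1024L);
    DataOutputBuffer out = new DataOutputBuffer();
    meta.write(out);
    SplitMetaInfo copy = new SplitMetaInfo();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(out.getData(), 0, out.getLength())));
    // copy.getInputDataLength() == 1024, copy.getLocations().length == 2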
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * The class used by job clients to write the split files (both the
+ * meta part and the raw bytes part).
+ */
+public class JobSplitWriter {
+
+ private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
+ private static final byte[] SPLIT_FILE_HEADER;
+ static {
+ try {
+ SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
+ } catch (UnsupportedEncodingException u) {
+ throw new RuntimeException(u);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
+ Configuration conf, List<InputSplit> splits)
+ throws IOException, InterruptedException {
+ T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+ createSplitFiles(jobSubmitDir, conf, array);
+ }
+
+ public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
+ Configuration conf,T[] splits)
+ throws IOException, InterruptedException {
+ FileSystem fs = jobSubmitDir.getFileSystem(conf);
+ FSDataOutputStream out = createFile(fs,
+ JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+ SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
+ out.close();
+ writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+ info);
+ }
+
+ public static void createSplitFiles(Path jobSubmitDir,
+ Configuration conf, org.apache.hadoop.mapred.InputSplit[] splits)
+ throws IOException {
+ FileSystem fs = jobSubmitDir.getFileSystem(conf);
+ FSDataOutputStream out = createFile(fs,
+ JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+ SplitMetaInfo[] info = writeOldSplits(splits, out);
+ out.close();
+ writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+ info);
+ }
+
+ private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
+ Configuration job) throws IOException {
+ FSDataOutputStream out = FileSystem.create(fs, splitFile,
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+ int replication = job.getInt(Job.SUBMIT_REPLICATION, 10);
+ fs.setReplication(splitFile, (short)replication);
+ writeSplitHeader(out);
+ return out;
+ }
+ private static void writeSplitHeader(FSDataOutputStream out)
+ throws IOException {
+ out.write(SPLIT_FILE_HEADER);
+ out.writeInt(splitVersion);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T extends InputSplit>
+ SplitMetaInfo[] writeNewSplits(Configuration conf,
+ T[] array, FSDataOutputStream out)
+ throws IOException, InterruptedException {
+
+ SplitMetaInfo[] info = new SplitMetaInfo[array.length];
+ if (array.length != 0) {
+ SerializationFactory factory = new SerializationFactory(conf);
+ int i = 0;
+ long offset = out.size();
+ for(T split: array) {
+ int prevCount = out.size();
+ Text.writeString(out, split.getClass().getName());
+ Serializer<T> serializer =
+ factory.getSerializer((Class<T>) split.getClass());
+ serializer.open(out);
+ serializer.serialize(split);
+ int currCount = out.size();
+ info[i++] =
+ new JobSplit.SplitMetaInfo(
+ split.getLocations(), offset,
+ split.getLength());
+ offset += currCount - prevCount;
+ }
+ }
+ return info;
+ }
+
+ private static SplitMetaInfo[] writeOldSplits(
+ org.apache.hadoop.mapred.InputSplit[] splits,
+ FSDataOutputStream out) throws IOException {
+ SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
+ if (splits.length != 0) {
+ int i = 0;
+ long offset = out.size();
+ for(org.apache.hadoop.mapred.InputSplit split: splits) {
+ int prevLen = out.size();
+ Text.writeString(out, split.getClass().getName());
+ split.write(out);
+ int currLen = out.size();
+ info[i++] = new JobSplit.SplitMetaInfo(
+ split.getLocations(), offset,
+ split.getLength());
+ offset += currLen - prevLen;
+ }
+ }
+ return info;
+ }
+
+ private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
+ FsPermission p, int splitMetaInfoVersion,
+ JobSplit.SplitMetaInfo[] allSplitMetaInfo)
+ throws IOException {
+ // write the splits meta-info to a file for the job tracker
+ FSDataOutputStream out =
+ FileSystem.create(fs, filename, p);
+ out.write(JobSplit.META_SPLIT_FILE_HEADER);
+ WritableUtils.writeVInt(out, splitMetaInfoVersion);
+ WritableUtils.writeVInt(out, allSplitMetaInfo.length);
+ for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
+ splitMetaInfo.write(out);
+ }
+ out.close();
+ }
+}
+
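
A sketch of the call sequence JobSubmitter now uses for new-API jobs (mirroring the writeNewSplits change above); jobContext, input, conf and jobSubmitDir are assumed to be in scope, and the size-based comparator is elided:

    List<InputSplit> splits = input.getSplits(jobContext);
    InputSplit[] array = splits.toArray(new InputSplit[splits.size()]);
    // Sort so the biggest splits go first, as JobSubmitter does.
    Arrays.sort(array, sizeComparator); // 'sizeComparator' is assumed
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, array);
    // Writes job.split plus job.splitmetainfo under jobSubmitDir.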
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=892893&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Mon Dec 21 17:36:44 2009
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+/**
+ * A utility that reads the split meta-info file and creates
+ * the corresponding split meta-info objects.
+ */
+
+public class SplitMetaInfoReader {
+
+ public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
+ JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
+ throws IOException {
+ long maxMetaInfoSize = conf.getLong(JTConfig.MAX_JOB_SPLIT_METAINFO_SIZE,
+ 10000000L);
+ Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
+ FileStatus fStatus = fs.getFileStatus(metaSplitFile);
+ if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+ throw new IOException("Split metadata size exceeded " +
+ maxMetaInfoSize +". Aborting job " + jobId);
+ }
+ FSDataInputStream in = fs.open(metaSplitFile);
+ byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+ in.readFully(header);
+ if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+ throw new IOException("Invalid header on split file");
+ }
+ int vers = WritableUtils.readVInt(in);
+ if (vers != JobSplit.META_SPLIT_VERSION) {
+ in.close();
+ throw new IOException("Unsupported split version " + vers);
+ }
+ int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
+ JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
+ new JobSplit.TaskSplitMetaInfo[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+ splitMetaInfo.readFields(in);
+ JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+ JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(),
+ splitMetaInfo.getStartOffset());
+ allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+ splitMetaInfo.getLocations(),
+ splitMetaInfo.getInputDataLength());
+ }
+ in.close();
+ return allSplitMetaInfo;
+ }
+
+}
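
And the matching read side, roughly as the JobTracker would invoke it during job initialization; fs, conf, jobId and jobSubmitDir are assumed to be in scope:

    JobSplit.TaskSplitMetaInfo[] metas =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
    for (JobSplit.TaskSplitMetaInfo m : metas) {
      // Each entry points a map task at its offset within job.split.
      System.out.println(m.getSplitLocation() + " @ " + m.getStartOffset());
    }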
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Mon Dec 21 17:36:44 2009
@@ -185,8 +185,6 @@
new String[] {JobContext.QUEUE_NAME});
Configuration.addDeprecation("mapred.job.reuse.jvm.num.tasks",
new String[] {JobContext.JVM_NUMTASKS_TORUN});
- Configuration.addDeprecation("mapred.job.split.file",
- new String[] {JobContext.SPLIT_FILE});
Configuration.addDeprecation("mapred.map.tasks",
new String[] {JobContext.NUM_MAPS});
Configuration.addDeprecation("mapred.max.tracker.failures",
Modified: hadoop/mapreduce/trunk/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred-site.xml?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/trunk/src/test/mapred-site.xml Mon Dec 21 17:36:44 2009
@@ -19,4 +19,9 @@
<value>false</value>
<description></description>
</property>
+<property>
+ <name>mapreduce.jobtracker.staging.root.dir</name>
+ <value>${hadoop.tmp.dir}/staging</value>
+ <description></description>
+</property>
</configuration>
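
Note that Configuration expands ${hadoop.tmp.dir} when the value is read, so each deployment resolves its own staging root; a small sketch of that behavior (the tmp dir value is an example):

    Configuration conf = new Configuration();
    conf.set("hadoop.tmp.dir", "/tmp/hadoop-alice"); // example value
    conf.set("mapreduce.jobtracker.staging.root.dir", "${hadoop.tmp.dir}/staging");
    // Variable expansion happens in Configuration.get().
    System.out.println(conf.get("mapreduce.jobtracker.staging.root.dir"));
    // -> /tmp/hadoop-alice/staging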
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Mon Dec 21 17:36:44 2009
@@ -28,8 +28,10 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -142,10 +144,10 @@
String[] splits = ugi.split(",");
taskControllerUser = new UnixUserGroupInformation(splits);
clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
- createHomeDirectory(clusterConf);
+ createHomeAndStagingDirectory(clusterConf);
}
- private void createHomeDirectory(JobConf conf)
+ private void createHomeAndStagingDirectory(JobConf conf)
throws IOException {
FileSystem fs = dfsCluster.getFileSystem();
String path = "/user/" + taskControllerUser.getUserName();
@@ -153,6 +155,10 @@
LOG.info("Creating Home directory : " + homeDirectory);
fs.mkdirs(homeDirectory);
changePermission(conf, homeDirectory);
+ Path stagingArea = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT));
+ LOG.info("Creating Staging root directory : " + stagingArea);
+ fs.mkdirs(stagingArea);
+ fs.setPermission(stagingArea, new FsPermission((short)0777));
}
private void changePermission(JobConf conf, Path p)
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Mon Dec 21 17:36:44 2009
@@ -29,11 +29,11 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskStatus.Phase;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
/**
* Utilities used in unit test.
@@ -75,7 +75,6 @@
}
static class FakeJobInProgress extends JobInProgress {
- Job.RawSplit[] rawSplits;
@SuppressWarnings("deprecation")
FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
@@ -89,27 +88,27 @@
@Override
public synchronized void initTasks() throws IOException {
- Job.RawSplit[] splits = createSplits();
- numMapTasks = splits.length;
- createMapTasks(null, splits);
- nonRunningMapCache = createCache(splits, maxLevel);
+ TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+ numMapTasks = taskSplitMetaInfo.length;
+ createMapTasks(null, taskSplitMetaInfo);
+ nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
createReduceTasks(null);
tasksInited.set(true);
this.status.setRunState(JobStatus.RUNNING);
}
@Override
- Job.RawSplit[] createSplits(){
- Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
+ TaskSplitMetaInfo [] createSplits(org.apache.hadoop.mapreduce.JobID jobId){
+ TaskSplitMetaInfo[] splits =
+ new TaskSplitMetaInfo[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
- splits[i] = new Job.RawSplit();
- splits[i].setLocations(new String[0]);
+ splits[i] = JobSplit.EMPTY_TASK_SPLIT;
}
return splits;
}
@Override
- protected void createMapTasks(String ignored, Job.RawSplit[] splits) {
+ protected void createMapTasks(String ignored, TaskSplitMetaInfo[] splits) {
maps = new TaskInProgress[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
maps[i] = new TaskInProgress(getJobID(), "test",
@@ -260,7 +259,7 @@
numSlotsRequired);
}
- public FakeTaskInProgress(JobID jobId, String jobFile, RawSplit emptySplit,
+ public FakeTaskInProgress(JobID jobId, String jobFile, TaskSplitMetaInfo emptySplit,
JobTracker jobTracker, JobConf jobConf,
JobInProgress job, int partition, int numSlotsRequired) {
super(jobId, jobFile, emptySplit, jobTracker, jobConf, job,
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Mon Dec 21 17:36:44 2009
@@ -141,16 +141,16 @@
org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
null)) {
// specified IndirectInputFormat? Build src list
- JobClient jClient = new JobClient(job);
- Path sysdir = jClient.getSystemDir();
+ JobClient jClient = new JobClient(job);
+ Path tmpDir = new Path(jClient.getFs().getHomeDirectory(), ".staging");
Random r = new Random();
- Path indirInputFile = new Path(sysdir,
+ Path indirInputFile = new Path(tmpDir,
Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
job.set(
org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
indirInputFile.toString());
SequenceFile.Writer writer = SequenceFile.createWriter(
- sysdir.getFileSystem(job), job, indirInputFile,
+ tmpDir.getFileSystem(job), job, indirInputFile,
LongWritable.class, Text.class,
SequenceFile.CompressionType.NONE);
try {
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobInProgress.java Mon Dec 21 17:36:44 2009
@@ -40,10 +40,9 @@
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.StaticMapping;
@@ -101,16 +100,14 @@
}
@Override
- Job.RawSplit[] createSplits() {
+ TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) {
// Set all splits to reside on one host. This will ensure that
// one tracker gets data local, one gets rack local and two others
// get non-local maps
- Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
+ TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMapTasks];
String[] splitHosts0 = new String[] { hosts[0] };
for (int i = 0; i < numMapTasks; i++) {
- splits[i] = new Job.RawSplit();
- splits[i].setDataLength(0);
- splits[i].setLocations(splitHosts0);
+ splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
}
return splits;
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Mon Dec 21 17:36:44 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
public class TestJobQueueTaskScheduler extends TestCase {
@@ -81,7 +82,7 @@
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(TaskType.MAP);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
+ Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(), 1) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Mon Dec 21 17:36:44 2009
@@ -56,7 +56,8 @@
Path outDir,
String input,
int numMaps,
- int numReduces) throws IOException {
+ int numReduces,
+ String sysDir) throws IOException {
FileSystem inFs = inDir.getFileSystem(conf);
FileSystem outFs = outDir.getFileSystem(conf);
outFs.delete(outDir, true);
@@ -90,14 +91,13 @@
assertFalse(FileSystem.get(conf).exists(
new Path(conf.get(JTConfig.JT_SYSTEM_DIR))));
// Check if the Job Tracker system dir is propagated to client
- String sysDir = jobClient.getSystemDir().toString();
- System.out.println("Job sys dir -->" + sysDir);
assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
assertTrue(sysDir.contains("custom"));
return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
}
- static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+ static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir)
+ throws IOException {
LOG.info("runWordCount");
// Run a word count example
// Keeping tasks that match this pattern
@@ -107,7 +107,7 @@
result = launchWordCount(jobConf, inDir, outDir,
"The quick brown fox\nhas many silly\n" +
"red fox sox\n",
- 3, 1);
+ 3, 1, sysDir);
assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
// Checking if the Job ran successfully in spite of different system dir config
@@ -128,7 +128,7 @@
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
- runWordCount(mr, mr.createJobConf());
+ runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestKillCompletedJob.java Mon Dec 21 17:36:44 2009
@@ -52,7 +52,7 @@
@SuppressWarnings("deprecation")
public void testKillCompletedJob() throws IOException, InterruptedException {
job = new MyFakeJobInProgress(new JobConf(), jobTracker);
- jobTracker.addJob(job.getJobID(), job);
+ jobTracker.addJob(job.getJobID(), (JobInProgress)job);
job.status.setRunState(JobStatus.SUCCEEDED);
jobTracker.killJob(job.getJobID());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Mon Dec 21 17:36:44 2009
@@ -20,6 +20,9 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
@@ -27,10 +30,19 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Validates map phase progress.
@@ -92,9 +104,9 @@
*/
class TestMapTask extends MapTask {
public TestMapTask(String jobFile, TaskAttemptID taskId,
- int partition, String splitClass, BytesWritable split,
+ int partition, TaskSplitIndex splitIndex,
int numSlotsRequired) {
- super(jobFile, taskId, partition, splitClass, split, numSlotsRequired);
+ super(jobFile, taskId, partition, splitIndex, numSlotsRequired);
}
/**
@@ -141,16 +153,20 @@
jobId = taskId.getJobID();
JobContext jContext = new JobContextImpl(job, jobId);
- Job.RawSplit[] rawSplits = LocalJobRunner.getRawSplits(jContext, job);
+ InputFormat<?, ?> input =
+ ReflectionUtils.newInstance(jContext.getInputFormatClass(), job);
- job.setUseNewMapper(true); // use new api
- for (int i = 0; i < rawSplits.length; i++) {// rawSplits.length is 1
+ List<InputSplit> splits = input.getSplits(jContext);
+ JobSplitWriter.createSplitFiles(new Path(TEST_ROOT_DIR), job, splits);
+ TaskSplitMetaInfo[] splitMetaInfo =
+ SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, job, new Path(TEST_ROOT_DIR));
+ job.setUseNewMapper(true); // use new api
+ for (int i = 0; i < splitMetaInfo.length; i++) { // splitMetaInfo.length is 1
map = new TestMapTask(
job.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system") +
jobId + "job.xml",
taskId, i,
- rawSplits[i].getClassName(),
- rawSplits[i].getBytes(), 1);
+ splitMetaInfo[i].getSplitIndex(), 1);
JobConf localConf = new JobConf(job);
map.localizeConfiguration(localConf);
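
The hunk above replaces LocalJobRunner.getRawSplits with the new split machinery: splits are computed with the new-API InputFormat, persisted by JobSplitWriter, and read back as TaskSplitMetaInfo through SplitMetaInfoReader. A condensed sketch of that round trip, exception handling elided; job, fs, jobId and TEST_ROOT_DIR are assumed to be set up as in the test:

    // Compute the splits with the new-API InputFormat.
    JobContext jContext = new JobContextImpl(job, jobId);
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(jContext.getInputFormatClass(), job);
    List<InputSplit> splits = input.getSplits(jContext);
    // Persist job.split and job.splitmetainfo under the submit dir.
    JobSplitWriter.createSplitFiles(new Path(TEST_ROOT_DIR), job, splits);
    // Read back the per-task meta info; each task later seeks into
    // job.split using the small TaskSplitIndex it is handed.
    TaskSplitMetaInfo[] metaInfo =
        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, job,
                                              new Path(TEST_ROOT_DIR));
    TaskSplitIndex idx = metaInfo[0].getSplitIndex();
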
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRClasspath.java Mon Dec 21 17:36:44 2009
@@ -38,15 +38,13 @@
public class TestMiniMRClasspath extends TestCase {
- static String launchWordCount(URI fileSys,
+ static void configureWordCount(FileSystem fs,
String jobTracker,
JobConf conf,
String input,
int numMaps,
- int numReduces) throws IOException {
- final Path inDir = new Path("/testing/wc/input");
- final Path outDir = new Path("/testing/wc/output");
- FileSystem fs = FileSystem.get(fileSys, conf);
+ int numReduces,
+ Path inDir, Path outDir) throws IOException {
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -56,7 +54,7 @@
file.writeBytes(input);
file.close();
}
- FileSystem.setDefaultUri(conf, fileSys);
+ FileSystem.setDefaultUri(conf, fs.getUri());
conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
conf.setJobName("wordcount");
conf.setInputFormat(TextInputFormat.class);
@@ -75,6 +73,16 @@
conf.setNumReduceTasks(numReduces);
//pass a job.jar already included in the hadoop build
conf.setJar("build/test/mapred/testjar/testjob.jar");
+ }
+
+ static String launchWordCount(URI fileSys, String jobTracker, JobConf conf,
+ String input, int numMaps, int numReduces)
+ throws IOException {
+ final Path inDir = new Path("/testing/wc/input");
+ final Path outDir = new Path("/testing/wc/output");
+ FileSystem fs = FileSystem.get(fileSys, conf);
+ configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir,
+ outDir);
JobClient.runJob(conf);
StringBuffer result = new StringBuffer();
{
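
configureWordCount is split out of launchWordCount above so that other tests can stage the same word-count job without submitting it through JobClient.runJob. A sketch of a caller, with placeholder paths and input; the helper is package-private, so the caller must live in org.apache.hadoop.mapred as well:

    Path inDir = new Path("/testing/wc/input");
    Path outDir = new Path("/testing/wc/output");
    TestMiniMRClasspath.configureWordCount(fs, jobTracker, conf,
        "one fish\ntwo fish\n", 2, 1, inDir, outDir);
    JobClient.runJob(conf);  // or hand conf to a custom submission path
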
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Dec 21 17:36:44 2009
@@ -36,12 +36,14 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -289,8 +291,11 @@
long hdfsWrite =
counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+ long rawSplitBytesRead =
+ counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
assertEquals(result.output.length(), hdfsWrite);
- assertEquals(input.length(), hdfsRead);
+ // the serialized input split is also streamed from HDFS, so correct by the raw split bytes read
+ assertEquals(input.length() + rawSplitBytesRead, hdfsRead);
// Run a job with input and output going to localfs even though the
// default fs is hdfs.
@@ -334,7 +339,7 @@
}
}
- public void testWithDFSWithDefaultPort() throws IOException {
+ public void tesWithDFSWithDefaultPort() throws IOException {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
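
Since serialized splits are now streamed to map tasks from HDFS, HDFS_BYTES_READ exceeds the plain input size by exactly the split bytes, and TaskCounter.SPLIT_RAW_BYTES reports that amount. A sketch of the adjusted accounting, assuming counters, input and hdfsRead as in the hunk above:

    // Raw split bytes read by the tasks, as reported by the framework.
    long rawSplitBytesRead =
        counters.findCounter(TaskCounter.SPLIT_RAW_BYTES).getCounter();
    // Expected HDFS reads = user input + streamed split data.
    assertEquals(input.length() + rawSplitBytesRead, hdfsRead);
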
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Mon Dec 21 17:36:44 2009
@@ -23,9 +23,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.security.*;
/**
@@ -43,7 +50,11 @@
}
static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
- JobConf jobconf = mr.createJobConf();
+ return createJobConf(mr.createJobConf(), ugi);
+ }
+
+ static JobConf createJobConf(JobConf conf, UnixUserGroupInformation ugi) {
+ JobConf jobconf = new JobConf(conf);
UnixUserGroupInformation.saveToConf(jobconf,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
return jobconf;
@@ -55,6 +66,50 @@
fs.setPermission(p, new FsPermission((short)0777));
}
+ // runs a sample job as a user (ugi)
+ RunningJob runJobAsUser(JobConf job, UserGroupInformation ugi)
+ throws Exception {
+ ClientProtocol jobSubmitClient =
+ TestSubmitJob.getJobSubmitClient(job, ugi);
+ org.apache.hadoop.mapreduce.JobID id = jobSubmitClient.getNewJobID();
+
+ InputSplit[] splits = computeJobSplit(JobID.downgrade(id), job);
+ Path jobSubmitDir = new Path(id.toString());
+ FileSystem fs = jobSubmitDir.getFileSystem(job);
+ jobSubmitDir = jobSubmitDir.makeQualified(fs);
+ uploadJobFiles(JobID.downgrade(id), splits, jobSubmitDir, job);
+
+ jobSubmitClient.submitJob(id, jobSubmitDir.toString());
+
+ JobClient jc = new JobClient(job);
+ return jc.getJob(JobID.downgrade(id));
+ }
+
+ // a helper API for split computation
+ private InputSplit[] computeJobSplit(JobID id, JobConf conf)
+ throws IOException {
+ InputSplit[] splits =
+ conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
+ conf.setNumMapTasks(splits.length);
+ return splits;
+ }
+
+
+ // a helper API for split submission
+ private void uploadJobFiles(JobID id, InputSplit[] splits,
+ Path jobSubmitDir, JobConf conf)
+ throws IOException {
+ Path confLocation = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+ JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
+ FileSystem fs = confLocation.getFileSystem(conf);
+ FsPermission perm = new FsPermission((short)0700);
+
+ // localize conf
+ DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
+ conf.writeXml(confOut);
+ confOut.close();
+ }
+
public void testDistinctUsers() throws Exception {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
@@ -71,15 +126,32 @@
UnixUserGroupInformation.login().getUserName(), false);
mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
1, null, null, MR_UGI);
+ String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
- JobConf pi = createJobConf(mr, PI_UGI);
- TestMiniMRWithDFS.runPI(mr, pi);
-
- JobConf wc = createJobConf(mr, WC_UGI);
- TestMiniMRWithDFS.runWordCount(mr, wc);
+ JobConf job1 = mr.createJobConf();
+ String input = "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n";
+ Path inDir = new Path("/testing/distinct/input");
+ Path outDir = new Path("/testing/distinct/output");
+ TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
+ input, 2, 1, inDir, outDir);
+ job1 = createJobConf(job1, PI_UGI);
+ runJobAsUser(job1, PI_UGI);
+
+ JobConf job2 = mr.createJobConf();
+ Path inDir2 = new Path("/testing/distinct/input2");
+ Path outDir2 = new Path("/testing/distinct/output2");
+ TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2,
+ input, 2, 1, inDir2, outDir2);
+ job2 = createJobConf(job2, WC_UGI);
+ runJobAsUser(job2, WC_UGI);
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();}
}
}
+
+ public void testRestartWithDistinctUsers() {
+
+ }
}
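
runJobAsUser above reproduces the client-side submission protocol by hand: obtain a new job id, write the split files and job.xml into a per-job submit directory, and call ClientProtocol.submitJob with that directory. The JobSubmissionFiles helpers added in this commit name the files inside it; a sketch with a placeholder directory:

    // jobSubmitDir is a placeholder for the per-job staging directory.
    Path jobSubmitDir = new Path("/tmp/staging/job_200912211736_0001");
    Path jobConf  = JobSubmissionFiles.getJobConfPath(jobSubmitDir);      // job.xml
    Path jobSplit = JobSubmissionFiles.getJobSplitFile(jobSubmitDir);     // job.split
    Path jobMeta  = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir); // job.splitmetainfo
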
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Mon Dec 21 17:36:44 2009
@@ -31,8 +31,10 @@
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.StaticMapping;
@@ -93,38 +95,36 @@
@Override
public void initTasks() throws IOException {
- Job.RawSplit[] splits = createSplits();
- numMapTasks = splits.length;
- createMapTasks(null, splits);
- nonRunningMapCache = createCache(splits, maxLevel);
+ TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
+ numMapTasks = taskSplitMetaInfo.length;
+ createMapTasks(null, taskSplitMetaInfo);
+ nonRunningMapCache = createCache(taskSplitMetaInfo, maxLevel);
tasksInited.set(true);
this.status.setRunState(JobStatus.RUNNING);
}
-
- protected Job.RawSplit[] createSplits() throws IOException {
- Job.RawSplit[] splits = new Job.RawSplit[numMaps];
+ @Override
+ protected TaskSplitMetaInfo [] createSplits(
+ org.apache.hadoop.mapreduce.JobID jobId) throws IOException {
+ TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[numMaps];
// Hand code for now.
// M0,2,3 reside in Host1
// M1 resides in Host3
// M4 resides in Host4
String[] splitHosts0 = new String[] { allHosts[0] };
- for (int i = 0; i < numMaps; i++) {
- splits[i] = new Job.RawSplit();
- splits[i].setDataLength(0);
- }
-
- splits[0].setLocations(splitHosts0);
- splits[2].setLocations(splitHosts0);
- splits[3].setLocations(splitHosts0);
-
String[] splitHosts1 = new String[] { allHosts[2] };
- splits[1].setLocations(splitHosts1);
-
String[] splitHosts2 = new String[] { allHosts[3] };
- splits[4].setLocations(splitHosts2);
+ for (int i = 0; i < numMaps; i++) {
+ if (i == 0 || i == 2 || i == 3) {
+ splits[i] = new TaskSplitMetaInfo(splitHosts0, 0, 0);
+ } else if (i == 1) {
+ splits[i] = new TaskSplitMetaInfo(splitHosts1, 0, 0);
+ } else if (i == 4) {
+ splits[i] = new TaskSplitMetaInfo(splitHosts2, 0, 0);
+ }
+ }
return splits;
}
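
Where the old code built an empty Job.RawSplit and mutated it afterwards, TaskSplitMetaInfo takes its locations, start offset and input length up front. A minimal sketch of the equivalence, with a hypothetical host name:

    // Old, mutable style:
    //   Job.RawSplit s = new Job.RawSplit();
    //   s.setDataLength(0);
    //   s.setLocations(new String[] { "host1.rack1.com" });
    // New, one-shot style (locations, startOffset, inputDataLength):
    TaskSplitMetaInfo split =
        new TaskSplitMetaInfo(new String[] { "host1.rack1.com" }, 0, 0);
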
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Mon Dec 21 17:36:44 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
+import java.io.File;
import java.io.IOException;
import junit.framework.TestCase;
@@ -48,10 +49,11 @@
/**
* Tests the {@link JobTracker} against the exceptions thrown in
* {@link JobTracker.RecoveryManager}. It does the following :
- * - submits 2 jobs
+ * - submits 3 jobs
* - kills the jobtracker
* - garbles job.xml for one job, causing it to fail in the constructor,
* and job.split for another, causing it to fail in init.
+ * - deletes the job temp/submit dir
* - restarts the jobtracker
* - checks if the jobtraker starts normally
*/
@@ -81,7 +83,7 @@
LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
UtilsForTests.waitFor(100);
}
-
+
JobConf job2 = mr.createJobConf();
UtilsForTests.configureWaitingJobConf(job2,
@@ -105,26 +107,15 @@
// delete the job token file of job #1, causing the job to fail in submitJob
// during recovery itself
Path jobFile =
- new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
- LOG.info("Deleting job.xml file : " + jobFile.toString());
+ new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
+ LOG.info("Deleting job token file : " + jobFile.toString());
fs.delete(jobFile, false); // delete the job token file
- // create the job.xml file with 0 bytes
+ // create the job token file with 1 byte
FSDataOutputStream out = fs.create(jobFile);
out.write(1);
out.close();
-
- // delete the job.split of job #2 causing the job to fail in initTasks
- Path jobSplitFile =
- new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
- LOG.info("Deleting job.split file : " + jobSplitFile.toString());
- fs.delete(jobSplitFile, false); // delete the job.split file
- // create the job.split file with 0 bytes
- out = fs.create(jobSplitFile);
- out.write(1);
- out.close();
-
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, true);
// start the jobtracker
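
A condensed sketch of the corruption step described in the comment block above: the job token file is replaced with a single garbage byte so that exactly that job fails during recovery, and restart is switched on before the jobtracker comes back. sysDir, rJob1, fs and mr are assumed from the test:

    Path jobInfo = new Path(sysDir,
        rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
    fs.delete(jobInfo, false);             // drop the real job token file
    FSDataOutputStream out = fs.create(jobInfo);
    out.write(1);                          // leave one garbage byte behind
    out.close();
    // make the jobtracker recover jobs on restart
    mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, true);
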
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Mon Dec 21 17:36:44 2009
@@ -17,10 +17,9 @@
*/
package org.apache.hadoop.mapred;
-import junit.framework.TestCase;
+import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Job.RawSplit;
+import junit.framework.TestCase;
public class TestResourceEstimation extends TestCase {
@@ -47,8 +46,8 @@
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- Job.RawSplit split = new Job.RawSplit();
- split.setDataLength(0);
+ JobSplit.TaskSplitMetaInfo split =
+ new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
}
@@ -83,8 +82,9 @@
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- Job.RawSplit split = new Job.RawSplit();
- split.setDataLength(singleMapInputSize);
+ JobSplit.TaskSplitMetaInfo split =
+ new JobSplit.TaskSplitMetaInfo(new String[0], 0,
+ singleMapInputSize);
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
}
@@ -95,8 +95,8 @@
//add one more map task with input size as 0
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- Job.RawSplit split = new Job.RawSplit();
- split.setDataLength(0);
+ JobSplit.TaskSplitMetaInfo split =
+ new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
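
The resource estimator reads a map's input size straight from its TaskSplitMetaInfo, so the test now supplies it through the constructor rather than via RawSplit.setDataLength. A sketch with a hypothetical input size:

    long singleMapInputSize = 4L * 1024 * 1024;  // hypothetical 4 MB split
    JobSplit.TaskSplitMetaInfo split =
        new JobSplit.TaskSplitMetaInfo(new String[0], 0, singleMapInputSize);
    // TaskInProgress picks the length up from the meta info, as above.
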
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSetupTaskScheduling.java Mon Dec 21 17:36:44 2009
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
@@ -60,7 +61,7 @@
@Override
public synchronized void initTasks() throws IOException {
super.initTasks();
- Job.RawSplit emptySplit = new Job.RawSplit();
+ JobSplit.TaskSplitMetaInfo emptySplit = new JobSplit.TaskSplitMetaInfo();
setup = new TaskInProgress[2];
setup[0] = new TaskInProgress(getJobID(), "test", emptySplit,
jobtracker, getJobConf(), this, numMapTasks + 1, 1);
@@ -109,12 +110,13 @@
@Override
public synchronized void initTasks() throws IOException {
super.initTasks();
- Job.RawSplit emptySplit = new Job.RawSplit();
+
final int numSlotsPerTask = 2;
maps = new TaskInProgress[1];
reduces = new TaskInProgress[1];
- maps[0] = new FakeTaskInProgress(getJobID(), "test", emptySplit,
+ maps[0] = new FakeTaskInProgress(getJobID(), "test",
+ JobSplit.EMPTY_TASK_SPLIT,
jobtracker, getJobConf(), this, 0, numSlotsPerTask);
TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
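
For tasks that carry no input data, the hunks above use either a fresh no-arg TaskSplitMetaInfo or the shared JobSplit.EMPTY_TASK_SPLIT constant; the constant presumably saves a throwaway allocation when the split is never consulted. A one-line sketch of each:

    JobSplit.TaskSplitMetaInfo empty  = new JobSplit.TaskSplitMetaInfo();
    JobSplit.TaskSplitMetaInfo shared = JobSplit.EMPTY_TASK_SPLIT;
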
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java?rev=892893&r1=892892&r2=892893&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSeveral.java Mon Dec 21 17:36:44 2009
@@ -92,6 +92,8 @@
TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fs,
+ conf.get(JTConfig.JT_STAGING_AREA_ROOT));
UnixUserGroupInformation MR_UGI =
TestMiniMRWithDFSWithDistinctUsers.createUGI(