Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:41:32 UTC
svn commit: r1077108 [2/3] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
contrib/gridmix/src/test/org/apache/hadoop/...
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 4 03:41:31 2011
@@ -30,9 +30,8 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.net.Node;
@@ -61,7 +60,7 @@ class TaskInProgress {
// Defines the TIP
private String jobFile = null;
- private RawSplit rawSplit;
+ private final TaskSplitMetaInfo splitInfo;
private int numMaps;
private int partition;
private JobTracker jobtracker;
@@ -131,12 +130,12 @@ class TaskInProgress {
* Constructor for MapTask
*/
public TaskInProgress(JobID jobid, String jobFile,
- RawSplit rawSplit,
+ TaskSplitMetaInfo split,
JobTracker jobtracker, JobConf conf,
JobInProgress job, int partition,
int numSlotsRequired) {
this.jobFile = jobFile;
- this.rawSplit = rawSplit;
+ this.splitInfo = split;
this.jobtracker = jobtracker;
this.job = job;
this.conf = conf;
@@ -155,6 +154,7 @@ class TaskInProgress {
int partition, JobTracker jobtracker, JobConf conf,
JobInProgress job, int numSlotsRequired) {
this.jobFile = jobFile;
+ this.splitInfo = null;
this.numMaps = numMaps;
this.partition = partition;
this.jobtracker = jobtracker;
@@ -284,7 +284,7 @@ class TaskInProgress {
* Whether this is a map task
*/
public boolean isMapTask() {
- return rawSplit != null;
+ return splitInfo != null;
}
/**
@@ -746,7 +746,7 @@ class TaskInProgress {
*/
public String[] getSplitLocations() {
if (isMapTask() && !jobSetup && !jobCleanup) {
- return rawSplit.getLocations();
+ return splitInfo.getLocations();
}
return new String[0];
}
@@ -937,19 +937,11 @@ class TaskInProgress {
if (isMapTask()) {
LOG.debug("attempt " + numTaskFailures + " sending skippedRecords "
+ failedRanges.getIndicesCount());
- String splitClass = null;
- BytesWritable split;
- if (!jobSetup && !jobCleanup) {
- splitClass = rawSplit.getClassName();
- split = rawSplit.getBytes();
- } else {
- split = new BytesWritable();
- }
- t = new MapTask(jobFile, taskid, partition, splitClass, split,
- numSlotsNeeded, job.getUser());
+ t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(),
+ numSlotsNeeded);
} else {
t = new ReduceTask(jobFile, taskid, partition, numMaps,
- numSlotsNeeded, job.getUser());
+ numSlotsNeeded);
}
if (jobCleanup) {
t.setJobCleanupTask();
@@ -1060,7 +1052,7 @@ class TaskInProgress {
if (!isMapTask() || jobSetup || jobCleanup) {
return "";
}
- String[] splits = rawSplit.getLocations();
+ String[] splits = splitInfo.getLocations();
Node[] nodes = new Node[splits.length];
for (int i = 0; i < splits.length; i++) {
nodes[i] = jobtracker.getNode(splits[i]);
@@ -1090,16 +1082,12 @@ class TaskInProgress {
public long getMapInputSize() {
if(isMapTask() && !jobSetup && !jobCleanup) {
- return rawSplit.getDataLength();
+ return splitInfo.getInputDataLength();
} else {
return 0;
}
}
- public void clearSplit() {
- rawSplit.clearBytes();
- }
-
/**
* This class keeps the records to be skipped during further executions
* based on failed records from all the previous attempts.
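The net effect in TaskInProgress is that a map TIP now holds only the lightweight TaskSplitMetaInfo (locations, input length, and a pointer into job.split) instead of the serialized split bytes themselves. A minimal sketch of that object, using the constructors added in JobSplit.java below; the file path, offset, hosts, and length are hypothetical values for illustration:

    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;

    public class TipSplitInfoSketch {
      public static void main(String[] args) {
        // Hypothetical: this task's raw split bytes start at offset 7
        // inside the job's single job.split file.
        TaskSplitIndex index =
            new TaskSplitIndex("hdfs://nn:8020/staging/job_x/job.split", 7L);
        // Locality hints plus input length are all the JobTracker keeps in
        // memory; the serialized split stays on HDFS until the map reads it.
        TaskSplitMetaInfo meta = new TaskSplitMetaInfo(
            index, new String[] {"host1.example.com", "host2.example.com"},
            4096L);
        System.out.println(meta.getSplitLocation() + " @ "
            + meta.getStartOffset());
      }
    }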
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:41:31 2011
@@ -84,6 +84,8 @@ import org.apache.hadoop.metrics.Updater
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ConfiguredPolicy;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -474,6 +476,13 @@ public class TaskTracker
return taskDir;
}
+ private void setUgi(String user, Configuration conf) {
+ //The dummy-group used here will not be required once we have UGI
+ //object creation with just the user name.
+ conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
+ user+","+UnixUserGroupInformation.DEFAULT_GROUP);
+ }
+
String getPid(TaskAttemptID tid) {
TaskInProgress tip = tasks.get(tid);
if (tip != null) {
@@ -833,12 +842,16 @@ public class TaskTracker
Task t = tip.getTask();
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
+ String userName = t.getUser();
+ JobConf userConf = new JobConf(getJobConf());
+ setUgi(userName, userConf);
+ FileSystem userFs = jobFile.getFileSystem(userConf);
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
FileStatus status = null;
long jobFileSize = -1;
try {
- status = systemFS.getFileStatus(jobFile);
+ status = userFs.getFileStatus(jobFile);
jobFileSize = status.getLen();
} catch(FileNotFoundException fe) {
jobFileSize = -1;
@@ -864,7 +877,7 @@ public class TaskTracker
throw new IOException("Not able to create job directory "
+ jobDir.toString());
}
- systemFS.copyToLocalFile(jobFile, localJobFile);
+ userFs.copyToLocalFile(jobFile, localJobFile);
JobConf localJobConf = new JobConf(localJobFile);
// create the 'work' directory
@@ -885,7 +898,7 @@ public class TaskTracker
if (jarFile != null) {
Path jarFilePath = new Path(jarFile);
try {
- status = systemFS.getFileStatus(jarFilePath);
+ status = userFs.getFileStatus(jarFilePath);
jarFileSize = status.getLen();
} catch(FileNotFoundException fe) {
jarFileSize = -1;
@@ -899,7 +912,7 @@ public class TaskTracker
if (!localFs.mkdirs(localJarFile.getParent())) {
throw new IOException("Mkdirs failed to create jars directory ");
}
- systemFS.copyToLocalFile(jarFilePath, localJarFile);
+ userFs.copyToLocalFile(jarFilePath, localJarFile);
localJobConf.setJar(localJarFile.toString());
OutputStream out = localFs.create(localJobFile);
try {
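Because job.xml and job.jar now live under the submitter's private staging directory (mode 0700; see JobSubmissionFiles below), the TaskTracker can no longer localize them through systemFS, which acts as the JobTracker's own user. The setUgi() hunk above builds a per-user Configuration instead; condensed into one helper, as a sketch against the pre-security UnixUserGroupInformation API on this branch:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.security.UnixUserGroupInformation;

    public class UserFsSketch {
      // Build a FileSystem that acts as the job's submitter so files in the
      // 0700 staging directory are readable during localization.
      static FileSystem userFileSystem(JobConf ttConf, String user,
          Path jobFile) throws IOException {
        JobConf userConf = new JobConf(ttConf);
        // The dummy group mirrors setUgi() above: a placeholder until a UGI
        // can be created from the user name alone.
        userConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME,
            user + "," + UnixUserGroupInformation.DEFAULT_GROUP);
        return jobFile.getFileSystem(userConf);
      }
    }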
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Fri Mar 4 03:41:31 2011
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.Configuration;
+/**
+ * A utility to manage job submission files.
+ */
+public class JobSubmissionFiles {
+
+ // job submission directory is private!
+ final public static FsPermission JOB_DIR_PERMISSION =
+ FsPermission.createImmutable((short) 0700); // rwx--------
+ //job files are world readable and owner writable
+ final public static FsPermission JOB_FILE_PERMISSION =
+ FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+ public static Path getJobSplitFile(Path jobSubmissionDir) {
+ return new Path(jobSubmissionDir, "job.split");
+ }
+
+ public static Path getJobSplitMetaFile(Path jobSubmissionDir) {
+ return new Path(jobSubmissionDir, "job.splitmetainfo");
+ }
+
+ /**
+ * Get the job conf path.
+ */
+ public static Path getJobConfPath(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "job.xml");
+ }
+
+ /**
+ * Get the job jar path.
+ */
+ public static Path getJobJar(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "job.jar");
+ }
+
+ /**
+ * Get the job distributed cache files path.
+ * @param jobSubmitDir
+ */
+ public static Path getJobDistCacheFiles(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "files");
+ }
+ /**
+ * Get the job distributed cache archives path.
+ * @param jobSubmitDir
+ */
+ public static Path getJobDistCacheArchives(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "archives");
+ }
+ /**
+ * Get the job distributed cache libjars path.
+ * @param jobSubmitDir
+ */
+ public static Path getJobDistCacheLibjars(Path jobSubmitDir) {
+ return new Path(jobSubmitDir, "libjars");
+ }
+
+ /**
+ * Initializes the staging directory and returns the path. It also
+ * keeps track of all necessary ownership & permissions
+ * @param cluster
+ * @param conf
+ */
+ public static Path getStagingDir(JobClient client, Configuration conf)
+ throws IOException {
+ Path stagingArea = client.getStagingAreaDir();
+ FileSystem fs = stagingArea.getFileSystem(conf);
+ String realUser;
+ String currentUser;
+ try {
+ UserGroupInformation ugi = UnixUserGroupInformation.login();
+ realUser = ugi.getUserName();
+ ugi = UnixUserGroupInformation.login(conf);
+ currentUser = ugi.getUserName();
+ } catch (LoginException le) {
+ throw new IOException(le);
+ }
+ if (fs.exists(stagingArea)) {
+ FileStatus fsStatus = fs.getFileStatus(stagingArea);
+ String owner = fsStatus.getOwner();
+ if (!(owner.equals(currentUser) || owner.equals(realUser)) ||
+ !fsStatus.getPermission().
+ equals(JOB_DIR_PERMISSION)) {
+ throw new IOException("The ownership/permissions on the staging " +
+ "directory " + stagingArea + " is not as expected. " +
+ "It is owned by " + owner + " and permissions are "+
+ fsStatus.getPermission() + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser + " and permissions must be rwx------");
+ }
+ } else {
+ fs.mkdirs(stagingArea,
+ new FsPermission(JOB_DIR_PERMISSION));
+ }
+ return stagingArea;
+ }
+
+}
\ No newline at end of file
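Taken together, the accessors above fix the layout of a job's submission directory. An illustrative printout (the staging root, user, and job id are hypothetical; the file names come from the methods above):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobSubmissionFiles;

    public class StagingLayoutSketch {
      public static void main(String[] args) {
        // Hypothetical submit dir under the shared staging root.
        Path submitDir = new Path(
            "/tmp/hadoop/mapred/staging/alice/.staging/job_201103040341_0001");
        System.out.println(JobSubmissionFiles.getJobConfPath(submitDir));       // .../job.xml
        System.out.println(JobSubmissionFiles.getJobJar(submitDir));            // .../job.jar
        System.out.println(JobSubmissionFiles.getJobSplitFile(submitDir));      // .../job.split
        System.out.println(JobSubmissionFiles.getJobSplitMetaFile(submitDir));  // .../job.splitmetainfo
        System.out.println(JobSubmissionFiles.getJobDistCacheFiles(submitDir)); // .../files
      }
    }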
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplit.java Fri Mar 4 03:41:31 2011
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+
+/**
+ * This class groups the fundamental classes associated with
+ * reading/writing splits. The split information is divided into
+ * two parts based on the consumer of the information. The two
+ * parts are the split meta information, and the raw split
+ * information. The first part is consumed by the JobTracker to
+ * create the tasks' locality data structures. The second part is
+ * used by the maps at runtime to know what to do!
+ * These pieces of information are written to two separate files.
+ * The metainformation file is slurped by the JobTracker during
+ * job initialization. A map task gets the meta information during
+ * the launch and it reads the raw split bytes directly from the
+ * file.
+ */
+public class JobSplit {
+ static final int META_SPLIT_VERSION = 1;
+ static final byte[] META_SPLIT_FILE_HEADER;
+ static {
+ try {
+ META_SPLIT_FILE_HEADER = "META-SPL".getBytes("UTF-8");
+ } catch (UnsupportedEncodingException u) {
+ throw new RuntimeException(u);
+ }
+ }
+ public static final TaskSplitMetaInfo EMPTY_TASK_SPLIT =
+ new TaskSplitMetaInfo();
+
+ /**
+ * This represents the meta information about the task split.
+ * The main fields are
+ * - start offset in actual split
+ * - data length that will be processed in this split
+ * - hosts on which this split is local
+ */
+ public static class SplitMetaInfo implements Writable {
+ private long startOffset;
+ private long inputDataLength;
+ private String[] locations;
+
+ public SplitMetaInfo() {}
+
+ public SplitMetaInfo(String[] locations, long startOffset,
+ long inputDataLength) {
+ this.locations = locations;
+ this.startOffset = startOffset;
+ this.inputDataLength = inputDataLength;
+ }
+
+ public SplitMetaInfo(InputSplit split, long startOffset) throws IOException {
+ try {
+ this.locations = split.getLocations();
+ this.inputDataLength = split.getLength();
+ this.startOffset = startOffset;
+ } catch (InterruptedException ie) {
+ throw new IOException(ie);
+ }
+ }
+
+ public String[] getLocations() {
+ return locations;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public long getInputDataLength() {
+ return inputDataLength;
+ }
+
+ public void setInputDataLocations(String[] locations) {
+ this.locations = locations;
+ }
+
+ public void setInputDataLength(long length) {
+ this.inputDataLength = length;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ int len = WritableUtils.readVInt(in);
+ locations = new String[len];
+ for (int i = 0; i < locations.length; i++) {
+ locations[i] = Text.readString(in);
+ }
+ startOffset = WritableUtils.readVLong(in);
+ inputDataLength = WritableUtils.readVLong(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVInt(out, locations.length);
+ for (int i = 0; i < locations.length; i++) {
+ Text.writeString(out, locations[i]);
+ }
+ WritableUtils.writeVLong(out, startOffset);
+ WritableUtils.writeVLong(out, inputDataLength);
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("data-size : " + inputDataLength + "\n");
+ buf.append("start-offset : " + startOffset + "\n");
+ buf.append("locations : " + "\n");
+ for (String loc : locations) {
+ buf.append(" " + loc + "\n");
+ }
+ return buf.toString();
+ }
+ }
+ /**
+ * This represents the meta information about the task split that the
+ * JobTracker creates
+ */
+ public static class TaskSplitMetaInfo {
+ private TaskSplitIndex splitIndex;
+ private long inputDataLength;
+ private String[] locations;
+ public TaskSplitMetaInfo(){
+ this.splitIndex = new TaskSplitIndex();
+ this.locations = new String[0];
+ }
+ public TaskSplitMetaInfo(TaskSplitIndex splitIndex, String[] locations,
+ long inputDataLength) {
+ this.splitIndex = splitIndex;
+ this.locations = locations;
+ this.inputDataLength = inputDataLength;
+ }
+ public TaskSplitMetaInfo(InputSplit split, long startOffset)
+ throws InterruptedException, IOException {
+ this(new TaskSplitIndex("", startOffset), split.getLocations(),
+ split.getLength());
+ }
+
+ public TaskSplitMetaInfo(String[] locations, long startOffset,
+ long inputDataLength) {
+ this(new TaskSplitIndex("",startOffset), locations, inputDataLength);
+ }
+
+ public TaskSplitIndex getSplitIndex() {
+ return splitIndex;
+ }
+
+ public String getSplitLocation() {
+ return splitIndex.getSplitLocation();
+ }
+ public long getInputDataLength() {
+ return inputDataLength;
+ }
+ public String[] getLocations() {
+ return locations;
+ }
+ public long getStartOffset() {
+ return splitIndex.getStartOffset();
+ }
+ }
+
+ /**
+ * This represents the meta information about the task split that the
+ * task gets
+ */
+ public static class TaskSplitIndex {
+ private String splitLocation;
+ private long startOffset;
+ public TaskSplitIndex(){
+ this("", 0);
+ }
+ public TaskSplitIndex(String splitLocation, long startOffset) {
+ this.splitLocation = splitLocation;
+ this.startOffset = startOffset;
+ }
+ public long getStartOffset() {
+ return startOffset;
+ }
+ public String getSplitLocation() {
+ return splitLocation;
+ }
+ public void readFields(DataInput in) throws IOException {
+ splitLocation = Text.readString(in);
+ startOffset = WritableUtils.readVLong(in);
+ }
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, splitLocation);
+ WritableUtils.writeVLong(out, startOffset);
+ }
+ }
+}
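Since SplitMetaInfo is a plain Writable, a record round-trips in memory exactly as it is laid out in job.splitmetainfo: a vint location count, each location as a Text string, then a vlong start offset and a vlong input length. A small sketch with hypothetical hosts and sizes:

    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;

    public class SplitMetaInfoRoundTrip {
      public static void main(String[] args) throws Exception {
        // Hypothetical record: a 64 MB split local to two hosts, at offset 0.
        SplitMetaInfo written = new SplitMetaInfo(
            new String[] {"hostA", "hostB"}, 0L, 64L * 1024 * 1024);
        DataOutputBuffer out = new DataOutputBuffer();
        written.write(out);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        SplitMetaInfo read = new SplitMetaInfo();
        read.readFields(in);
        System.out.print(read); // prints data-size, start-offset, locations
      }
    }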
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Fri Mar 4 03:41:31 2011
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
+
+/**
+ * The class that is used by the Job clients to write splits (both the meta
+ * and the raw bytes parts)
+ */
+public class JobSplitWriter {
+
+ private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
+ private static final byte[] SPLIT_FILE_HEADER;
+ static {
+ try {
+ SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
+ } catch (UnsupportedEncodingException u) {
+ throw new RuntimeException(u);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
+ Configuration conf, List<InputSplit> splits)
+ throws IOException, InterruptedException {
+ T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
+ createSplitFiles(jobSubmitDir, conf, array);
+ }
+
+ public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir,
+ Configuration conf,T[] splits)
+ throws IOException, InterruptedException {
+ FileSystem fs = jobSubmitDir.getFileSystem(conf);
+ FSDataOutputStream out = createFile(fs,
+ JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+ SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
+ out.close();
+ writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+ info);
+ }
+
+ public static void createSplitFiles(Path jobSubmitDir,
+ Configuration conf, org.apache.hadoop.mapred.InputSplit[] splits)
+ throws IOException {
+ FileSystem fs = jobSubmitDir.getFileSystem(conf);
+ FSDataOutputStream out = createFile(fs,
+ JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
+ SplitMetaInfo[] info = writeOldSplits(splits, out);
+ out.close();
+ writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
+ info);
+ }
+
+ private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
+ Configuration job) throws IOException {
+ FSDataOutputStream out = FileSystem.create(fs, splitFile,
+ new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
+ int replication = job.getInt("mapred.submit.replication", 10);
+ fs.setReplication(splitFile, (short)replication);
+ writeSplitHeader(out);
+ return out;
+ }
+ private static void writeSplitHeader(FSDataOutputStream out)
+ throws IOException {
+ out.write(SPLIT_FILE_HEADER);
+ out.writeInt(splitVersion);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <T extends InputSplit>
+ SplitMetaInfo[] writeNewSplits(Configuration conf,
+ T[] array, FSDataOutputStream out)
+ throws IOException, InterruptedException {
+
+ SplitMetaInfo[] info = new SplitMetaInfo[array.length];
+ if (array.length != 0) {
+ SerializationFactory factory = new SerializationFactory(conf);
+ int i = 0;
+ long offset = out.size();
+ for(T split: array) {
+ int prevCount = out.size();
+ Text.writeString(out, split.getClass().getName());
+ Serializer<T> serializer =
+ factory.getSerializer((Class<T>) split.getClass());
+ serializer.open(out);
+ serializer.serialize(split);
+ int currCount = out.size();
+ info[i++] =
+ new JobSplit.SplitMetaInfo(
+ split.getLocations(), offset,
+ split.getLength());
+ offset += currCount - prevCount;
+ }
+ }
+ return info;
+ }
+
+ private static SplitMetaInfo[] writeOldSplits(
+ org.apache.hadoop.mapred.InputSplit[] splits,
+ FSDataOutputStream out) throws IOException {
+ SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
+ if (splits.length != 0) {
+ int i = 0;
+ long offset = out.size();
+ for(org.apache.hadoop.mapred.InputSplit split: splits) {
+ int prevLen = out.size();
+ Text.writeString(out, split.getClass().getName());
+ split.write(out);
+ int currLen = out.size();
+ info[i++] = new JobSplit.SplitMetaInfo(
+ split.getLocations(), offset,
+ split.getLength());
+ offset += currLen - prevLen;
+ }
+ }
+ return info;
+ }
+
+ private static void writeJobSplitMetaInfo(FileSystem fs, Path filename,
+ FsPermission p, int splitMetaInfoVersion,
+ JobSplit.SplitMetaInfo[] allSplitMetaInfo)
+ throws IOException {
+ // write the splits meta-info to a file for the job tracker
+ FSDataOutputStream out =
+ FileSystem.create(fs, filename, p);
+ out.write(JobSplit.META_SPLIT_FILE_HEADER);
+ WritableUtils.writeVInt(out, splitMetaInfoVersion);
+ WritableUtils.writeVInt(out, allSplitMetaInfo.length);
+ for (JobSplit.SplitMetaInfo splitMetaInfo : allSplitMetaInfo) {
+ splitMetaInfo.write(out);
+ }
+ out.close();
+ }
+}
+
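For reference, the two files this writer produces have the following layout (field widths follow the java.io and WritableUtils calls above):

    job.split          "SPL" (3 bytes), int version, then per split:
                       Text class name followed by the split's own
                       serialized bytes.
    job.splitmetainfo  "META-SPL" (8 bytes), vint version, vint split
                       count, then one SplitMetaInfo record per split.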
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1077108&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Fri Mar 4 03:41:31 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.split;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+
+/**
+ * An (internal) utility that reads the split meta info and creates
+ * split meta info objects
+ */
+
+public class SplitMetaInfoReader {
+
+ public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
+ JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
+ throws IOException {
+ long maxMetaInfoSize = conf.getLong("mapreduce.job.split.metainfo.maxsize",
+ 10000000L);
+ Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
+ FileStatus fStatus = fs.getFileStatus(metaSplitFile);
+ if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
+ throw new IOException("Split metadata size exceeded " +
+ maxMetaInfoSize +". Aborting job " + jobId);
+ }
+ FSDataInputStream in = fs.open(metaSplitFile);
+ byte[] header = new byte[JobSplit.META_SPLIT_FILE_HEADER.length];
+ in.readFully(header);
+ if (!Arrays.equals(JobSplit.META_SPLIT_FILE_HEADER, header)) {
+ throw new IOException("Invalid header on split file");
+ }
+ int vers = WritableUtils.readVInt(in);
+ if (vers != JobSplit.META_SPLIT_VERSION) {
+ in.close();
+ throw new IOException("Unsupported split version " + vers);
+ }
+ int numSplits = WritableUtils.readVInt(in); //TODO: check for insane values
+ JobSplit.TaskSplitMetaInfo[] allSplitMetaInfo =
+ new JobSplit.TaskSplitMetaInfo[numSplits];
+ for (int i = 0; i < numSplits; i++) {
+ JobSplit.SplitMetaInfo splitMetaInfo = new JobSplit.SplitMetaInfo();
+ splitMetaInfo.readFields(in);
+ JobSplit.TaskSplitIndex splitIndex = new JobSplit.TaskSplitIndex(
+ JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString(),
+ splitMetaInfo.getStartOffset());
+ allSplitMetaInfo[i] = new JobSplit.TaskSplitMetaInfo(splitIndex,
+ splitMetaInfo.getLocations(),
+ splitMetaInfo.getInputDataLength());
+ }
+ in.close();
+ return allSplitMetaInfo;
+ }
+
+}
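On the consuming side, job initialization can now slurp the meta info in one call and never touch the raw split bytes. A sketch of JobTracker-side usage (the jobId and jobSubmitDir names here are illustrative):

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
    import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;

    public class ReadSplitsSketch {
      static void initTasks(JobID jobId, Path jobSubmitDir, Configuration conf)
          throws IOException {
        FileSystem fs = jobSubmitDir.getFileSystem(conf);
        TaskSplitMetaInfo[] splits =
            SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
        for (TaskSplitMetaInfo s : splits) {
          // getLocations() feeds locality; getSplitIndex() is what a map
          // TaskInProgress hands to its MapTask (see TaskInProgress.java).
          System.out.println(s.getInputDataLength() + " bytes on "
              + Arrays.toString(s.getLocations()));
        }
      }
    }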
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/cli/testConf.xml Fri Mar 4 03:41:31 2011
@@ -91,11 +91,11 @@
<test> <!-- TESTED -->
<description>ls: directory using absolute path</description>
<test-commands>
- <command>-fs NAMENODE -mkdir /dir1</command>
- <command>-fs NAMENODE -ls /</command>
+ <command>-fs NAMENODE -mkdir /dir1/dir2</command>
+ <command>-fs NAMENODE -ls /dir1</command>
</test-commands>
<cleanup-commands>
- <command>-fs NAMENODE -rmr /dir1</command>
+ <command>-fs NAMENODE -rmr /dir1/dir2</command>
</cleanup-commands>
<comparators>
<comparator>
@@ -104,7 +104,7 @@
</comparator>
<comparator>
<type>RegexpComparator</type>
- <expected-output>^drwxr-xr-x( )*-( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/dir1</expected-output>
+ <expected-output>^drwxr-xr-x( )*-( )*[a-z]*( )*supergroup( )*0( )*[0-9]{4,}-[0-9]{2,}-[0-9]{2,} [0-9]{2,}:[0-9]{2,}( )*/dir1/dir2</expected-output>
</comparator>
</comparators>
</test>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Fri Mar 4 03:41:31 2011
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -113,10 +114,10 @@ public class ClusterWithLinuxTaskControl
String[] splits = ugi.split(",");
taskControllerUser = new UnixUserGroupInformation(splits);
clusterConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
- createHomeDirectory(clusterConf);
+ createHomeAndStagingDirectory(clusterConf);
}
- private void createHomeDirectory(JobConf conf)
+ private void createHomeAndStagingDirectory(JobConf conf)
throws IOException {
FileSystem fs = dfsCluster.getFileSystem();
String path = "/user/" + taskControllerUser.getUserName();
@@ -124,6 +125,12 @@ public class ClusterWithLinuxTaskControl
LOG.info("Creating Home directory : " + homeDirectory);
fs.mkdirs(homeDirectory);
changePermission(conf, homeDirectory);
+ Path stagingArea =
+ new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
+ "/tmp/hadoop/mapred/staging"));
+ LOG.info("Creating Staging root directory : " + stagingArea);
+ fs.mkdirs(stagingArea);
+ fs.setPermission(stagingArea, new FsPermission((short)0777));
}
private void changePermission(JobConf conf, Path p)
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Fri Mar 4 03:41:31 2011
@@ -135,14 +135,14 @@ public class GenericMRLoadGenerator exte
confRandom(job);
} else if (null != job.getClass("mapred.indirect.input.format", null)) {
// specified IndirectInputFormat? Build src list
- JobClient jClient = new JobClient(job);
- Path sysdir = jClient.getSystemDir();
+ JobClient jClient = new JobClient(job);
+ Path tmpDir = new Path(jClient.getFs().getHomeDirectory(), ".staging");
Random r = new Random();
- Path indirInputFile = new Path(sysdir,
+ Path indirInputFile = new Path(tmpDir,
Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
job.set("mapred.indirect.input.file", indirInputFile.toString());
SequenceFile.Writer writer = SequenceFile.createWriter(
- sysdir.getFileSystem(job), job, indirInputFile,
+ tmpDir.getFileSystem(job), job, indirInputFile,
LongWritable.class, Text.class,
SequenceFile.CompressionType.NONE);
try {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar 4 03:41:31 2011
@@ -107,7 +107,7 @@ public class TestJobHistory extends Test
boolean isJobLaunched;
boolean isJTRestarted;
- TestListener(JobInfo job) {
+ TestListener(JobHistory.JobInfo job) {
super(job);
lineNum = 0;
isJobLaunched = false;
@@ -293,7 +293,7 @@ public class TestJobHistory extends Test
}
// Validate Format of Task Level Keys, Values read from history file
- private static void validateTaskLevelKeyValuesFormat(JobInfo job,
+ private static void validateTaskLevelKeyValuesFormat(JobHistory.JobInfo job,
boolean splitsCanBeEmpty) {
Map<String, JobHistory.Task> tasks = job.getAllTasks();
@@ -340,7 +340,7 @@ public class TestJobHistory extends Test
}
// Validate format of Task Attempt Level Keys, Values read from history file
- private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
+ private static void validateTaskAttemptLevelKeyValuesFormat(JobHistory.JobInfo job) {
Map<String, JobHistory.Task> tasks = job.getAllTasks();
// For each task
@@ -515,7 +515,7 @@ public class TestJobHistory extends Test
// Validate Job Level Keys, Values read from history file by
// comparing them with the actual values from JT.
private static void validateJobLevelKeyValues(MiniMRCluster mr,
- RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException {
+ RunningJob job, JobHistory.JobInfo jobInfo, JobConf conf) throws IOException {
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
@@ -543,11 +543,11 @@ public class TestJobHistory extends Test
values.get(Keys.JOB_PRIORITY)));
assertTrue("Job Name of job obtained from history file did not " +
- "match the expected value", JobInfo.getJobName(conf).equals(
+ "match the expected value", JobHistory.JobInfo.getJobName(conf).equals(
values.get(Keys.JOBNAME)));
assertTrue("User Name of job obtained from history file did not " +
- "match the expected value", JobInfo.getUserName(conf).equals(
+ "match the expected value", JobHistory.JobInfo.getUserName(conf).equals(
values.get(Keys.USER)));
// Validate job counters
@@ -594,7 +594,7 @@ public class TestJobHistory extends Test
// Validate Task Level Keys, Values read from history file by
// comparing them with the actual values from JT.
private static void validateTaskLevelKeyValues(MiniMRCluster mr,
- RunningJob job, JobInfo jobInfo) throws IOException {
+ RunningJob job, JobHistory.JobInfo jobInfo) throws IOException {
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
@@ -676,7 +676,7 @@ public class TestJobHistory extends Test
// Validate Task Attempt Level Keys, Values read from history file by
// comparing them with the actual values from JT.
private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
- RunningJob job, JobInfo jobInfo) throws IOException {
+ RunningJob job, JobHistory.JobInfo jobInfo) throws IOException {
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistoryParsing.java Fri Mar 4 03:41:31 2011
@@ -37,9 +37,9 @@ public class TestJobHistoryParsing exte
* object with data from log file.
*/
static class TestListener implements Listener {
- JobInfo job;
+ JobHistory.JobInfo job;
- TestListener(JobInfo job) {
+ TestListener(JobHistory.JobInfo job) {
this.job = job;
}
// JobHistory.Listener implementation
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar 4 03:41:31 2011
@@ -28,6 +28,7 @@ import junit.framework.TestCase;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.mapreduce.split.JobSplit;
public class TestJobQueueTaskScheduler extends TestCase {
@@ -77,8 +78,8 @@ public class TestJobQueueTaskScheduler e
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(true);
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1,
- getJobConf().getUser()) {
+ Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(),
+ 1) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -93,8 +94,7 @@ public class TestJobQueueTaskScheduler e
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
TaskAttemptID attemptId = getTaskAttemptID(false);
- Task task = new ReduceTask("", attemptId, 0, 10, 1,
- getJobConf().getUser()) {
+ Task task = new ReduceTask("", attemptId, 0, 10, 1) {
@Override
public String toString() {
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobSysDirWithDFS.java Fri Mar 4 03:41:31 2011
@@ -55,7 +55,8 @@ public class TestJobSysDirWithDFS extend
Path outDir,
String input,
int numMaps,
- int numReduces) throws IOException {
+ int numReduces,
+ String sysDir) throws IOException {
FileSystem inFs = inDir.getFileSystem(conf);
FileSystem outFs = outDir.getFileSystem(conf);
outFs.delete(outDir, true);
@@ -88,14 +89,15 @@ public class TestJobSysDirWithDFS extend
// Checking that the Job Client system dir is not used
assertFalse(FileSystem.get(conf).exists(new Path(conf.get("mapred.system.dir"))));
// Check if the Job Tracker system dir is propagated to client
- String sysDir = jobClient.getSystemDir().toString();
+ sysDir = jobClient.getSystemDir().toString();
System.out.println("Job sys dir -->" + sysDir);
assertFalse(sysDir.contains("/tmp/subru/mapred/system"));
assertTrue(sysDir.contains("custom"));
return new TestResult(job, TestMiniMRWithDFS.readOutput(outDir, conf));
}
- static void runWordCount(MiniMRCluster mr, JobConf jobConf) throws IOException {
+ static void runWordCount(MiniMRCluster mr, JobConf jobConf, String sysDir)
+ throws IOException {
LOG.info("runWordCount");
// Run a word count example
// Keeping tasks that match this pattern
@@ -105,7 +107,7 @@ public class TestJobSysDirWithDFS extend
result = launchWordCount(jobConf, inDir, outDir,
"The quick brown fox\nhas many silly\n" +
"red fox sox\n",
- 3, 1);
+ 3, 1, sysDir);
assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
// Checking if the Job ran successfully in spite of different system dir config
@@ -126,7 +128,7 @@ public class TestJobSysDirWithDFS extend
fileSys = dfs.getFileSystem();
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1, null, null, conf);
- runWordCount(mr, mr.createJobConf());
+ runWordCount(mr, mr.createJobConf(), conf.get("mapred.system.dir"));
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Mar 4 03:41:31 2011
@@ -27,12 +27,15 @@ import junit.framework.TestCase;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
-
+import org.junit.*;
/**
* TestJobTrackerRestart checks if the jobtracker can restart. JobTracker
* should be able to continue running the previously running jobs and also
* recover previously submitted jobs.
*/
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
public class TestJobTrackerRestart extends TestCase {
static final Path testDir =
new Path(System.getProperty("test.build.data","/tmp"),
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithLostTracker.java Fri Mar 4 03:41:31 2011
@@ -24,11 +24,15 @@ import org.apache.hadoop.mapred.TestJobT
import junit.framework.TestCase;
import java.io.*;
+import org.junit.*;
/**
* This test checks if the jobtracker can detect and recover a tracker that was
* lost while the jobtracker was down.
*/
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
public class TestJobTrackerRestartWithLostTracker extends TestCase {
final Path testDir = new Path("/jt-restart-lost-tt-testing");
final Path inDir = new Path(testDir, "input");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java Fri Mar 4 03:41:31 2011
@@ -27,12 +27,16 @@ import junit.framework.TestCase;
import java.io.*;
import java.util.HashSet;
import java.util.Set;
+import org.junit.*;
/**
* This test checks jobtracker in safe mode. In safe mode the jobtracker upon
* restart doesn't schedule any new tasks and waits for the (old) trackers to
* join back.
*/
+/**UNTIL MAPREDUCE-873 is backported, we will not run recovery manager tests
+ */
+@Ignore
public class TestJobTrackerSafeMode extends TestCase {
final Path testDir =
new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRClasspath.java Fri Mar 4 03:41:31 2011
@@ -19,6 +19,8 @@
package org.apache.hadoop.mapred;
import java.io.*;
+import java.net.URI;
+
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -35,15 +37,13 @@ import org.apache.hadoop.io.Text;
public class TestMiniMRClasspath extends TestCase {
- static String launchWordCount(String fileSys,
+ static void configureWordCount(FileSystem fs,
String jobTracker,
JobConf conf,
String input,
int numMaps,
- int numReduces) throws IOException {
- final Path inDir = new Path("/testing/wc/input");
- final Path outDir = new Path("/testing/wc/output");
- FileSystem fs = FileSystem.getNamed(fileSys, conf);
+ int numReduces,
+ Path inDir, Path outDir) throws IOException {
fs.delete(outDir, true);
if (!fs.mkdirs(inDir)) {
throw new IOException("Mkdirs failed to create " + inDir.toString());
@@ -53,7 +53,7 @@ public class TestMiniMRClasspath extends
file.writeBytes(input);
file.close();
}
- FileSystem.setDefaultUri(conf, fileSys);
+ FileSystem.setDefaultUri(conf, fs.getUri());
conf.set("mapred.job.tracker", jobTracker);
conf.setJobName("wordcount");
conf.setInputFormat(TextInputFormat.class);
@@ -72,6 +72,16 @@ public class TestMiniMRClasspath extends
conf.setNumReduceTasks(numReduces);
//pass a job.jar already included in the hadoop build
conf.setJar("build/test/testjar/testjob.jar");
+ }
+
+ static String launchWordCount(String fileSys, String jobTracker, JobConf conf,
+ String input, int numMaps, int numReduces)
+ throws IOException {
+ final Path inDir = new Path("/testing/wc/input");
+ final Path outDir = new Path("/testing/wc/output");
+ FileSystem fs = FileSystem.getNamed(fileSys, conf);
+ configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir,
+ outDir);
JobClient.runJob(conf);
StringBuffer result = new StringBuffer();
{
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 03:41:31 2011
@@ -210,8 +210,10 @@ public class TestMiniMRWithDFS extends T
long hdfsWrite =
counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+ long rawSplitBytesRead =
+ counters.findCounter(Task.Counter.SPLIT_RAW_BYTES).getCounter();
assertEquals(result.output.length(), hdfsWrite);
- assertEquals(input.length(), hdfsRead);
+ assertEquals(input.length() + rawSplitBytesRead, hdfsRead);
// Run a job with input and output going to localfs even though the
// default fs is hdfs.
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Fri Mar 4 03:41:31 2011
@@ -23,6 +23,8 @@ import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -43,8 +45,10 @@ public class TestMiniMRWithDFSWithDistin
}
static JobConf createJobConf(MiniMRCluster mr, UnixUserGroupInformation ugi) {
- JobConf jobconf = mr.createJobConf();
- UnixUserGroupInformation.saveToConf(jobconf,
+ return createJobConf(mr.createJobConf(), ugi);
+ }
+ static JobConf createJobConf(JobConf conf, UnixUserGroupInformation ugi) {
+ JobConf jobconf = new JobConf(conf);
+ UnixUserGroupInformation.saveToConf(jobconf,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
return jobconf;
}
@@ -55,6 +59,50 @@ public class TestMiniMRWithDFSWithDistin
fs.setPermission(p, new FsPermission((short)0777));
}
+ // runs a sample job as a user (ugi)
+ RunningJob runJobAsUser(JobConf job, UserGroupInformation ugi)
+ throws Exception {
+ JobSubmissionProtocol jobSubmitClient =
+ TestSubmitJob.getJobSubmitClient(job, ugi);
+ JobID id = jobSubmitClient.getNewJobId();
+
+ InputSplit[] splits = computeJobSplit(JobID.downgrade(id), job);
+ Path jobSubmitDir = new Path(id.toString());
+ FileSystem fs = jobSubmitDir.getFileSystem(job);
+ jobSubmitDir = jobSubmitDir.makeQualified(fs);
+ uploadJobFiles(JobID.downgrade(id), splits, jobSubmitDir, job);
+
+ jobSubmitClient.submitJob(id, jobSubmitDir.toString());
+
+ JobClient jc = new JobClient(job);
+ return jc.getJob(JobID.downgrade(id));
+ }
+
+ // a helper api for split computation
+ private InputSplit[] computeJobSplit(JobID id, JobConf conf)
+ throws IOException {
+ InputSplit[] splits =
+ conf.getInputFormat().getSplits(conf, conf.getNumMapTasks());
+ conf.setNumMapTasks(splits.length);
+ return splits;
+ }
+
+
+ // a helper api for split submission
+ private void uploadJobFiles(JobID id, InputSplit[] splits,
+ Path jobSubmitDir, JobConf conf)
+ throws IOException {
+ Path confLocation = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
+ JobSplitWriter.createSplitFiles(jobSubmitDir, conf, splits);
+ FileSystem fs = confLocation.getFileSystem(conf);
+ FsPermission perm = new FsPermission((short)0700);
+
+ // localize conf
+ DataOutputStream confOut = FileSystem.create(fs, confLocation, perm);
+ conf.writeXml(confOut);
+ confOut.close();
+ }
+
public void testDistinctUsers() throws Exception {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
@@ -72,9 +120,21 @@ public class TestMiniMRWithDFSWithDistin
mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
1, null, null, MR_UGI);
- JobConf pi = createJobConf(mr, PI_UGI);
- TestMiniMRWithDFS.runPI(mr, pi);
-
+ String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ JobConf job1 = mr.createJobConf();
+ String input = "The quick brown fox\nhas many silly\n"
+ + "red fox sox\n";
+ Path inDir = new Path("/testing/distinct/input");
+ Path outDir = new Path("/testing/distinct/output");
+ TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
+ input, 2, 1, inDir, outDir);
+ JobConf job2 = mr.createJobConf();
+ Path inDir2 = new Path("/testing/distinct/input2");
+ Path outDir2 = new Path("/testing/distinct/output2");
+ TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2,
+ input, 2, 1, inDir2, outDir2);
+ job2 = createJobConf(job2, WC_UGI);
+ runJobAsUser(job2, WC_UGI);
JobConf wc = createJobConf(mr, WC_UGI);
TestMiniMRWithDFS.runWordCount(mr, wc);
} finally {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeRefresh.java Fri Mar 4 03:41:31 2011
@@ -387,82 +387,4 @@ public class TestNodeRefresh extends Tes
stopCluster();
}
-
- /**
- * Check if excluded hosts are decommissioned across restart
- */
- public void testMRExcludeHostsAcrossRestarts() throws IOException {
- // start a cluster with 2 hosts and empty exclude-hosts file
- Configuration conf = new Configuration();
- conf.setBoolean("mapred.jobtracker.restart.recover", true);
-
- File file = new File("hosts.exclude");
- file.delete();
- startCluster(2, 1, 0, conf);
- String hostToDecommission = getHostname(1);
- conf = mr.createJobConf(new JobConf(conf));
-
- // submit a job
- Path inDir = new Path("input");
- Path outDir = new Path("output");
- Path signalFilename = new Path("share");
- JobConf newConf = new JobConf(conf);
- UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,
- "restart-decommission", signalFilename.toString(),
- signalFilename.toString());
-
- JobClient jobClient = new JobClient(newConf);
- RunningJob job = jobClient.submitJob(newConf);
- JobID id = job.getID();
-
- // wait for 50%
- while (job.mapProgress() < 0.5f) {
- UtilsForTests.waitFor(100);
- }
-
- // change the exclude-hosts file to include one host
- FileOutputStream out = new FileOutputStream(file);
- LOG.info("Writing excluded nodes to log file " + file.toString());
- BufferedWriter writer = null;
- try {
- writer = new BufferedWriter(new OutputStreamWriter(out));
- writer.write( hostToDecommission + "\n"); // decommission first host
- } finally {
- if (writer != null) {
- writer.close();
- }
- out.close();
- }
- file.deleteOnExit();
-
- // restart the jobtracker
- mr.stopJobTracker();
- mr.startJobTracker();
- // Wait for the JT to be ready
- UtilsForTests.waitForJobTracker(jobClient);
-
- jt = mr.getJobTrackerRunner().getJobTracker();
- UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),
- signalFilename.toString(), signalFilename.toString(), 1);
-
- assertTrue("Decommissioning of tracker has no effect restarted job",
- jt.getJob(job.getID()).failedMapTasks > 0);
-
- // check the cluster status and tracker size
- assertEquals("Tracker is not lost upon host decommissioning",
- 1, jt.getClusterStatus(false).getTaskTrackers());
- assertEquals("Excluded node count is incorrect",
- 1, jt.getClusterStatus(false).getNumExcludedNodes());
-
- // check if the host is disallowed
- for (TaskTrackerStatus status : jt.taskTrackers()) {
- assertFalse("Tracker from decommissioned host still exist",
- status.getHost().equals(hostToDecommission));
- }
-
- // wait for the job
- job.waitForCompletion();
-
- stopCluster();
- }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Mar 4 03:41:31 2011
@@ -480,7 +480,10 @@ public class TestQueueManager extends Te
//try to kill as self
try {
- rjob.killJob();
+ conf.set("mapred.job.tracker", "localhost:"
+ + miniMRCluster.getJobTrackerPort());
+ JobClient jc = new JobClient(conf);
+ jc.getJob(rjob.getJobID()).killJob();
if (!shouldSucceed) {
fail("should fail kill operation");
}
@@ -519,7 +522,10 @@ public class TestQueueManager extends Te
// try to change priority as self
try {
- rjob.setJobPriority("VERY_LOW");
+ conf.set("mapred.job.tracker", "localhost:"
+ + miniMRCluster.getJobTrackerPort());
+ JobClient jc = new JobClient(conf);
+ jc.getJob(rjob.getJobID()).setJobPriority("VERY_LOW");
if (!shouldSucceed) {
fail("changing priority should fail.");
}
@@ -546,6 +552,9 @@ public class TestQueueManager extends Te
private void setUpCluster(JobConf conf) throws IOException {
miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
FileSystem fileSys = miniDFSCluster.getFileSystem();
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys,
+ conf.get("mapreduce.jobtracker.staging.root.dir",
+ "/tmp/hadoop/mapred/staging"));
String namenode = fileSys.getUri().toString();
miniMRCluster = new MiniMRCluster(1, namenode, 3,
null, null, conf);
@@ -596,7 +605,7 @@ public class TestQueueManager extends Te
if (shouldComplete) {
rJob = JobClient.runJob(jc);
} else {
- rJob = new JobClient(clientConf).submitJob(jc);
+ rJob = new JobClient(jc).submitJob(jc);
}
return rJob;
}
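
The two hunks above replace direct calls on the submitter's RunningJob handle with a handle re-resolved through a JobClient built from the acting user's conf, so the kill/set-priority RPC reaches the JobTracker as the right caller. A sketch of the pattern, assuming this test's conf and miniMRCluster fixtures:

    conf.set("mapred.job.tracker",
        "localhost:" + miniMRCluster.getJobTrackerPort());
    JobClient asSelf = new JobClient(conf);           // proxy bound to the acting user
    RunningJob handle = asSelf.getJob(rjob.getJobID());
    handle.killJob();                                 // ACL check now sees this user
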
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestRecoveryManager.java Fri Mar 4 03:41:31 2011
@@ -29,16 +29,19 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.*;
/**
* Test whether the {@link RecoveryManager} is able to tolerate job-recovery
* failures and the jobtracker is able to tolerate {@link RecoveryManager}
* failure.
*/
+/** Until MAPREDUCE-873 is backported, we will not run recovery manager tests.
+ */
+@Ignore
public class TestRecoveryManager extends TestCase {
private static final Log LOG =
LogFactory.getLog(TestRecoveryManager.class);
@@ -51,8 +54,7 @@ public class TestRecoveryManager extends
* {@link JobTracker.RecoveryManager}. It does the following :
* - submits 2 jobs
* - kills the jobtracker
- * - Garble job.xml for one job causing it to fail in constructor
- * and job.split for another causing it to fail in init.
+ * - deletes the info file for one job
* - restarts the jobtracker
* - checks if the jobtraker starts normally
*/
@@ -106,26 +108,15 @@ public class TestRecoveryManager extends
// delete the job.xml of job #1 causing the job to fail in constructor
Path jobFile =
- new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
- LOG.info("Deleting job.xml file : " + jobFile.toString());
+ new Path(sysDir, rJob1.getID().toString() + "/" + JobTracker.JOB_INFO_FILE);
+ LOG.info("Deleting job token file : " + jobFile.toString());
fs.delete(jobFile, false); // delete the job.xml file
- // create the job.xml file with 0 bytes
+ // recreate the job info file with one garbage byte
FSDataOutputStream out = fs.create(jobFile);
out.write(1);
out.close();
- // delete the job.split of job #2 causing the job to fail in initTasks
- Path jobSplitFile =
- new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
- LOG.info("Deleting job.split file : " + jobSplitFile.toString());
- fs.delete(jobSplitFile, false); // delete the job.split file
-
- // create the job.split file with 0 bytes
- out = fs.create(jobSplitFile);
- out.write(1);
- out.close();
-
// make sure that the jobtracker is in recovery mode
mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
true);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java Fri Mar 4 03:41:31 2011
@@ -18,7 +18,7 @@
package org.apache.hadoop.mapred;
import junit.framework.TestCase;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapreduce.split.JobSplit;
public class TestResourceEstimation extends TestCase {
@@ -45,8 +45,8 @@ public class TestResourceEstimation exte
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- RawSplit split = new RawSplit();
- split.setDataLength(0);
+ JobSplit.TaskSplitMetaInfo split =
+ new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
TaskInProgress tip =
new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
@@ -82,8 +82,9 @@ public class TestResourceEstimation exte
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- RawSplit split = new RawSplit();
- split.setDataLength(singleMapInputSize);
+ JobSplit.TaskSplitMetaInfo split =
+ new JobSplit.TaskSplitMetaInfo(new String[0], 0,
+ singleMapInputSize);
TaskInProgress tip =
new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
@@ -95,8 +96,8 @@ public class TestResourceEstimation exte
//add one more map task with input size as 0
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- RawSplit split = new RawSplit();
- split.setDataLength(0);
+ JobSplit.TaskSplitMetaInfo split =
+ new JobSplit.TaskSplitMetaInfo(new String[0], 0, 0);
TaskInProgress tip =
new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
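
The hunks above assume a TaskSplitMetaInfo(String[] locations, long startOffset, long inputDataLength) constructor in place of the old RawSplit setters. A sketch of one estimator update using it, with the surrounding test's jid, jc, jip and re fixtures (values illustrative):

    JobSplit.TaskSplitMetaInfo split =
        new JobSplit.TaskSplitMetaInfo(new String[0], 0, 10 * 1024); // 10KB input
    TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
    TaskStatus ts = new MapTaskStatus();
    ts.setOutputSize(20 * 1024);             // report 20KB of map output
    re.updateWithCompletedTask(ts, tip);     // estimator learns a ~2x blow-up
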
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSubmitJob.java Fri Mar 4 03:41:31 2011
@@ -18,24 +18,58 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.net.URI;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ToolRunner;
import junit.framework.TestCase;
public class TestSubmitJob extends TestCase {
- private MiniMRCluster miniMRCluster;
+ static final Log LOG = LogFactory.getLog(TestSubmitJob.class);
+ private MiniMRCluster mrCluster;
- @Override
- protected void tearDown()
- throws Exception {
- if (miniMRCluster != null) {
- miniMRCluster.shutdown();
- }
+ private MiniDFSCluster dfsCluster;
+ private JobTracker jt;
+ private FileSystem fs;
+ private static Path TEST_DIR =
+ new Path(System.getProperty("test.build.data","/tmp"),
+ "job-submission-testing");
+ private static int numSlaves = 1;
+
+ private void startCluster() throws Exception {
+ super.setUp();
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
+ JobConf jConf = new JobConf(conf);
+ jConf.setLong("mapred.job.submission.expiry.interval", 6 * 1000);
+ mrCluster = new MiniMRCluster(0, 0, numSlaves,
+ dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null,
+ jConf);
+ jt = mrCluster.getJobTrackerRunner().getJobTracker();
+ fs = FileSystem.get(mrCluster.createJobConf());
}
+ private void stopCluster() throws Exception {
+ mrCluster.shutdown();
+ mrCluster = null;
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ jt = null;
+ fs = null;
+ }
/**
* Test to verify that jobs with invalid memory requirements are killed at the
* JT.
@@ -54,9 +88,9 @@ public class TestSubmitJob extends TestC
jtConf.setLong(JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
4 * 1024L);
- miniMRCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
+ mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
- JobConf clusterConf = miniMRCluster.createJobConf();
+ JobConf clusterConf = mrCluster.createJobConf();
// No map-memory configuration
JobConf jobConf = new JobConf(clusterConf);
@@ -83,6 +117,9 @@ public class TestSubmitJob extends TestC
jobConf.setMemoryForReduceTask(5 * 1024L);
runJobAndVerifyFailure(jobConf, 1 * 1024L, 5 * 1024L,
"Exceeds the cluster's max-memory-limit.");
+
+ mrCluster.shutdown();
+ mrCluster = null;
}
private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks,
@@ -108,4 +145,126 @@ public class TestSubmitJob extends TestC
+ " - doesn't contain expected message - " + overallExpectedMsg, msg
.contains(overallExpectedMsg));
}
-}
+ static JobSubmissionProtocol getJobSubmitClient(JobConf conf,
+ UserGroupInformation ugi)
+ throws IOException {
+ return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), ugi,
+ conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ }
+
+ static org.apache.hadoop.hdfs.protocol.ClientProtocol getDFSClient(
+ Configuration conf, UserGroupInformation ugi)
+ throws IOException {
+ return (org.apache.hadoop.hdfs.protocol.ClientProtocol)
+ RPC.getProxy(org.apache.hadoop.hdfs.protocol.ClientProtocol.class,
+ org.apache.hadoop.hdfs.protocol.ClientProtocol.versionID,
+ NameNode.getAddress(conf), ugi,
+ conf,
+ NetUtils.getSocketFactory(conf,
+ org.apache.hadoop.hdfs.protocol.ClientProtocol.class));
+ }
+
+ /**
+ * Submit a job and check if the files are accessible to other users.
+ */
+ public void testSecureJobExecution() throws Exception {
+ LOG.info("Testing secure job submission/execution");
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ try {
+ Configuration conf = new Configuration();
+ UnixUserGroupInformation.saveToConf(conf,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME,
+ TestMiniMRWithDFSWithDistinctUsers.DFS_UGI);
+ dfs = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = dfs.getFileSystem();
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/user");
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fs, "/mapred");
+ TestMiniMRWithDFSWithDistinctUsers.mkdir(fs,
+ conf.get("mapreduce.jobtracker.staging.root.dir",
+ "/tmp/hadoop/mapred/staging"));
+ UnixUserGroupInformation MR_UGI =
+ TestMiniMRWithDFSWithDistinctUsers.createUGI(
+ UnixUserGroupInformation.login().getUserName(), false);
+ mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(),
+ 1, null, null, MR_UGI);
+ JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+ String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ // cleanup
+ dfs.getFileSystem().delete(TEST_DIR, true);
+
+ final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
+ final Path reduceSignalFile = new Path(TEST_DIR, "reduce-signal");
+
+ // create a ugi for user 1
+ UnixUserGroupInformation user1 =
+ TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
+ Path inDir = new Path("/user/input");
+ Path outDir = new Path("/user/output");
+ JobConf job =
+ TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user1);
+
+ UtilsForTests.configureWaitingJobConf(job, inDir, outDir, 2, 0,
+ "test-submit-job", mapSignalFile.toString(),
+ reduceSignalFile.toString());
+ job.set(UtilsForTests.getTaskSignalParameter(true),
+ mapSignalFile.toString());
+ job.set(UtilsForTests.getTaskSignalParameter(false),
+ reduceSignalFile.toString());
+ LOG.info("Submit job as the actual user (" + user1.getUserName() + ")");
+ JobClient jClient = new JobClient(job);
+ RunningJob rJob = jClient.submitJob(job);
+ JobID id = rJob.getID();
+ LOG.info("Running job " + id);
+
+ // create user2
+ UnixUserGroupInformation user2 =
+ TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
+ JobConf conf_other =
+ TestMiniMRWithDFSWithDistinctUsers.createJobConf(mr, user2);
+ org.apache.hadoop.hdfs.protocol.ClientProtocol client =
+ getDFSClient(conf_other, user2);
+
+ // try accessing mapred.system.dir/jobid/*
+ boolean failed = false;
+ try {
+ Path path = new Path(new URI(jt.getSystemDir()).getPath());
+ LOG.info("Try listing the mapred-system-dir as the user ("
+ + user2.getUserName() + ")");
+ client.getListing(path.toString());
+ } catch (IOException ioe) {
+ failed = true;
+ }
+ assertTrue("JobTracker system dir is accessible to others", failed);
+ // try accessing ~/.staging/jobid/*
+ failed = false;
+ JobInProgress jip = jt.getJob(id);
+ Path jobSubmitDirpath =
+ new Path(jip.getJobConf().get("mapreduce.job.dir"));
+ try {
+ LOG.info("Try accessing the job folder for job " + id + " as the user ("
+ + user2.getUserName() + ")");
+ client.getListing(jobSubmitDirpath.toString());
+ } catch (IOException ioe) {
+ failed = true;
+ }
+ assertTrue("User's staging folder is accessible to others", failed);
+ UtilsForTests.signalTasks(dfs, fs, true, mapSignalFile.toString(),
+ reduceSignalFile.toString());
+ // wait for job to be done
+ UtilsForTests.waitTillDone(jClient);
+
+ // check if the staging area is cleaned up
+ LOG.info("Check if job submit dir is cleanup or not");
+ assertFalse(fs.exists(jobSubmitDirpath));
+ } finally {
+ if (mr != null) {
+ mr.shutdown();
+ }
+ if (dfs != null) {
+ dfs.shutdown();
+ }
+ }
+ }
+}
\ No newline at end of file
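
The two permission probes in testSecureJobExecution share one shape: list a path over the raw ClientProtocol proxy and treat an IOException as denial (in 0.20 an AccessControlException surfaces as an IOException). A hedged sketch of that shape as a helper; the helper name is illustrative, not part of the patch:

    private static boolean canList(
        org.apache.hadoop.hdfs.protocol.ClientProtocol client, String path) {
      try {
        client.getListing(path);   // throws if the caller lacks permission
        return true;
      } catch (IOException ioe) {
        return false;
      }
    }

With it, the two assertions above reduce to assertFalse(...) on the system dir and on jobSubmitDirpath.
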
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java Fri Mar 4 03:41:31 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
import org.apache.hadoop.mapred.TaskLog.LogName;
import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapreduce.split.JobSplit;
import org.junit.After;
import org.junit.Test;
@@ -140,7 +141,8 @@ public class TestTaskLogsMonitor {
int taskcount = 0;
TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
- Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
// Let the tasks write logs within retain-size
writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 500, 'H');
@@ -181,7 +183,8 @@ public class TestTaskLogsMonitor {
int taskcount = 0;
TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
- Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
// Let the tasks write some logs
writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
@@ -218,7 +221,8 @@ public class TestTaskLogsMonitor {
int taskcount = 0;
TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
- Task task = new MapTask(null, attemptID, 0, null, null, 0, null);
+ Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(),
+ 0);
// Let the tasks write logs more than retain-size
writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
@@ -259,7 +263,8 @@ public class TestTaskLogsMonitor {
// Assuming the job's retain size is 150
TaskAttemptID attempt1 = new TaskAttemptID(baseTaskID, attemptsCount++);
- Task task1 = new MapTask(null, attempt1, 0, null, null, 0, null);
+ Task task1 = new MapTask(null, attempt1, 0, new JobSplit.TaskSplitIndex(),
+ 0);
// Let the tasks write logs more than retain-size
writeRealBytes(attempt1, attempt1, LogName.SYSLOG, 200, 'A');
@@ -271,7 +276,8 @@ public class TestTaskLogsMonitor {
// Start another attempt in the same JVM
TaskAttemptID attempt2 = new TaskAttemptID(baseTaskID, attemptsCount++);
- Task task2 = new MapTask(null, attempt2, 0, null, null, 0, null);
+ Task task2 = new MapTask(null, attempt2, 0, new JobSplit.TaskSplitIndex(),
+ 0);
logsMonitor.monitorTaskLogs();
// Let attempt2 also write some logs
@@ -280,7 +286,8 @@ public class TestTaskLogsMonitor {
// Start yet another attempt in the same JVM
TaskAttemptID attempt3 = new TaskAttemptID(baseTaskID, attemptsCount++);
- Task task3 = new MapTask(null, attempt3, 0, null, null, 0, null);
+ Task task3 = new MapTask(null, attempt3, 0, new JobSplit.TaskSplitIndex(),
+ 0);
logsMonitor.monitorTaskLogs();
// Let attempt3 also write some logs
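
Every hunk in this file swaps the old seven-arg MapTask constructor for a five-arg form taking a JobSplit.TaskSplitIndex. For reference, the shape these tests now rely on (as inferred from the patch; an empty TaskSplitIndex stands in for a task with no real input split):

    Task task = new MapTask(null, attemptID, 0,
        new JobSplit.TaskSplitIndex(),   // empty index: no real split to read
        0);                              // numSlotsRequired
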
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/UtilsForTests.java Fri Mar 4 03:41:31 2011
@@ -254,7 +254,9 @@ public class UtilsForTests {
while (true) {
boolean shouldWait = false;
for (JobStatus jobStatuses : jobClient.getAllJobs()) {
- if (jobStatuses.getRunState() == JobStatus.RUNNING) {
+ if (jobStatuses.getRunState() != JobStatus.SUCCEEDED
+ && jobStatuses.getRunState() != JobStatus.FAILED
+ && jobStatuses.getRunState() != JobStatus.KILLED) {
shouldWait = true;
break;
}
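
The rewritten condition above closes a subtle hole: the old check only kept waiting while a job was RUNNING, so jobs still in PREP were treated as done. Expressed as a predicate (a sketch; JobStatus run-states are ints in 0.20):

    static boolean isTerminal(JobStatus status) {
      int state = status.getRunState();
      return state == JobStatus.SUCCEEDED
          || state == JobStatus.FAILED
          || state == JobStatus.KILLED;
    }
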
Modified: hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java?rev=1077108&r1=1077107&r2=1077108&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/tools/org/apache/hadoop/tools/DistCh.java Fri Mar 4 03:41:31 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapred.OutputCo
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@@ -423,7 +424,13 @@ public class DistCh extends DistTool {
private boolean setup(List<FileOperation> ops, Path log) throws IOException {
final String randomId = getRandomId();
JobClient jClient = new JobClient(jobconf);
- Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
+ Path stagingArea = JobSubmissionFiles.getStagingDir(jClient, jobconf);
+ Path jobdir = new Path(stagingArea, NAME + "_" + randomId);
+ FsPermission mapredSysPerms =
+ new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
+ FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
LOG.info(JOB_DIR_LABEL + "=" + jobdir);
if (log == null) {