You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2012/05/29 16:45:26 UTC
svn commit: r1343756 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/...
Author: tomwhite
Date: Tue May 29 14:45:25 2012
New Revision: 1343756
URL: http://svn.apache.org/viewvc?rev=1343756&view=rev
Log:
Merge -r 1343754:1343755 from trunk to branch-2. Fixes: MAPREDUCE-4146
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
- copied unchanged from r1343755, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1343756&r1=1343755&r2=1343756&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue May 29 14:45:25 2012
@@ -8,6 +8,9 @@ Release 2.0.1-alpha - UNRELEASED
IMPROVEMENTS
+ MAPREDUCE-4146. Support limits on task status string length and number of
+ block locations in branch-2. (Ahmed Radwan via tomwhite)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1343756&r1=1343755&r2=1343756&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue May 29 14:45:25 2012
@@ -53,7 +53,6 @@ import org.apache.hadoop.io.WritableUtil
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskCounter;
@@ -569,7 +568,21 @@ abstract public class Task implements Wr
resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
}
}
-
+
+ public static String normalizeStatus(String status, Configuration conf) {
+ // Truncate status to mapreduce.task.max.status.length chars; a null status or a
+ // negative limit returns status unchanged. Only lengths are logged (no huge dumps).
+ int progressStatusLength = conf.getInt(
+ MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY, MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+ if (status != null && progressStatusLength >= 0
+ && status.length() > progressStatusLength) {
+ LOG.warn("Task status of length " + status.length() + " truncated to max limit ("
+ + progressStatusLength + " characters)");
+ return status.substring(0, progressStatusLength);
+ }
+ return status;
+ }
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
protected class TaskReporter
@@ -603,7 +616,7 @@ abstract public class Task implements Wr
return progressFlag.getAndSet(false);
}
public void setStatus(String status) {
- taskProgress.setStatus(status);
+ taskProgress.setStatus(normalizeStatus(status, conf));
// indicate that progress update needs to be sent
setProgressFlag();
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1343756&r1=1343755&r2=1343756&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Tue May 29 14:45:25 2012
@@ -71,4 +71,12 @@ public interface MRConfig {
public static final String TASK_LOCAL_OUTPUT_CLASS =
"mapreduce.task.local.output.class";
+
+ public static final String PROGRESS_STATUS_LEN_LIMIT_KEY =
+ "mapreduce.task.max.status.length";
+ public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
+
+ public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
+ public static final String MAX_BLOCK_LOCATIONS_KEY =
+ "mapreduce.job.max.split.locations";
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java?rev=1343756&r1=1343755&r2=1343756&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java Tue May 29 14:45:25 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.serializer.S
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -48,6 +49,7 @@ public class JobSplitWriter {
private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
private static final byte[] SPLIT_FILE_HEADER;
+
static {
try {
SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
@@ -82,7 +84,7 @@ public class JobSplitWriter {
throws IOException {
FSDataOutputStream out = createFile(fs,
JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
- SplitMetaInfo[] info = writeOldSplits(splits, out);
+ SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
out.close();
writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
@@ -114,6 +116,8 @@ public class JobSplitWriter {
if (array.length != 0) {
SerializationFactory factory = new SerializationFactory(conf);
int i = 0;
+ int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
+ MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
long offset = out.getPos();
for(T split: array) {
long prevCount = out.getPos();
@@ -123,9 +127,15 @@ public class JobSplitWriter {
serializer.open(out);
serializer.serialize(split);
long currCount = out.getPos();
+ String[] locations = split.getLocations();
+ if (locations.length > maxBlockLocations) {
+ throw new IOException("Max block location exceeded for split: "
+ + split + " splitsize: " + locations.length +
+ " maxsize: " + maxBlockLocations);
+ }
info[i++] =
new JobSplit.SplitMetaInfo(
- split.getLocations(), offset,
+ locations, offset,
split.getLength());
offset += currCount - prevCount;
}
@@ -135,18 +145,26 @@ public class JobSplitWriter {
private static SplitMetaInfo[] writeOldSplits(
org.apache.hadoop.mapred.InputSplit[] splits,
- FSDataOutputStream out) throws IOException {
+ FSDataOutputStream out, Configuration conf) throws IOException {
SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
if (splits.length != 0) {
int i = 0;
long offset = out.getPos();
+ int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
+ MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
for(org.apache.hadoop.mapred.InputSplit split: splits) {
long prevLen = out.getPos();
Text.writeString(out, split.getClass().getName());
split.write(out);
long currLen = out.getPos();
+ String[] locations = split.getLocations();
+ if (locations.length > maxBlockLocations) {
+ throw new IOException("Max block location exceeded for split: "
+ + split + " splitsize: " + locations.length +
+ " maxsize: " + maxBlockLocations);
+ }
info[i++] = new JobSplit.SplitMetaInfo(
- split.getLocations(), offset,
+ locations, offset,
split.getLength());
offset += currLen - prevLen;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=1343756&r1=1343755&r2=1343756&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Tue May 29 14:45:25 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.task
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.StatusReporter;
@@ -92,8 +93,9 @@ public class TaskAttemptContextImpl exte
*/
@Override
public void setStatus(String status) {
- setStatusString(status);
- reporter.setStatus(status);
+ String normalizedStatus = Task.normalizeStatus(status, conf);
+ setStatusString(normalizedStatus);
+ reporter.setStatus(normalizedStatus);
}
public static class DummyReporter extends StatusReporter {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java?rev=1343756&r1=1343755&r2=1343756&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java Tue May 29 14:45:25 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.mapred;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
@@ -25,10 +26,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+
import static org.junit.Assert.*;
/**
@@ -98,7 +104,28 @@ public class TestReporter {
progressRange, reporter.getProgress(), 0f);
}
}
-
+
+ static class StatusLimitMapper extends
+ org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {
+
+ @Override
+ public void map(LongWritable key, Text value, Context context)
+ throws IOException {
+ int progressStatusLength = context.getConfiguration().getInt(
+ MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+ MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+ // Build a status one char longer than the configured limit so truncation always runs.
+ StringBuilder sb = new StringBuilder(progressStatusLength + 1);
+ for (int i = 0; i <= progressStatusLength; i++) {
+ sb.append("a");
+ }
+ context.setStatus(sb.toString());
+ if (context.getStatus().length() != progressStatusLength) {
+ throw new IOException("Status was not truncated to " + progressStatusLength + " characters");
+ }
+ }
+ }
+
/**
* Test {@link Reporter}'s progress for a map-only job.
* This will make sure that only the map phase decides the attempt's progress.
@@ -166,7 +193,6 @@ public class TestReporter {
/**
* Test {@link Reporter}'s progress for map-reduce job.
*/
- @SuppressWarnings("deprecation")
@Test
public void testReporterProgressForMRJob() throws IOException {
Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
@@ -186,4 +212,39 @@ public class TestReporter {
assertTrue("Job failed", job.isSuccessful());
}
+
+ @Test
+ public void testStatusLimit() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ // Runs a map-only job whose mapper sets an over-long status and throws
+ // an IOException (failing the job) unless the status was truncated.
+ Path test = new Path(testRootTempDir, "testStatusLimit");
+ Configuration conf = new Configuration();
+ Path inDir = new Path(test, "in");
+ Path outDir = new Path(test, "out");
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(inDir)) {
+ fs.delete(inDir, true);
+ }
+ fs.mkdirs(inDir);
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ try {
+ file.writeBytes("testStatusLimit");
+ } finally {
+ file.close();
+ }
+ if (fs.exists(outDir)) {
+ fs.delete(outDir, true);
+ }
+
+ Job job = Job.getInstance(conf, "testStatusLimit");
+ job.setMapperClass(StatusLimitMapper.class);
+ job.setNumReduceTasks(0);
+ FileInputFormat.addInputPath(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ assertTrue("Job failed", job.waitForCompletion(true));
+ assertTrue("Job failed", job.isSuccessful());
+ }
+
}
\ No newline at end of file