You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/16 13:47:02 UTC
svn commit: r794637 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/MRConstants.java
src/java/org/apache/hadoop/mapred/ReduceTask.java
src/java/org/apache/hadoop/mapred/TaskTracker.java
Author: ddas
Date: Thu Jul 16 11:47:01 2009
New Revision: 794637
URL: http://svn.apache.org/viewvc?rev=794637&view=rev
Log:
MAPREDUCE-18. Adds checks that cross-verify whether a reduce task receives the correct shuffle data. Re-committed the same patch after discussing it with Ravi.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 16 11:47:01 2009
@@ -214,3 +214,7 @@
MAPREDUCE-680. Fix so MRUnit can handle reuse of Writable objects.
(Aaron Kimball via johan)
+
+ MAPREDUCE-18. Puts some checks for cross checking whether a reduce
+ task gets the correct shuffle data. (Ravi Gummadi via ddas)
+
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MRConstants.java Thu Jul 16 11:47:01 2009
@@ -45,5 +45,15 @@
*/
public static final String RAW_MAP_OUTPUT_LENGTH = "Raw-Map-Output-Length";
+ /**
+ * The map task from which the map output data is being transferred
+ */
+ public static final String FROM_MAP_TASK = "from-map-task";
+
+ /**
+ * The reduce task number for which this map output is being transferred
+ */
+ public static final String FOR_REDUCE_TASK = "for-reduce-task";
+
public static final String WORKDIR = "work";
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jul 16 11:47:01 2009
@@ -1294,7 +1294,8 @@
Path tmpMapOutput = new Path(filename+"-"+id);
// Copy the map output
- MapOutput mapOutput = getMapOutput(loc, tmpMapOutput);
+ MapOutput mapOutput = getMapOutput(loc, tmpMapOutput,
+ reduceId.getTaskID().getId());
if (mapOutput == null) {
throw new IOException("Failed to fetch map-output for " +
loc.getTaskAttemptId() + " from " +
@@ -1375,7 +1376,7 @@
* @throws IOException when something goes wrong
*/
private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
- Path filename)
+ Path filename, int reduce)
throws IOException, InterruptedException {
// Connect
URLConnection connection =
@@ -1383,16 +1384,52 @@
InputStream input = getInputStream(connection, shuffleConnectionTimeout,
shuffleReadTimeout);
+ // Validate header from map output
+ TaskAttemptID mapId = null;
+ try {
+ mapId =
+ TaskAttemptID.forName(connection.getHeaderField(FROM_MAP_TASK));
+ } catch (IllegalArgumentException ia) {
+ LOG.warn("Invalid map id ", ia);
+ return null;
+ }
+ TaskAttemptID expectedMapId = mapOutputLoc.getTaskAttemptId();
+ if (!mapId.equals(expectedMapId)) {
+ LOG.warn("data from wrong map:" + mapId +
+ " arrived to reduce task " + reduce +
+ ", where as expected map output should be from " + expectedMapId);
+ return null;
+ }
+
+ long decompressedLength =
+ Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
+ long compressedLength =
+ Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
+
+ if (compressedLength < 0 || decompressedLength < 0) {
+ LOG.warn(getName() + " invalid lengths in map output header: id: " +
+ mapId + " compressed len: " + compressedLength +
+ ", decompressed len: " + decompressedLength);
+ return null;
+ }
+ int forReduce =
+ (int)Integer.parseInt(connection.getHeaderField(FOR_REDUCE_TASK));
+
+ if (forReduce != reduce) {
+ LOG.warn("data for the wrong reduce: " + forReduce +
+ " with compressed len: " + compressedLength +
+ ", decompressed len: " + decompressedLength +
+ " arrived to reduce task " + reduce);
+ return null;
+ }
+ LOG.info("header: " + mapId + ", compressed len: " + compressedLength +
+ ", decompressed len: " + decompressedLength);
+
//We will put a file in memory if it meets certain criteria:
//1. The size of the (decompressed) file should be less than 25% of
// the total inmem fs
//2. There is space available in the inmem fs
- long decompressedLength =
- Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
- long compressedLength =
- Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
-
// Check if this map-output can be saved in-memory
boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength);
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=794637&r1=794636&r2=794637&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Jul 16 11:47:01 2009
@@ -2926,9 +2926,13 @@
* Read the index file to get the information about where
* the map-output for the given reducer is available.
*/
- IndexRecord info =
+ IndexRecord info =
tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
+ //set the custom "from-map-task" http header to the map task from which
+ //the map output data is being transferred
+ response.setHeader(FROM_MAP_TASK, mapId);
+
//set the custom "Raw-Map-Output-Length" http header to
//the raw (decompressed) length
response.setHeader(RAW_MAP_OUTPUT_LENGTH,
@@ -2939,6 +2943,10 @@
response.setHeader(MAP_OUTPUT_LENGTH,
Long.toString(info.partLength));
+ //set the custom "for-reduce-task" http header to the reduce task number
+ //for which this map output is being transferred
+ response.setHeader(FOR_REDUCE_TASK, Integer.toString(reduce));
+
//use the same buffersize as used for reading the data from disk
response.setBufferSize(MAX_BYTES_TO_READ);