Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2013/04/01 18:47:42 UTC
svn commit: r1463203 [3/4] - in
/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project: ./ bin/ conf/
hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop...
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Mon Apr 1 16:47:16 2013
@@ -18,10 +18,9 @@
package org.apache.hadoop.mapreduce.jobhistory;
-import java.io.IOException;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
@@ -35,7 +34,17 @@ import org.apache.avro.util.Utf8;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TaskFailedEvent implements HistoryEvent {
- private TaskFailed datum = new TaskFailed();
+ private TaskFailed datum = null;
+
+ private TaskAttemptID failedDueToAttempt;
+ private TaskID id;
+ private TaskType taskType;
+ private long finishTime;
+ private String status;
+ private String error;
+ private Counters counters;
+
+ private static final Counters EMPTY_COUNTERS = new Counters();
/**
* Create an event to record task failure
@@ -45,45 +54,87 @@ public class TaskFailedEvent implements
* @param error Error String
* @param status Status
* @param failedDueToAttempt The attempt id due to which the task failed
+ * @param counters Counters for the task
*/
public TaskFailedEvent(TaskID id, long finishTime,
TaskType taskType, String error, String status,
- TaskAttemptID failedDueToAttempt) {
- datum.taskid = new Utf8(id.toString());
- datum.error = new Utf8(error);
- datum.finishTime = finishTime;
- datum.taskType = new Utf8(taskType.name());
- datum.failedDueToAttempt = failedDueToAttempt == null
- ? null
- : new Utf8(failedDueToAttempt.toString());
- datum.status = new Utf8(status);
+ TaskAttemptID failedDueToAttempt, Counters counters) {
+ this.id = id;
+ this.finishTime = finishTime;
+ this.taskType = taskType;
+ this.error = error;
+ this.status = status;
+ this.failedDueToAttempt = failedDueToAttempt;
+ this.counters = counters;
}
+ public TaskFailedEvent(TaskID id, long finishTime,
+ TaskType taskType, String error, String status,
+ TaskAttemptID failedDueToAttempt) {
+ this(id, finishTime, taskType, error, status,
+ failedDueToAttempt, EMPTY_COUNTERS);
+ }
+
TaskFailedEvent() {}
- public Object getDatum() { return datum; }
- public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
+ public Object getDatum() {
+ if(datum == null) {
+ datum = new TaskFailed();
+ datum.taskid = new Utf8(id.toString());
+ datum.error = new Utf8(error);
+ datum.finishTime = finishTime;
+ datum.taskType = new Utf8(taskType.name());
+ datum.failedDueToAttempt =
+ failedDueToAttempt == null
+ ? null
+ : new Utf8(failedDueToAttempt.toString());
+ datum.status = new Utf8(status);
+ datum.counters = EventWriter.toAvro(counters);
+ }
+ return datum;
+ }
+
+ public void setDatum(Object odatum) {
+ this.datum = (TaskFailed)odatum;
+ this.id =
+ TaskID.forName(datum.taskid.toString());
+ this.taskType =
+ TaskType.valueOf(datum.taskType.toString());
+ this.finishTime = datum.finishTime;
+ this.error = datum.error.toString();
+ this.failedDueToAttempt =
+ datum.failedDueToAttempt == null
+ ? null
+ : TaskAttemptID.forName(
+ datum.failedDueToAttempt.toString());
+ this.status = datum.status.toString();
+ this.counters =
+ EventReader.fromAvro(datum.counters);
+ }
/** Get the task id */
- public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+ public TaskID getTaskId() { return id; }
/** Get the error string */
- public String getError() { return datum.error.toString(); }
+ public String getError() { return error; }
/** Get the finish time of the attempt */
- public long getFinishTime() { return datum.finishTime; }
+ public long getFinishTime() {
+ return finishTime;
+ }
/** Get the task type */
public TaskType getTaskType() {
- return TaskType.valueOf(datum.taskType.toString());
+ return taskType;
}
/** Get the attempt id due to which the task failed */
public TaskAttemptID getFailedAttemptID() {
- return datum.failedDueToAttempt == null
- ? null
- : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
+ return failedDueToAttempt;
}
/** Get the task status */
- public String getTaskStatus() { return datum.status.toString(); }
+ public String getTaskStatus() { return status; }
+ /** Get task counters */
+ public Counters getCounters() { return counters; }
/** Get the event type */
- public EventType getEventType() { return EventType.TASK_FAILED; }
+ public EventType getEventType() {
+ return EventType.TASK_FAILED;
+ }
-
}
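
The rewrite above defers Avro serialization: TaskFailedEvent now keeps plain Java fields, adds a Counters-aware constructor (with the old constructor delegating via EMPTY_COUNTERS), and only builds the TaskFailed record inside getDatum(). A minimal, self-contained sketch of that lazy-construction idea, using hypothetical FakeEvent/FakeDatum types rather than the real Avro classes:

public class FakeEvent {
  // Hypothetical stand-in for the Avro-generated record.
  static class FakeDatum {
    String taskId;
    long finishTime;
  }

  private FakeDatum datum;          // built on demand, not in the constructor
  private final String taskId;
  private final long finishTime;

  public FakeEvent(String taskId, long finishTime) {
    this.taskId = taskId;           // keep only cheap Java fields up front
    this.finishTime = finishTime;
  }

  public Object getDatum() {
    if (datum == null) {            // materialize lazily, when history is written
      datum = new FakeDatum();
      datum.taskId = taskId;
      datum.finishTime = finishTime;
    }
    return datum;
  }
}
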
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Mon Apr 1 16:47:16 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.NetworkTopology;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in
* {@link InputFormat#getSplits(JobContext)} method.
@@ -76,7 +78,7 @@ import org.apache.hadoop.net.NetworkTopo
@InterfaceStability.Stable
public abstract class CombineFileInputFormat<K, V>
extends FileInputFormat<K, V> {
-
+
public static final String SPLIT_MINSIZE_PERNODE =
"mapreduce.input.fileinputformat.split.minsize.per.node";
public static final String SPLIT_MINSIZE_PERRACK =
@@ -163,7 +165,6 @@ public abstract class CombineFileInputFo
@Override
public List<InputSplit> getSplits(JobContext job)
throws IOException {
-
long minSizeNode = 0;
long minSizeRack = 0;
long maxSize = 0;
@@ -286,56 +287,100 @@ public abstract class CombineFileInputFo
rackToNodes, maxSize);
totLength += files[i].getLength();
}
+ createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
+ maxSize, minSizeNode, minSizeRack, splits);
+ }
+ @VisibleForTesting
+ void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<OneBlockInfo, String[]> blockToNodes,
+ HashMap<String, List<OneBlockInfo>> rackToBlocks,
+ long totLength,
+ long maxSize,
+ long minSizeNode,
+ long minSizeRack,
+ List<InputSplit> splits
+ ) {
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
+
+ int numNodes = nodeToBlocks.size();
+ long totalLength = totLength;
- // process all nodes and create splits that are local
- // to a node.
- for (Iterator<Map.Entry<String,
- List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
- iter.hasNext();) {
-
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
-
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(splits, nodes, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
+ while(true) {
+ // maxSize is allowed to be 0; disable load smoothing in that case
+ int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
+ ((int) (totalLength/maxSize))/numNodes
+ : Integer.MAX_VALUE;
+ int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+ numNodes = 0;
+
+ // process all nodes and create splits that are local to a node.
+ for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+ .entrySet().iterator(); iter.hasNext();) {
+ Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+ nodes.add(one.getKey());
+ List<OneBlockInfo> blocksInNode = one.getValue();
+
+ // for each block, copy it into validBlocks. Delete it from
+ // blockToNodes so that the same block does not appear in
+ // two different splits.
+ int splitsInNode = 0;
+ for (OneBlockInfo oneblock : blocksInNode) {
+ if (blockToNodes.containsKey(oneblock)) {
+ validBlocks.add(oneblock);
+ blockToNodes.remove(oneblock);
+ curSplitSize += oneblock.length;
+
+ // if the accumulated split size exceeds the maximum, then
+ // create this split.
+ if (maxSize != 0 && curSplitSize >= maxSize) {
+ // create an input split and add it to the splits array
+ addCreatedSplit(splits, nodes, validBlocks);
+ totalLength -= curSplitSize;
+ curSplitSize = 0;
+ validBlocks.clear();
+ splitsInNode++;
+ if (splitsInNode == maxSplitsByNodeOnly) {
+ // stop grouping on a node so as not to create
+ // disproportionately more splits on a node because it happens
+ // to have many blocks
+ // consider only these nodes in next round of grouping because
+ // they have leftover blocks that may need to be grouped
+ numNodes++;
+ break;
+ }
+ }
}
}
- }
- // if there were any blocks left over and their combined size is
- // larger than minSplitNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the
- // same rack later on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(splits, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
+ // if there were any blocks left over and their combined size is
+ // larger than minSplitNode, then combine them into one split.
+ // Otherwise add them back to the unprocessed pool. It is likely
+ // that they will be combined with other blocks from the
+ // same rack later on.
+ if (minSizeNode != 0 && curSplitSize >= minSizeNode
+ && splitsInNode == 0) {
+ // haven't created any split on this machine yet, so it's ok to add a
+ // smaller one for parallelism. Otherwise group it in the rack for
+ // balanced size.
+ // create an input split and add it to the splits array
+ addCreatedSplit(splits, nodes, validBlocks);
+ totalLength -= curSplitSize;
+ } else {
+ for (OneBlockInfo oneblock : validBlocks) {
+ blockToNodes.put(oneblock, oneblock.hosts);
+ }
}
+ validBlocks.clear();
+ nodes.clear();
+ curSplitSize = 0;
+ }
+
+ if(!(numNodes>0 && totalLength>0)) {
+ break;
}
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
}
// if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ public abstract class CombineFileInputFo
offset[i] = validBlocks.get(i).offset;
length[i] = validBlocks.get(i).length;
}
-
// add this split to the list that is returned
CombineFileSplit thissplit = new CombineFileSplit(fl, offset,
length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract class CombineFileInputFo
/**
* information about one file from the File System
*/
- private static class OneFileInfo {
+ @VisibleForTesting
+ static class OneFileInfo {
private long fileSize; // size of the file
private OneBlockInfo[] blocks; // all blocks in this file
@@ -545,45 +590,55 @@ public abstract class CombineFileInputFo
}
blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
}
+
+ populateBlockInfo(blocks, rackToBlocks, blockToNodes,
+ nodeToBlocks, rackToNodes);
+ }
+ }
+
+ @VisibleForTesting
+ static void populateBlockInfo(OneBlockInfo[] blocks,
+ HashMap<String, List<OneBlockInfo>> rackToBlocks,
+ HashMap<OneBlockInfo, String[]> blockToNodes,
+ HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+ HashMap<String, Set<String>> rackToNodes) {
+ for (OneBlockInfo oneblock : blocks) {
+ // add this block to the block --> node locations map
+ blockToNodes.put(oneblock, oneblock.hosts);
+
+ // For blocks that do not have host/rack information,
+ // assign to default rack.
+ String[] racks = null;
+ if (oneblock.hosts.length == 0) {
+ racks = new String[]{NetworkTopology.DEFAULT_RACK};
+ } else {
+ racks = oneblock.racks;
+ }
- for (OneBlockInfo oneblock : blocks) {
- // add this block to the block --> node locations map
- blockToNodes.put(oneblock, oneblock.hosts);
-
- // For blocks that do not have host/rack information,
- // assign to default rack.
- String[] racks = null;
- if (oneblock.hosts.length == 0) {
- racks = new String[]{NetworkTopology.DEFAULT_RACK};
- } else {
- racks = oneblock.racks;
+ // add this block to the rack --> block map
+ for (int j = 0; j < racks.length; j++) {
+ String rack = racks[j];
+ List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ rackToBlocks.put(rack, blklist);
}
-
- // add this block to the rack --> block map
- for (int j = 0; j < racks.length; j++) {
- String rack = racks[j];
- List<OneBlockInfo> blklist = rackToBlocks.get(rack);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- rackToBlocks.put(rack, blklist);
- }
- blklist.add(oneblock);
- if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
- // Add this host to rackToNodes map
- addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
- }
+ blklist.add(oneblock);
+ if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+ // Add this host to rackToNodes map
+ addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
}
+ }
- // add this block to the node --> block map
- for (int j = 0; j < oneblock.hosts.length; j++) {
- String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- nodeToBlocks.put(node, blklist);
- }
- blklist.add(oneblock);
+ // add this block to the node --> block map
+ for (int j = 0; j < oneblock.hosts.length; j++) {
+ String node = oneblock.hosts[j];
+ List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+ if (blklist == null) {
+ blklist = new ArrayList<OneBlockInfo>();
+ nodeToBlocks.put(node, blklist);
}
+ blklist.add(oneblock);
}
}
}
@@ -600,7 +655,8 @@ public abstract class CombineFileInputFo
/**
* information about one block from the File System
*/
- private static class OneBlockInfo {
+ @VisibleForTesting
+ static class OneBlockInfo {
Path onepath; // name of this file
long offset; // offset in file
long length; // length of this block
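
The new createSplits() loop caps how many node-local splits each node may produce per round: roughly (totalLength / maxSize) / numNodes, with at least one split always allowed, and with the cap disabled when maxSize is 0. A small standalone sketch of that arithmetic (a hypothetical helper, not part of the class above):

public class SplitCapSketch {
  /** How many node-local splits one node may create in a grouping round. */
  static int maxSplitsPerNode(long totalLength, long maxSize, int numNodes) {
    if (maxSize <= 0 || numNodes <= 0) {
      return Integer.MAX_VALUE;   // maxSize of 0 disables the smoothing
    }
    int avg = (int) (totalLength / maxSize) / numNodes;
    return Math.max(avg, 1);      // always allow at least one split per node
  }

  public static void main(String[] args) {
    // 10 GB of input, 256 MB max split size, 8 nodes -> cap of 5 per round
    System.out.println(maxSplitsPerNode(10L << 30, 256L << 20, 8));
  }
}
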
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/security/TokenCache.java Mon Apr 1 16:47:16 2013
@@ -154,7 +154,8 @@ public class TokenCache {
*/
@InterfaceAudience.Private
public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
- private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
+ private static final Text JOB_TOKEN = new Text("JobToken");
+ private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
/**
* load job token from a file
@@ -194,4 +195,14 @@ public class TokenCache {
public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
+
+ @InterfaceAudience.Private
+ public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
+ credentials.addSecretKey(SHUFFLE_TOKEN, key);
+ }
+
+ @InterfaceAudience.Private
+ public static byte[] getShuffleSecretKey(Credentials credentials) {
+ return getSecretKey(credentials, SHUFFLE_TOKEN);
+ }
}
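
A short usage sketch for the two accessors added above; the Credentials instance and key bytes here are illustrative only:

import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;

public class ShuffleSecretSketch {
  public static void main(String[] args) {
    Credentials credentials = new Credentials();
    byte[] secret = new byte[] { 0x1, 0x2, 0x3, 0x4 };   // illustrative only

    // Store the shuffle secret under its own alias, separate from the job token.
    TokenCache.setShuffleSecretKey(secret, credentials);

    // Later, e.g. in the reduce task, the same secret can be read back.
    byte[] fetched = TokenCache.getShuffleSecretKey(credentials);
    System.out.println(fetched.length);
  }
}
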
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Mon Apr 1 16:47:16 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.spli
import java.io.IOException;
import java.util.Arrays;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -29,9 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.MRJobConfig;
/**
* A utility that reads the split meta info and creates
@@ -44,8 +44,8 @@ public class SplitMetaInfoReader {
public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir)
throws IOException {
- long maxMetaInfoSize = conf.getLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
- 10000000L);
+ long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
+ MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
FileStatus fStatus = fs.getFileStatus(metaSplitFile);
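
The limit is now read through MRJobConfig rather than the removed JTConfig reference. A minimal sketch of reading it the same way from client code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SplitMetaInfoLimitSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
        MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
    System.out.println("split meta-info limit: " + maxMetaInfoSize + " bytes");
  }
}
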
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Mon Apr 1 16:47:16 2013
@@ -82,7 +82,7 @@ class Fetcher<K,V> extends Thread {
private final int connectionTimeout;
private final int readTimeout;
- private final SecretKey jobTokenSecret;
+ private final SecretKey shuffleSecretKey;
private volatile boolean stopped = false;
@@ -92,7 +92,7 @@ class Fetcher<K,V> extends Thread {
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
- ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+ ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@@ -100,7 +100,7 @@ class Fetcher<K,V> extends Thread {
this.exceptionReporter = exceptionReporter;
this.id = ++nextId;
this.reduce = reduceId.getTaskID().getId();
- this.jobTokenSecret = jobTokenSecret;
+ this.shuffleSecretKey = shuffleKey;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@@ -221,7 +221,6 @@ class Fetcher<K,V> extends Thread {
// Construct the url and connect
DataInputStream input;
- boolean connectSucceeded = false;
try {
URL url = getMapOutputURL(host, maps);
@@ -229,7 +228,8 @@ class Fetcher<K,V> extends Thread {
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
+ String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
+ shuffleSecretKey);
// put url hash into http header
connection.addRequestProperty(
@@ -237,7 +237,6 @@ class Fetcher<K,V> extends Thread {
// set the read timeout
connection.setReadTimeout(readTimeout);
connect(connection, connectionTimeout);
- connectSucceeded = true;
input = new DataInputStream(connection.getInputStream());
// Validate response code
@@ -255,7 +254,7 @@ class Fetcher<K,V> extends Thread {
}
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
// verify that replyHash is HMac of encHash
- SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
+ SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey);
LOG.info("for url="+msgToEncode+" sent hash and received reply");
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
@@ -265,18 +264,10 @@ class Fetcher<K,V> extends Thread {
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
- if (!connectSucceeded) {
- for(TaskAttemptID left: remaining) {
- scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
- }
- } else {
- // If we got a read error at this stage, it implies there was a problem
- // with the first map, typically lost map. So, penalize only that map
- // and add the rest
- TaskAttemptID firstMap = maps.get(0);
- scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
+ for(TaskAttemptID left: remaining) {
+ scheduler.copyFailed(left, host, false, connectExcpt);
}
-
+
// Add back all the remaining maps, WITHOUT marking them as failed
for(TaskAttemptID left: remaining) {
scheduler.putBackKnownMapOutput(host, left);
@@ -366,13 +357,20 @@ class Fetcher<K,V> extends Thread {
return EMPTY_ATTEMPT_ID_ARRAY;
}
- // Go!
- LOG.info("fetcher#" + id + " about to shuffle output of map " +
- mapOutput.getMapId() + " decomp: " +
- decompressedLength + " len: " + compressedLength + " to " +
- mapOutput.getDescription());
- mapOutput.shuffle(host, input, compressedLength, decompressedLength,
- metrics, reporter);
+ // The codec for lz0,lz4,snappy,bz2,etc. throw java.lang.InternalError
+ // on decompression failures. Catching and re-throwing as IOException
+ // to allow fetch failure logic to be processed
+ try {
+ // Go!
+ LOG.info("fetcher#" + id + " about to shuffle output of map "
+ + mapOutput.getMapId() + " decomp: " + decompressedLength
+ + " len: " + compressedLength + " to " + mapOutput.getDescription());
+ mapOutput.shuffle(host, input, compressedLength, decompressedLength,
+ metrics, reporter);
+ } catch (java.lang.InternalError e) {
+ LOG.warn("Failed to shuffle for fetcher#"+id, e);
+ throw new IOException(e);
+ }
// Inform the shuffle scheduler
long endTime = System.currentTimeMillis();
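
As the new comment in Fetcher notes, native codecs can throw java.lang.InternalError on corrupt map output, which would otherwise bypass the IOException-based fetch-failure handling. A minimal sketch of the wrapping pattern, where decompress() is a hypothetical placeholder for the codec call:

import java.io.IOException;
import java.io.InputStream;

public class DecompressGuardSketch {
  // Stand-in for a native codec (lzo, lz4, snappy, bz2) that may throw
  // java.lang.InternalError on a corrupt or truncated stream.
  static void decompress(InputStream in) {
  }

  static void safeShuffle(InputStream in) throws IOException {
    try {
      decompress(in);
    } catch (InternalError e) {
      // re-throw so the normal fetch-failure / retry logic can handle it
      throw new IOException(e);
    }
  }
}
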
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Mon Apr 1 16:47:16 2013
@@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> impl
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
- compressAwarePath = new CompressAwarePath(outputPath,
- writer.getRawLength());
writer.close();
+ compressAwarePath = new CompressAwarePath(outputPath,
+ writer.getRawLength(), writer.getCompressedLength());
LOG.info(reduceId +
" Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@ public class MergeManagerImpl<K, V> impl
private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
public OnDiskMerger(MergeManagerImpl<K, V> manager) {
- super(manager, Integer.MAX_VALUE, exceptionReporter);
+ super(manager, ioSortFactor, exceptionReporter);
setName("OnDiskMerger - Thread to merge on-disk map-outputs");
setDaemon(true);
}
@@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> impl
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
- compressAwarePath = new CompressAwarePath(outputPath,
- writer.getRawLength());
writer.close();
+ compressAwarePath = new CompressAwarePath(outputPath,
+ writer.getRawLength(), writer.getCompressedLength());
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
@@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> impl
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase);
- final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+ Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
keyClass, valueClass, codec, null);
try {
Merger.writeFile(rIter, writer, reporter, job);
- // add to list of final disk outputs.
+ writer.close();
onDiskMapOutputs.add(new CompressAwarePath(outputPath,
- writer.getRawLength()));
+ writer.getRawLength(), writer.getCompressedLength()));
+ writer = null;
+ // add to list of final disk outputs.
} catch (IOException e) {
if (null != outputPath) {
try {
@@ -789,7 +791,7 @@ public class MergeManagerImpl<K, V> impl
// merges. See comment where mergePhaseFinished is being set
Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
RawKeyValueIterator diskMerge = Merger.merge(
- job, fs, keyClass, valueClass, diskSegments,
+ job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null, thisPhase);
diskSegments.clear();
@@ -808,24 +810,45 @@ public class MergeManagerImpl<K, V> impl
static class CompressAwarePath extends Path {
private long rawDataLength;
+ private long compressedSize;
- public CompressAwarePath(Path path, long rawDataLength) {
+ public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
super(path.toUri());
this.rawDataLength = rawDataLength;
+ this.compressedSize = compressSize;
}
public long getRawDataLength() {
return rawDataLength;
}
-
+
+ public long getCompressedSize() {
+ return compressedSize;
+ }
+
@Override
public boolean equals(Object other) {
return super.equals(other);
}
-
+
@Override
public int hashCode() {
return super.hashCode();
}
+
+ @Override
+ public int compareTo(Object obj) {
+ if(obj instanceof CompressAwarePath) {
+ CompressAwarePath compPath = (CompressAwarePath) obj;
+ if(this.compressedSize < compPath.getCompressedSize()) {
+ return -1;
+ } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+ return 1;
+ }
+ // Not returning 0 here so that objects with the same size (but
+ // different paths) are still added to the TreeSet.
+ }
+ return super.compareTo(obj);
+ }
}
}
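
The compareTo() added to CompressAwarePath orders on-disk outputs by compressed size while deliberately avoiding a 0 result for distinct paths, so a TreeSet keeps same-sized entries. A generic sketch of that tie-breaking idea with a hypothetical SizedPath type:

import java.util.TreeSet;

public class SizedPath implements Comparable<SizedPath> {
  final String path;
  final long compressedSize;

  SizedPath(String path, long compressedSize) {
    this.path = path;
    this.compressedSize = compressedSize;
  }

  @Override
  public int compareTo(SizedPath other) {
    int bySize = Long.compare(compressedSize, other.compressedSize);
    // fall back to the path so equal sizes do not collapse into one entry
    return bySize != 0 ? bySize : path.compareTo(other.path);
  }

  public static void main(String[] args) {
    TreeSet<SizedPath> outputs = new TreeSet<SizedPath>();
    outputs.add(new SizedPath("out-a", 100));
    outputs.add(new SizedPath("out-b", 100));   // same size, still retained
    System.out.println(outputs.size());         // prints 2
  }
}
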
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Mon Apr 1 16:47:16 2013
@@ -48,6 +48,7 @@ class OnDiskMapOutput<K, V> extends MapO
private final Path outputPath;
private final MergeManagerImpl<K, V> merger;
private final OutputStream disk;
+ private long compressedSize;
public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@ class OnDiskMapOutput<K, V> extends MapO
bytesLeft + " bytes missing of " +
compressedLength + ")");
}
+ this.compressedSize = compressedLength;
}
@Override
public void commit() throws IOException {
localFS.rename(tmpOutputPath, outputPath);
CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
- getSize());
+ getSize(), this.compressedSize);
merger.closeOnDiskFile(compressAwarePath);
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java Mon Apr 1 16:47:16 2013
@@ -108,7 +108,7 @@ public class Shuffle<K, V> implements Sh
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
- reduceTask.getJobTokenSecret());
+ reduceTask.getShuffleSecret());
fetchers[i].start();
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Mon Apr 1 16:47:16 2013
@@ -521,6 +521,8 @@ public class ConfigUtil {
});
Configuration.addDeprecation("mapreduce.user.classpath.first",
MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
+ Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
+ MRJobConfig.SPLIT_METAINFO_MAXSIZE);
}
public static void main(String[] args) {
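
The deprecation registered above means configurations that still set the old JobTracker key resolve through the new MRJobConfig key. A small sketch of that behaviour, mirroring the mapping added in ConfigUtil:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

public class DeprecationSketch {
  public static void main(String[] args) {
    // same mapping as registered in ConfigUtil above
    Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
        MRJobConfig.SPLIT_METAINFO_MAXSIZE);

    Configuration conf = new Configuration(false);
    conf.setLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE, 5000000L);

    // the value set under the deprecated key is visible through the new one
    System.out.println(conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, -1L));
  }
}
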
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Apr 1 16:47:16 2013
@@ -296,6 +296,14 @@
</property>
<property>
+ <name>mapreduce.shuffle.max.connections</name>
+ <value>0</value>
+ <description>Max allowed connections for the shuffle. Set to 0 (zero)
+ to indicate no limit on the number of connections.
+ </description>
+</property>
+
+<property>
<name>mapreduce.reduce.markreset.buffer.percent</name>
<value>0.0</value>
<description>The percentage of memory -relative to the maximum heap size- to
@@ -798,6 +806,14 @@
</description>
</property>
+<property>
+ <name>mapreduce.am.max-attempts</name>
+ <value>1</value>
+ <description>The maximum number of application attempts. It is an
+ application-specific setting. It should not be larger than the global
+ number set by the resourcemanager; otherwise it will be overridden.</description>
+</property>
+
<!-- Job Notification Configuration -->
<property>
<name>mapreduce.job.end-notification.url</name>
Propchange: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1446831-1462625
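
For jobs or clusters that want non-default values for the two properties added above, they can also be set programmatically; the values below are illustrative, not recommendations:

import org.apache.hadoop.conf.Configuration;

public class NewPropsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // cap concurrent shuffle connections; 0 (the default) means no limit
    conf.setInt("mapreduce.shuffle.max.connections", 80);

    // per-job MR AM attempt limit; must not exceed the RM's global maximum
    conf.setInt("mapreduce.am.max-attempts", 1);

    System.out.println(conf.getInt("mapreduce.am.max-attempts", 1));
  }
}
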
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java Mon Apr 1 16:47:16 2013
@@ -25,7 +25,9 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.HttpURLConnection;
+import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
@@ -71,6 +73,54 @@ public class TestFetcher {
}
@SuppressWarnings("unchecked")
+ @Test(timeout=30000)
+ public void testCopyFromHostConnectionTimeout() throws Exception {
+ LOG.info("testCopyFromHostConnectionTimeout");
+ JobConf job = new JobConf();
+ TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+ ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
+ Reporter r = mock(Reporter.class);
+ ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+ ExceptionReporter except = mock(ExceptionReporter.class);
+ SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+ HttpURLConnection connection = mock(HttpURLConnection.class);
+ when(connection.getInputStream()).thenThrow(
+ new SocketTimeoutException("This is a fake timeout :)"));
+
+ Counters.Counter allErrs = mock(Counters.Counter.class);
+ when(r.getCounter(anyString(), anyString()))
+ .thenReturn(allErrs);
+
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+ r, metrics, except, key, connection);
+
+ MapHost host = new MapHost("localhost", "http://localhost:8080/");
+
+ ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+ TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+ maps.add(map1ID);
+ TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+ maps.add(map2ID);
+ when(ss.getMapsForHost(host)).thenReturn(maps);
+
+ String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+
+ underTest.copyFromHost(host);
+
+ verify(connection)
+ .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+ encHash);
+
+ verify(allErrs).increment(1);
+ verify(ss).copyFailed(map1ID, host, false, false);
+ verify(ss).copyFailed(map2ID, host, false, false);
+
+ verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+ verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+ }
+
+ @SuppressWarnings("unchecked")
@Test
public void testCopyFromHostBogusHeader() throws Exception {
LOG.info("testCopyFromHostBogusHeader");
@@ -184,4 +234,62 @@ public class TestFetcher {
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
}
-}
+ @SuppressWarnings("unchecked")
+ @Test(timeout=10000)
+ public void testCopyFromHostCompressFailure() throws Exception {
+ LOG.info("testCopyFromHostCompressFailure");
+ JobConf job = new JobConf();
+ TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
+ ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
+ InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
+ Reporter r = mock(Reporter.class);
+ ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+ ExceptionReporter except = mock(ExceptionReporter.class);
+ SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
+ HttpURLConnection connection = mock(HttpURLConnection.class);
+
+ Counters.Counter allErrs = mock(Counters.Counter.class);
+ when(r.getCounter(anyString(), anyString()))
+ .thenReturn(allErrs);
+
+ Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
+ r, metrics, except, key, connection);
+
+
+ MapHost host = new MapHost("localhost", "http://localhost:8080/");
+
+ ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
+ TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+ maps.add(map1ID);
+ TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
+ maps.add(map2ID);
+ when(ss.getMapsForHost(host)).thenReturn(maps);
+
+ String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
+ String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
+
+ when(connection.getResponseCode()).thenReturn(200);
+ when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
+ .thenReturn(replyHash);
+ ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ header.write(new DataOutputStream(bout));
+ ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
+ when(connection.getInputStream()).thenReturn(in);
+ when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
+ .thenReturn(immo);
+
+ doThrow(new java.lang.InternalError())
+ .when(immo)
+ .shuffle(any(MapHost.class), any(InputStream.class), anyLong(),
+ anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
+
+ underTest.copyFromHost(host);
+
+ verify(connection)
+ .addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
+ encHash);
+ verify(ss, times(1)).copyFailed(map1ID, host, true, false);
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Mon Apr 1 16:47:16 2013
@@ -17,28 +17,38 @@
*/
package org.apache.hadoop.mapreduce.task.reduce;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
public class TestMergeManager {
@Test(timeout=10000)
- @SuppressWarnings("unchecked")
public void testMemoryMerge() throws Exception {
final int TOTAL_MEM_BYTES = 10000;
final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@ public class TestMergeManager {
return exceptions.size();
}
}
+
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ @Test(timeout=10000)
+ public void testOnDiskMerger() throws IOException, URISyntaxException,
+ InterruptedException {
+ JobConf jobConf = new JobConf();
+ final int SORT_FACTOR = 5;
+ jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+ MapOutputFile mapOutputFile = new MROutputFiles();
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ MergeManagerImpl<IntWritable, IntWritable> manager =
+ new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+ , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+ MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+ onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+ IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+ "onDiskMerger");
+ int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+ "mergeFactor");
+
+ // make sure the io.sort.factor is set properly
+ assertEquals(mergeFactor, SORT_FACTOR);
+
+ // Stop the onDiskMerger thread so that we can intercept the list of files
+ // waiting to be merged.
+ onDiskMerger.suspend();
+
+ //Send the list of fake files waiting to be merged
+ Random rand = new Random();
+ for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+ Path path = new Path("somePath");
+ CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt());
+ manager.closeOnDiskFile(cap);
+ }
+
+ //Check that the files pending to be merged are in sorted order.
+ LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+ (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+ onDiskMerger, "pendingToBeMerged");
+ assertTrue("No inputs were added to list pending to merge",
+ pendingToBeMerged.size() > 0);
+ for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+ List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+ for(int j = 1; j < inputs.size(); ++j) {
+ assertTrue("Not enough / too many inputs were going to be merged",
+ inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+ assertTrue("Inputs to be merged were not sorted according to size: ",
+ inputs.get(j).getCompressedSize()
+ >= inputs.get(j-1).getCompressedSize());
+ }
+ }
+
+ }
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Mon Apr 1 16:47:16 2013
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -107,6 +108,11 @@ public class CompletedTaskAttempt implem
}
@Override
+ public Phase getPhase() {
+ return Phase.CLEANUP;
+ }
+
+ @Override
public TaskAttemptState getState() {
return state;
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java Mon Apr 1 16:47:16 2013
@@ -65,8 +65,12 @@ public class HsTasksBlock extends HtmlBl
if (!symbol.isEmpty()) {
type = MRApps.taskType(symbol);
}
-
- THEAD<TABLE<Hamlet>> thead = html.table("#tasks").thead();
+ THEAD<TABLE<Hamlet>> thead;
+ if(type != null)
+ thead = html.table("#"+app.getJob().getID()
+ + type).$class("dt-tasks").thead();
+ else
+ thead = html.table("#tasks").thead();
//Create the spanning row
int attemptColSpan = type == TaskType.REDUCE ? 8 : 3;
thead.tr().
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java Mon Apr 1 16:47:16 2013
@@ -22,7 +22,9 @@ import static org.apache.hadoop.mapreduc
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.ACCORDION;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.DATATABLES_SELECTOR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.initSelector;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.postInitID;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.tableInit;
@@ -42,6 +44,8 @@ public class HsTasksPage extends HsView
@Override protected void preHead(Page.HTML<_> html) {
commonPreHead(html);
set(DATATABLES_ID, "tasks");
+ set(DATATABLES_SELECTOR, ".dt-tasks" );
+ set(initSelector(DATATABLES), tasksTableInit());
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(initID(DATATABLES, "tasks"), tasksTableInit());
set(postInitID(DATATABLES, "tasks"), jobsPostTableInit());
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Mon Apr 1 16:47:16 2013
@@ -77,13 +77,18 @@ public class MockHistoryJobs extends Moc
for(Map.Entry<JobId, Job> entry: mocked.entrySet()) {
JobId id = entry.getKey();
Job j = entry.getValue();
- ret.full.put(id, new MockCompletedJob(j));
- JobReport report = j.getReport();
+ MockCompletedJob mockJob = new MockCompletedJob(j);
+ // use MockCompletedJob to set everything below to make sure
+ // consistent with what history server would do
+ ret.full.put(id, mockJob);
+ JobReport report = mockJob.getReport();
JobIndexInfo info = new JobIndexInfo(report.getStartTime(),
- report.getFinishTime(), j.getUserName(), j.getName(), id,
- j.getCompletedMaps(), j.getCompletedReduces(), String.valueOf(j.getState()));
- info.setQueueName(j.getQueueName());
+ report.getFinishTime(), mockJob.getUserName(), mockJob.getName(), id,
+ mockJob.getCompletedMaps(), mockJob.getCompletedReduces(),
+ String.valueOf(mockJob.getState()));
+ info.setQueueName(mockJob.getQueueName());
ret.partial.put(id, new PartialJob(info, id));
+
}
return ret;
}
@@ -99,12 +104,16 @@ public class MockHistoryJobs extends Moc
@Override
public int getCompletedMaps() {
- return job.getCompletedMaps();
+ // we always return total since this is history server
+ // and PartialJob also assumes completed - total
+ return job.getTotalMaps();
}
@Override
public int getCompletedReduces() {
- return job.getCompletedReduces();
+ // we always return total since this is history server
+ // and PartialJob also assumes completed - total
+ return job.getTotalReduces();
}
@Override
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Mon Apr 1 16:47:16 2013
@@ -79,7 +79,7 @@ public class TestJobHistoryEntities {
}
/* Verify some expected values based on the history file */
- @Test
+ @Test (timeout=10000)
public void testCompletedJob() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
@@ -104,7 +104,7 @@ public class TestJobHistoryEntities {
assertEquals(JobState.SUCCEEDED, jobReport.getJobState());
}
- @Test
+ @Test (timeout=10000)
public void testCompletedTask() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
@@ -133,7 +133,7 @@ public class TestJobHistoryEntities {
assertEquals(rt1Id, rt1Report.getTaskId());
}
- @Test
+ @Test (timeout=10000)
public void testCompletedTaskAttempt() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java Mon Apr 1 16:47:16 2013
@@ -25,7 +25,6 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
@@ -67,8 +66,17 @@ public class TestJobHistoryEvents {
* completed maps
*/
HistoryContext context = new JobHistory();
+ // test start and stop states
((JobHistory)context).init(conf);
- Job parsedJob = context.getJob(jobId);
+ ((JobHistory)context).start();
+ Assert.assertTrue(context.getStartTime() > 0);
+ Assert.assertEquals(((JobHistory) context).getServiceState(), Service.STATE.STARTED);
+
+ ((JobHistory) context).stop();
+ Assert.assertEquals(((JobHistory) context).getServiceState(), Service.STATE.STOPPED);
+ Job parsedJob = context.getJob(jobId);
+
Assert.assertEquals("CompletedMaps not correct", 2,
parsedJob.getCompletedMaps());
Assert.assertEquals(System.getProperty("user.name"), parsedJob.getUserName());
@@ -177,9 +185,8 @@ public class TestJobHistoryEvents {
@Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(
- context, getStartCount());
- return eventHandler;
+ return new JobHistoryEventHandler(
+ context, getStartCount());
}
}
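
The new assertions exercise the service lifecycle of JobHistory. A standalone sketch of that sequence, assuming JobHistory follows the usual YARN service lifecycle of init/start/stop and an otherwise default configuration (illustrative, not from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.hs.JobHistory;

    public class JobHistoryLifecycleSketch {
      public static void main(String[] args) {
        JobHistory jobHistory = new JobHistory();
        jobHistory.init(new Configuration()); // NOTINITED -> INITED
        jobHistory.start();                   // INITED    -> STARTED
        System.out.println("state after start: " + jobHistory.getServiceState());
        System.out.println("start time: " + jobHistory.getStartTime());
        jobHistory.stop();                    // STARTED   -> STOPPED
        System.out.println("state after stop: " + jobHistory.getServiceState());
      }
    }
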
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Mon Apr 1 16:47:16 2013
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapreduce.v2.hs;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -40,6 +42,7 @@ import org.apache.hadoop.mapreduce.TaskI
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.EventReader;
import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -60,7 +63,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
@@ -78,21 +80,27 @@ public class TestJobHistoryParsing {
private static final String RACK_NAME = "/MyRackName";
+ private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+
public static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
return Arrays.asList(new String[]{RACK_NAME});
}
+
+ @Override
+ public void reloadCachedMappings() {
+ }
}
- @Test
+ @Test (timeout=50000)
public void testJobInfo() throws Exception {
JobInfo info = new JobInfo();
Assert.assertEquals("NORMAL", info.getPriority());
info.printAll();
}
- @Test
+ @Test (timeout=50000)
public void testHistoryParsing() throws Exception {
LOG.info("STARTING testHistoryParsing()");
try {
@@ -102,7 +110,7 @@ public class TestJobHistoryParsing {
}
}
- @Test
+ @Test (timeout=50000)
public void testHistoryParsingWithParseErrors() throws Exception {
LOG.info("STARTING testHistoryParsingWithParseErrors()");
try {
@@ -317,18 +325,37 @@ public class TestJobHistoryParsing {
}
}
}
+
+ // test output for HistoryViewer
+ PrintStream stdps = System.out;
+ try {
+ System.setOut(new PrintStream(outContent));
+ HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
+ fileInfo.getHistoryFile()).toString(), conf, true);
+ viewer.print();
+
+ for (TaskInfo taskInfo : allTasks.values()) {
+
+ String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo.getTaskStatus())
+     + " " + taskInfo.getTaskType() + " task list for "
+     + taskInfo.getTaskId().getJobID();
+ Assert.assertTrue(outContent.toString().indexOf(test) > 0);
+ Assert.assertTrue(outContent.toString()
+     .indexOf(taskInfo.getTaskId().toString()) > 0);
+ }
+ } finally {
+ System.setOut(stdps);
+
+ }
}
-
+
// Computes finished maps similar to RecoveryService...
- private long computeFinishedMaps(JobInfo jobInfo,
- int numMaps, int numSuccessfulMaps) {
+ private long computeFinishedMaps(JobInfo jobInfo, int numMaps,
+ int numSuccessfulMaps) {
if (numMaps == numSuccessfulMaps) {
return jobInfo.getFinishedMaps();
}
-
+
long numFinishedMaps = 0;
- Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos =
- jobInfo.getAllTasks();
+ Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+ .getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
++numFinishedMaps;
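
The System.setOut()/finally pattern used above to verify HistoryViewer output is a general one; a self-contained sketch (illustrative, not from this patch):

    import java.io.ByteArrayOutputStream;
    import java.io.PrintStream;

    public class StdoutCaptureSketch {
      public static void main(String[] args) {
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream original = System.out;
        try {
          System.setOut(new PrintStream(captured));
          // stands in for viewer.print()
          System.out.println("SUCCEEDED MAP task list for job_1234567890123_0001");
        } finally {
          System.setOut(original); // always restore the real stream
        }
        if (!captured.toString().contains("task list for")) {
          throw new AssertionError("expected viewer output was not printed");
        }
        System.out.println("captured " + captured.size() + " bytes of output");
      }
    }
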
@@ -337,7 +364,7 @@ public class TestJobHistoryParsing {
return numFinishedMaps;
}
- @Test
+ @Test (timeout=50000)
public void testHistoryParsingForFailedAttempts() throws Exception {
LOG.info("STARTING testHistoryParsingForFailedAttempts");
try {
@@ -404,7 +431,7 @@ public class TestJobHistoryParsing {
}
}
- @Test
+ @Test (timeout=5000)
public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask");
try {
@@ -455,13 +482,16 @@ public class TestJobHistoryParsing {
CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
Assert.assertNotNull("completed task report has null counters",
ct.getReport().getCounters());
+ // Make sure every completed task has counters and the counters are not empty
+ Assert.assertTrue(ct.getReport().getCounters()
+ .getAllCounterGroups().size() > 0);
}
} finally {
LOG.info("FINISHED testCountersForFailedTask");
}
}
- @Test
+ @Test (timeout=50000)
public void testScanningOldDirs() throws Exception {
LOG.info("STARTING testScanningOldDirs");
try {
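
The strengthened check above requires every completed task's counters to be present and non-empty. A small helper expressing the same invariant (illustrative; it assumes the counters record returned by the task report exposes getAllCounterGroups(), as the test relies on):

    import org.apache.hadoop.mapreduce.v2.api.records.Counters;

    public final class CounterChecks {
      private CounterChecks() {}

      // Fails if a completed task's report carries no counters at all, or an
      // empty set of counter groups.
      public static void assertNonEmpty(Counters counters) {
        if (counters == null) {
          throw new AssertionError("completed task report has null counters");
        }
        if (counters.getAllCounterGroups() == null
            || counters.getAllCounterGroups().isEmpty()) {
          throw new AssertionError("completed task report has no counter groups");
        }
      }
    }
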
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobs.java Mon Apr 1 16:47:16 2013
@@ -117,6 +117,7 @@ public class TestHsWebServicesJobs exten
fullJobs = jobs.full;
}
+
TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
this(appid, numJobs, numTasks, numAttempts, false);
}
@@ -411,7 +412,8 @@ public class TestHsWebServicesJobs exten
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("job");
- VerifyJobsUtils.verifyHsJob(info, jobsMap.get(id));
+
+ VerifyJobsUtils.verifyHsJob(info, appContext.getJob(id));
}
}
@@ -613,7 +615,7 @@ public class TestHsWebServicesJobs exten
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobCounters");
- verifyHsJobCounters(info, jobsMap.get(id));
+ verifyHsJobCounters(info, appContext.getJob(id));
}
}
@@ -631,7 +633,7 @@ public class TestHsWebServicesJobs exten
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobCounters");
- verifyHsJobCounters(info, jobsMap.get(id));
+ verifyHsJobCounters(info, appContext.getJob(id));
}
}
@@ -689,7 +691,7 @@ public class TestHsWebServicesJobs exten
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject info = json.getJSONObject("jobCounters");
- verifyHsJobCounters(info, jobsMap.get(id));
+ verifyHsJobCounters(info, appContext.getJob(id));
}
}
@@ -711,7 +713,7 @@ public class TestHsWebServicesJobs exten
is.setCharacterStream(new StringReader(xml));
Document dom = db.parse(is);
NodeList info = dom.getElementsByTagName("jobCounters");
- verifyHsJobCountersXML(info, jobsMap.get(id));
+ verifyHsJobCountersXML(info, appContext.getJob(id));
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/webapp/TestHsWebServicesJobsQuery.java Mon Apr 1 16:47:16 2013
@@ -284,9 +284,9 @@ public class TestHsWebServicesJobsQuery
String type = exception.getString("exception");
String classname = exception.getString("javaClassName");
WebServicesTestUtils
- .checkStringMatch(
+ .checkStringContains(
"exception message",
- "No enum const class org.apache.hadoop.mapreduce.v2.api.records.JobState.InvalidState",
+ "org.apache.hadoop.mapreduce.v2.api.records.JobState.InvalidState",
message);
WebServicesTestUtils.checkStringMatch("exception type",
"IllegalArgumentException", type);
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Mon Apr 1 16:47:16 2013
@@ -106,8 +106,9 @@ public class ResourceMgrDelegate extends
public QueueInfo getQueue(String queueName) throws IOException,
InterruptedException {
- return TypeConverter.fromYarn(
- super.getQueueInfo(queueName), this.conf);
+ org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
+ super.getQueueInfo(queueName);
+ return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo, conf);
}
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
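
With the change above, getQueue() returns null for a queue name the ResourceManager does not know instead of failing inside TypeConverter.fromYarn(). A caller-side sketch (illustrative; it assumes the returned type is the MapReduce QueueInfo and needs a live delegate to run):

    import java.io.IOException;
    import org.apache.hadoop.mapred.ResourceMgrDelegate;
    import org.apache.hadoop.mapreduce.QueueInfo;

    public final class QueueLookupSketch {
      private QueueLookupSketch() {}

      // Callers should now be prepared for a null result.
      public static String describeQueue(ResourceMgrDelegate delegate, String name)
          throws IOException, InterruptedException {
        QueueInfo info = delegate.getQueue(name);
        return (info == null)
            ? "queue '" + name + "' is not defined on this cluster"
            : "queue '" + name + "' exists";
      }
    }
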
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Mon Apr 1 16:47:16 2013
@@ -481,6 +481,9 @@ public class YARNRunner implements Clien
appContext.setCancelTokensWhenComplete(
conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
appContext.setAMContainerSpec(amContainer); // AM Container
+ appContext.setMaxAppAttempts(
+ conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
+ MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
return appContext;
}
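
The new call propagates the job-configured ApplicationMaster retry limit into the YARN submission context. Setting it from a client is just a configuration write (illustrative sketch, not from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class AmMaxAttemptsSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Ask YARN to retry the MR ApplicationMaster at most twice; the value
        // now reaches ApplicationSubmissionContext.setMaxAppAttempts().
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
        System.out.println("AM max attempts = "
            + conf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
                MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
      }
    }
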
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Mon Apr 1 16:47:16 2013
@@ -38,6 +38,11 @@ public class MiniMRClientClusterFactory
public static MiniMRClientCluster create(Class<?> caller, int noOfNMs,
Configuration conf) throws IOException {
+ return create(caller, caller.getSimpleName(), noOfNMs, conf);
+ }
+
+ public static MiniMRClientCluster create(Class<?> caller, String identifier,
+ int noOfNMs, Configuration conf) throws IOException {
if (conf == null) {
conf = new Configuration();
@@ -45,7 +50,7 @@ public class MiniMRClientClusterFactory
FileSystem fs = FileSystem.get(conf);
- Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
+ Path testRootDir = new Path("target", identifier + "-tmpDir")
.makeQualified(fs);
Path appJar = new Path(testRootDir, "MRAppJar.jar");
@@ -65,10 +70,10 @@ public class MiniMRClientClusterFactory
fs.setPermission(remoteCallerJar, new FsPermission("744"));
job.addFileToClassPath(remoteCallerJar);
- MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
- .getName(), noOfNMs);
+ MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(identifier,
+ noOfNMs);
job.getConfiguration().set("minimrclientcluster.caller.name",
- caller.getName());
+ identifier);
job.getConfiguration().setInt("minimrclientcluster.nodemanagers.number",
noOfNMs);
miniMRYarnCluster.init(job.getConfiguration());
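
The new overload lets two mini clusters started from the same caller use distinct working directories. A usage sketch (illustrative; running it needs the MR test jars and a local environment suitable for MiniMRYarnCluster):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.MiniMRClientCluster;
    import org.apache.hadoop.mapred.MiniMRClientClusterFactory;

    public class MiniClusterNamingSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Each run gets its own identifier, so "target/<identifier>-tmpDir"
        // never collides with another cluster started by the same class.
        MiniMRClientCluster cluster = MiniMRClientClusterFactory.create(
            MiniClusterNamingSketch.class, "MiniClusterNamingSketch_run1", 1, conf);
        try {
          System.out.println("cluster caller name: "
              + cluster.getConfig().get("minimrclientcluster.caller.name"));
        } finally {
          cluster.stop();
        }
      }
    }
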
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRCluster.java Mon Apr 1 16:47:16 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -177,8 +178,10 @@ public class MiniMRCluster {
int numTrackerToExclude, Clock clock) throws IOException {
if (conf == null) conf = new JobConf();
FileSystem.setDefaultUri(conf, namenode);
+ String identifier = this.getClass().getSimpleName() + "_"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE));
mrClientCluster = MiniMRClientClusterFactory.create(this.getClass(),
- numTaskTrackers, conf);
+ identifier, numTaskTrackers, conf);
}
public UserGroupInformation getUgi() {
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java Mon Apr 1 16:47:16 2013
@@ -26,12 +26,12 @@ import org.apache.hadoop.io.compress.Def
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Test;
-
+import static org.junit.Assert.*;
public class TestIFile {
@Test
/**
- * Create an IFile.Writer using GzipCodec since this codec does not
+ * Create an IFile.Writer using GzipCodec since this code does not
* have a compressor when run via the tests (ie no native libraries).
*/
public void testIFileWriterWithCodec() throws Exception {
@@ -63,5 +63,11 @@ public class TestIFile {
IFile.Reader<Text, Text> reader =
new IFile.Reader<Text, Text>(conf, rfs, path, codec, null);
reader.close();
+
+ // test the checksum read path
+ byte[] buffer = new byte[100];
+ int bytesRead = reader.checksumIn.readWithChecksum(buffer, 0, buffer.length);
+ assertEquals(bytesRead, reader.checksumIn.getChecksum().length);
+
}
}
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java Mon Apr 1 16:47:16 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ClassUtil;
import static org.junit.Assert.*;
@@ -79,7 +80,7 @@ public class TestJobConf {
Class clazz = Class.forName(CLASSNAME, true, cl);
assertNotNull(clazz);
- String containingJar = JobConf.findContainingJar(clazz);
+ String containingJar = ClassUtil.findContainingJar(clazz);
assertEquals(jar.getAbsolutePath(), containingJar);
}
}
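
The test now delegates to the shared ClassUtil.findContainingJar() utility. A quick sketch of what it reports (illustrative; the printed path depends on how the class was loaded, and the method returns null for classes on a plain directory classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ClassUtil;

    public class ContainingJarSketch {
      public static void main(String[] args) {
        // Typically prints the hadoop-common jar; null when running from
        // unpacked classes.
        System.out.println("Configuration loaded from: "
            + ClassUtil.findContainingJar(Configuration.class));
      }
    }
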
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java Mon Apr 1 16:47:16 2013
@@ -61,8 +61,12 @@ import org.apache.hadoop.util.Reflection
*/
public class TestMapProgress extends TestCase {
public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
- private static String TEST_ROOT_DIR = new File(System.getProperty(
- "test.build.data", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
+ private static String TEST_ROOT_DIR;
+ static {
+ String root = new File(System.getProperty("test.build.data", "/tmp"))
+ .getAbsolutePath();
+ TEST_ROOT_DIR = new Path(root, "mapPhaseprogress").toString();
+ }
static class FakeUmbilical implements TaskUmbilicalProtocol {
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java?rev=1463203&r1=1463202&r2=1463203&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMultiFileSplit.java Mon Apr 1 16:47:16 2013
@@ -21,13 +21,20 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Arrays;
import junit.framework.TestCase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-
+/**
+ * Tests for the MultiFileSplit class.
+ */
public class TestMultiFileSplit extends TestCase{
public void testReadWrite() throws Exception {
@@ -58,4 +65,26 @@ public class TestMultiFileSplit extends
assertTrue(Arrays.equals(split.getLengths(), readSplit.getLengths()));
System.out.println(split.toString());
}
+
+ /**
+ * Tests the getLocations() method.
+ * @throws IOException
+ */
+ public void testgetLocations() throws IOException {
+ JobConf job = new JobConf();
+
+ File tmpFile = File.createTempFile("test", ".txt");
+ tmpFile.deleteOnExit();
+ OutputStream out = new FileOutputStream(tmpFile);
+ out.write("tempfile".getBytes());
+ out.flush();
+ out.close();
+ Path[] path = { new Path(tmpFile.getAbsolutePath()) };
+ long[] lengths = { 100 };
+
+ MultiFileSplit split = new MultiFileSplit(job, path, lengths);
+ String[] locations = split.getLocations();
+ assertTrue(locations.length == 1);
+ assertEquals("localhost", locations[0]);
+ }
}
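
For reference, a standalone version of the behaviour the new test checks: a MultiFileSplit over local files reports a single "localhost" location (illustrative sketch, not from this patch):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MultiFileSplit;

    public class MultiFileSplitLocationsSketch {
      public static void main(String[] args) throws Exception {
        // Write a small local file so the split has real block locations.
        File tmpFile = File.createTempFile("multifilesplit", ".txt");
        tmpFile.deleteOnExit();
        OutputStream out = new FileOutputStream(tmpFile);
        out.write("tempfile".getBytes("UTF-8"));
        out.close();

        JobConf job = new JobConf();
        Path[] paths = { new Path(tmpFile.getAbsolutePath()) };
        long[] lengths = { tmpFile.length() };
        MultiFileSplit split = new MultiFileSplit(job, paths, lengths);
        for (String location : split.getLocations()) {
          System.out.println("split location: " + location); // expected: localhost
        }
      }
    }
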