You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2010/06/01 01:43:20 UTC
svn commit: r949896 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobTracker.java
Author: tomwhite
Date: Mon May 31 23:43:19 2010
New Revision: 949896
URL: http://svn.apache.org/viewvc?rev=949896&view=rev
Log:
MAPREDUCE-1372. ConcurrentModificationException in JobInProgress. Contributed by Dick King and Amareshwari Sriramadasu.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=949896&r1=949895&r2=949896&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon May 31 23:43:19 2010
@@ -1620,3 +1620,6 @@ Release 0.21.0 - Unreleased
MAPREDUCE-1276. Correct flaws in the shuffle related to connection setup
and failure attribution. (Amareshwari Sriramadasu via cdouglas)
+
+ MAPREDUCE-1372. ConcurrentModificationException in JobInProgress.
+ (Dick King and Amareshwari Sriramadasu via tomwhite)
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=949896&r1=949895&r2=949896&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon May 31 23:43:19 2010
@@ -164,7 +164,15 @@ public class JobTracker implements MRCon
private DNSToSwitchMapping dnsToSwitchMapping;
NetworkTopology clusterMap = new NetworkTopology();
private int numTaskCacheLevels; // the max level to which we cache tasks
- private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
+ /**
+ * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap}
+ * so that it can be safely written to and iterated on via 2 separate threads.
+ * Note: It can only be iterated from a single thread which is feasible since
+ * the only iteration is done in {@link JobInProgress} under the
+ * {@link JobTracker} lock.
+ */
+ private Set<Node> nodesAtMaxLevel =
+ Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>());
final TaskScheduler taskScheduler;
private final List<JobInProgressListener> jobInProgressListeners =
new CopyOnWriteArrayList<JobInProgressListener>();
@@ -2256,25 +2264,27 @@ public class JobTracker implements MRCon
}
private Node addHostToNodeMapping(String host, String networkLoc) {
- Node node;
- if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
- node = new NodeBase(host, networkLoc);
- clusterMap.add(node);
- if (node.getLevel() < getNumTaskCacheLevels()) {
- LOG.fatal("Got a host whose level is: " + node.getLevel() + "."
- + " Should get at least a level of value: "
- + getNumTaskCacheLevels());
- try {
- stopTracker();
- } catch (IOException ie) {
- LOG.warn("Exception encountered during shutdown: "
- + StringUtils.stringifyException(ie));
- System.exit(-1);
+ Node node = null;
+ synchronized (nodesAtMaxLevel) {
+ if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
+ node = new NodeBase(host, networkLoc);
+ clusterMap.add(node);
+ if (node.getLevel() < getNumTaskCacheLevels()) {
+ LOG.fatal("Got a host whose level is: " + node.getLevel() + "."
+ + " Should get at least a level of value: "
+ + getNumTaskCacheLevels());
+ try {
+ stopTracker();
+ } catch (IOException ie) {
+ LOG.warn("Exception encountered during shutdown: "
+ + StringUtils.stringifyException(ie));
+ System.exit(-1);
+ }
}
+ hostnameToNodeMap.put(host, node);
+ // Make an entry for the node at the max level in the cache
+ nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
}
- hostnameToNodeMap.put(host, node);
- // Make an entry for the node at the max level in the cache
- nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
}
return node;
}