You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2010/06/01 01:43:20 UTC

svn commit: r949896 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java

Author: tomwhite
Date: Mon May 31 23:43:19 2010
New Revision: 949896

URL: http://svn.apache.org/viewvc?rev=949896&view=rev
Log:
MAPREDUCE-1372. ConcurrentModificationException in JobInProgress.  Contributed by Dick King and Amareshwari Sriramadasu.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=949896&r1=949895&r2=949896&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon May 31 23:43:19 2010
@@ -1620,3 +1620,6 @@ Release 0.21.0 - Unreleased
 
     MAPREDUCE-1276. Correct flaws in the shuffle related to connection setup
     and failure attribution. (Amareshwari Sriramadasu via cdouglas)
+
+    MAPREDUCE-1372. ConcurrentModificationException in JobInProgress.
+    (Dick King and Amareshwari Sriramadasu via tomwhite)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=949896&r1=949895&r2=949896&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon May 31 23:43:19 2010
@@ -164,7 +164,15 @@ public class JobTracker implements MRCon
   private DNSToSwitchMapping dnsToSwitchMapping;
   NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
-  private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
+  /**
+   * {@link #nodesAtMaxLevel} is using the keySet from {@link ConcurrentHashMap}
+   * so that it can be safely written to and iterated on via 2 separate threads.
+   * Note: It can only be iterated from a single thread which is feasible since
+   *       the only iteration is done in {@link JobInProgress} under the 
+   *       {@link JobTracker} lock.
+   */
+  private Set<Node> nodesAtMaxLevel = 
+    Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>());
   final TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
@@ -2256,25 +2264,27 @@ public class JobTracker implements MRCon
   }
   
   private Node addHostToNodeMapping(String host, String networkLoc) {
-    Node node;
-    if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
-      node = new NodeBase(host, networkLoc);
-      clusterMap.add(node);
-      if (node.getLevel() < getNumTaskCacheLevels()) {
-        LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
-                  + " Should get at least a level of value: " 
-                  + getNumTaskCacheLevels());
-        try {
-          stopTracker();
-        } catch (IOException ie) {
-          LOG.warn("Exception encountered during shutdown: " 
-                   + StringUtils.stringifyException(ie));
-          System.exit(-1);
+    Node node = null;
+    synchronized (nodesAtMaxLevel) {
+      if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
+        node = new NodeBase(host, networkLoc);
+        clusterMap.add(node);
+        if (node.getLevel() < getNumTaskCacheLevels()) {
+          LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
+              + " Should get at least a level of value: " 
+              + getNumTaskCacheLevels());
+          try {
+            stopTracker();
+          } catch (IOException ie) {
+            LOG.warn("Exception encountered during shutdown: " 
+                + StringUtils.stringifyException(ie));
+            System.exit(-1);
+          }
         }
+        hostnameToNodeMap.put(host, node);
+        // Make an entry for the node at the max level in the cache
+        nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
       }
-      hostnameToNodeMap.put(host, node);
-      // Make an entry for the node at the max level in the cache
-      nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
     }
     return node;
   }