You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:50:20 UTC

svn commit: r1077191 - /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Author: omalley
Date: Fri Mar  4 03:50:20 2011
New Revision: 1077191

URL: http://svn.apache.org/viewvc?rev=1077191&view=rev
Log:
commit d1554546cf72429ab04a0912b879a3a8fd284e49
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Tue Jan 19 16:34:12 2010 +0530

    MAPREDUCE-1372 from https://issues.apache.org/jira/secure/attachment/12430691/M1372-2.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1372. Fixed a ConcurrentModificationException in jobtracker.
    +    (Arun C Murthy via yhemanth)
    +
    +    MAPREDUCE-1316. Fix jobs' retirement from the JobTracker to prevent memory

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077191&r1=1077190&r2=1077191&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:50:20 2011
@@ -177,7 +177,15 @@ public class JobTracker implements MRCon
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
   private int numTaskCacheLevels; // the max level to which we cache tasks
-  private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
+  /**
+   * {@link #nodesAtMaxLevel} is a set backed by a {@link ConcurrentHashMap}
+   * so that one thread can safely write to it while another iterates over it.
+   * Note: It must only be iterated from a single thread at a time, which holds
+   *       because the only iteration is done in {@link JobInProgress} under the
+   *       {@link JobTracker} lock.
+   */
+  private Set<Node> nodesAtMaxLevel = 
+    Collections.newSetFromMap(new ConcurrentHashMap<Node, Boolean>());
   private final TaskScheduler taskScheduler;
   private final List<JobInProgressListener> jobInProgressListeners =
     new CopyOnWriteArrayList<JobInProgressListener>();
@@ -2909,25 +2917,27 @@ public class JobTracker implements MRCon
   }
   
   private Node addHostToNodeMapping(String host, String networkLoc) {
-    Node node;
-    if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
-      node = new NodeBase(host, networkLoc);
-      clusterMap.add(node);
-      if (node.getLevel() < getNumTaskCacheLevels()) {
-        LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
-                  + " Should get at least a level of value: " 
-                  + getNumTaskCacheLevels());
-        try {
-          stopTracker();
-        } catch (IOException ie) {
-          LOG.warn("Exception encountered during shutdown: " 
-                   + StringUtils.stringifyException(ie));
-          System.exit(-1);
+    Node node = null;
+    synchronized (nodesAtMaxLevel) {
+      if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
+        node = new NodeBase(host, networkLoc);
+        clusterMap.add(node);
+        if (node.getLevel() < getNumTaskCacheLevels()) {
+          LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
+              + " Should get at least a level of value: " 
+              + getNumTaskCacheLevels());
+          try {
+            stopTracker();
+          } catch (IOException ie) {
+            LOG.warn("Exception encountered during shutdown: " 
+                + StringUtils.stringifyException(ie));
+            System.exit(-1);
+          }
         }
+        hostnameToNodeMap.put(host, node);
+        // Make an entry for the node at the max level in the cache
+        nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
       }
-      hostnameToNodeMap.put(host, node);
-      // Make an entry for the node at the max level in the cache
-      nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
     }
     return node;
   }