You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/05/23 22:50:36 UTC

svn commit: r1126742 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/imp...

Author: mahadev
Date: Mon May 23 20:50:36 2011
New Revision: 1126742

URL: http://svn.apache.org/viewvc?rev=1126742&view=rev
Log:
Fix nodemanager expiry to not throw OOM. (mahadev)

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 23 20:50:36 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Fix nodemanager expiry to not throw OOM. (mahadev)
+
     Changed Scheduler to return available limit to AM in the allocate api.
     (acmurthy)
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Mon May 23 20:50:36 2011
@@ -11,7 +11,6 @@ public interface NodeStatus {
   
   public abstract NodeId getNodeId();
   public abstract int getResponseId();
-  public abstract long getLastSeen();
   
   public abstract Map<String, List<Container>> getAllContainers();
   public abstract List<Container> getContainers(String key);
@@ -21,7 +20,6 @@ public interface NodeStatus {
 
   public abstract void setNodeId(NodeId nodeId);
   public abstract void setResponseId(int responseId);
-  public abstract void setLastSeen(long lastSeen);
   
   public abstract void addAllContainers(Map<String, List<Container>> containers);
   public abstract void setContainers(String key, List<Container> containers);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Mon May 23 20:50:36 2011
@@ -108,16 +108,6 @@ public class NodeStatusPBImpl extends Pr
     this.nodeId = nodeId;
     
   }
-  @Override
-  public long getLastSeen() {
-    NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
-    return p.getLastSeen();
-  }
-  @Override
-  public void setLastSeen(long lastSeen) {
-    maybeInitBuilder();
-    builder.setLastSeen(lastSeen);
-  }
   
   @Override
   public Map<String, List<Container>> getAllContainers() {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/proto/yarn_server_common_protos.proto Mon May 23 20:50:36 2011
@@ -14,9 +14,8 @@ message NodeHealthStatusProto {
 message NodeStatusProto {
   optional NodeIdProto node_id = 1;
   optional int32 response_id = 2;
-  optional int64 last_seen = 3;
-  repeated StringContainerListMapProto containers = 4;
-  optional NodeHealthStatusProto nodeHealthStatus = 5;
+  repeated StringContainerListMapProto containers = 3;
+  optional NodeHealthStatusProto nodeHealthStatus = 4;
 }
 
 message RegistrationResponseProto {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/AMTracker.java Mon May 23 20:50:36 2011
@@ -145,7 +145,7 @@ public class AMTracker extends AbstractS
 
       /* the expiry queue does not need to be in sync with applications,
        * if an applications in the expiry queue cannot be found in applications
-       * its alright. We do not want to hold a hold on applications while going
+       * its alright. We do not want to hold a lock on applications while going
        * through the expiry queue.
        */
       List<ApplicationId> expired = new ArrayList<ApplicationId>();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfoTracker.java Mon May 23 20:50:36 2011
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 
@@ -29,24 +30,35 @@ import org.apache.hadoop.yarn.server.res
 public class NodeInfoTracker {
   private final NodeManager node;
   HeartbeatResponse lastHeartBeatResponse;
-  private long lastSeen;
-
+  private NodeHeartbeatStatus heartBeatStatus;
+  
+  class NodeHeartbeatStatus {
+    private long lastSeen = 0;
+   
+    public NodeHeartbeatStatus(long lastSeen) {
+      this.lastSeen = lastSeen;
+    }
+    public NodeId getNodeId() {
+     return node.getNodeID();
+    }
+    
+    public long getLastSeen() {
+      return  lastSeen;
+    }
+  }
   public NodeInfoTracker(NodeManager node, HeartbeatResponse lastHeartBeatResponse) {
     this.node = node;
     this.lastHeartBeatResponse = lastHeartBeatResponse;
-    this.lastSeen = System.currentTimeMillis();
+    this.heartBeatStatus = new NodeHeartbeatStatus(System.currentTimeMillis());
   }
 
   public synchronized NodeManager getNodeManager() {
     return this.node;
   }
 
-  public synchronized void updateLastSeen(long lastSeen) {
-    this.lastSeen = lastSeen;
-  }
 
-  public synchronized long getNodeLastSeen() {
-    return this.lastSeen;
+  public synchronized NodeHeartbeatStatus getlastHeartBeat() {
+    return this.heartBeatStatus;
   }
 
   public synchronized HeartbeatResponse getLastHeartBeatResponse() {
@@ -56,4 +68,8 @@ public class NodeInfoTracker {
   public synchronized void refreshHeartBeatResponse(HeartbeatResponse heartBeatResponse) {
     this.lastHeartBeatResponse = heartBeatResponse;
   }
+  
+  public synchronized void refreshLastHeartBeat() {
+   this.heartBeatStatus = new NodeHeartbeatStatus(System.currentTimeMillis()); 
+  }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeStatus.java Mon May 23 20:50:36 2011
@@ -38,7 +38,6 @@ public class NodeStatus {
     nodeStatus.setNodeHealthStatus(recordFactory
         .newRecordInstance(NodeHealthStatus.class));
     nodeStatus.getNodeHealthStatus().setIsNodeHealthy(true);
-    nodeStatus.setLastSeen(System.currentTimeMillis());
     return nodeStatus;
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1126742&r1=1126741&r2=1126742&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java Mon May 23 20:50:36 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NodeStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.ApplicationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfoTracker.NodeHeartbeatStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
@@ -93,21 +94,19 @@ ResourceTracker, ClusterTracker {
 
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
-  private final TreeSet<NodeId> nmExpiryQueue =
-    new TreeSet<NodeId>(
-        new Comparator<NodeId>() {
-          public int compare(NodeId n1, NodeId n2) {
-            NodeInfoTracker nit1 = nodeManagers.get(n1);
-            NodeInfoTracker nit2 = nodeManagers.get(n2);
-            long p1LastSeen = nit1.getNodeLastSeen();
-            long p2LastSeen = nit2.getNodeLastSeen();
+  private final TreeSet<NodeHeartbeatStatus> nmExpiryQueue =
+    new TreeSet<NodeHeartbeatStatus>(
+        new Comparator<NodeHeartbeatStatus>() {
+          public int compare(NodeHeartbeatStatus n1,  NodeHeartbeatStatus n2) {
+            long p1LastSeen = n1.getLastSeen();
+            long p2LastSeen = n2.getLastSeen();
             if (p1LastSeen < p2LastSeen) {
               return -1;
             } else if (p1LastSeen > p2LastSeen) {
               return 1;
             } else {
-              return (nit1.getNodeManager().getNodeID().getId() -
-                  nit2.getNodeManager().getNodeID().getId());
+              return (n1.getNodeId().getId() -
+                  n2.getNodeId().getId());
             }
           }
         }
@@ -199,7 +198,6 @@ ResourceTracker, ClusterTracker {
         nodeManagers.put(nodeId, nTracker);
       } else {
         nTracker = nodeManagers.get(nodeId);
-        nTracker.updateLastSeen(System.currentTimeMillis());
       }
     }
     return nTracker;
@@ -226,7 +224,7 @@ ResourceTracker, ClusterTracker {
     } catch(IOException io) {
       throw  RPCUtil.getRemoteException(io);
     }
-    addForTracking(nodeId);
+    addForTracking(nTracker.getlastHeartBeat());
     LOG.info("NodeManager from node " + node + "(web-url: " + httpAddress
         + ") registered with capability: " + capability.getMemory()
         + ", assigned nodeId " + nodeId.getId());
@@ -271,7 +269,6 @@ ResourceTracker, ClusterTracker {
   @Override
   public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
     org.apache.hadoop.yarn.server.api.records.NodeStatus remoteNodeStatus = request.getNodeStatus();
-    remoteNodeStatus.setLastSeen(System.currentTimeMillis());
     NodeInfoTracker nTracker = null;
     NodeHeartbeatResponse nodeHbResponse = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
     synchronized(nodeManagers) {
@@ -283,7 +280,8 @@ ResourceTracker, ClusterTracker {
       nodeHbResponse.setHeartbeatResponse(reboot);
       return nodeHbResponse;
     }
-
+    /* update the heart beat status */
+    nTracker.refreshLastHeartBeat();
     NodeManager nodeManager = nTracker.getNodeManager();
     /* check to see if its an old heartbeat */    
     if (remoteNodeStatus.getResponseId() + 1 == nTracker
@@ -310,15 +308,14 @@ ResourceTracker, ClusterTracker {
     updateListener(
         nodeManager, remoteNodeStatus.getAllContainers());
   
-
+    
     HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
     response.addAllContainersToCleanup(nodeResponse.getContainersToCleanUp());
 
     response.addAllApplicationsToCleanup(nodeResponse.getFinishedApplications());
     response.setResponseId(nTracker.getLastHeartBeatResponse().getResponseId() + 1);
-
+    
     nTracker.refreshHeartBeatResponse(response);
-    nTracker.updateLastSeen(remoteNodeStatus.getLastSeen());
     boolean prevHealthStatus =
       nTracker.getNodeManager().getNodeHealthStatus().getIsNodeHealthy();
     NodeHealthStatus remoteNodeHealthStatus =
@@ -401,9 +398,9 @@ ResourceTracker, ClusterTracker {
     return infoList;
   }
 
-  protected void addForTracking(NodeId nodeID) {
+  protected void addForTracking(NodeHeartbeatStatus nmStatus) {
     synchronized(nmExpiryQueue) {
-      nmExpiryQueue.add(nodeID);
+      nmExpiryQueue.add(nmStatus);
     }
   }
 
@@ -449,27 +446,30 @@ ResourceTracker, ClusterTracker {
         long now = System.currentTimeMillis();
         expired.clear();
         synchronized(nmExpiryQueue) {
-          NodeId leastRecent;
+          NodeHeartbeatStatus leastRecent;
           while ((nmExpiryQueue.size() > 0) &&
-              (leastRecent = nmExpiryQueue.first()) != null) {
+              (leastRecent = nmExpiryQueue.first()) != null && 
+              ((now - leastRecent.getLastSeen()) > 
+              nmExpiryInterval)) {
             nmExpiryQueue.remove(leastRecent);
             NodeInfoTracker info;
             synchronized(nodeManagers) {
-              info = nodeManagers.get(leastRecent);
+              info = nodeManagers.get(leastRecent.getNodeId());
             }
             if (info == null) {
               continue;
             }
-            NodeId nodeID = info.getNodeManager().getNodeID();
-            if ((now - info.getNodeLastSeen()) > nmExpiryInterval) {
-              LOG.info("Going to expire the node-manager " + nodeID
+            NodeId nodeId = leastRecent.getNodeId();
+            NodeHeartbeatStatus heartBeatStatus = info.getlastHeartBeat();
+            if ((now - heartBeatStatus.getLastSeen()) > nmExpiryInterval) {
+              LOG.info("Going to expire the node-manager " + info.getNodeManager().getNodeAddress()
                   + " because of no updates for "
-                  + (now - info.getNodeLastSeen())
+                  + (now - heartBeatStatus.getLastSeen())
                   + " seconds ( > expiry interval of " + nmExpiryInterval
                   + ").");
-              expired.add(nodeID);
+              expired.add(nodeId);
             } else {
-              nmExpiryQueue.add(nodeID);
+              nmExpiryQueue.add(heartBeatStatus);
               break;
             }
           }