You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/04/29 05:39:01 UTC

svn commit: r1097677 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/

Author: mahadev
Date: Fri Apr 29 03:39:00 2011
New Revision: 1097677

URL: http://svn.apache.org/viewvc?rev=1097677&view=rev
Log:
Completing the ZooKeeper Store for ResourceManager state. (mahadev)

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri Apr 29 03:39:00 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Completing the ZooKeeper Store for ResourceManager state. (mahadev)
+
     Added support High-RAM applications in CapacityScheduler. (acmurthy) 
 
     Recovery of MR Application Master from failures. (sharad)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Fri Apr 29 03:39:00 2011
@@ -1,7 +1,9 @@
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
@@ -90,19 +92,14 @@ public class MemStore implements Store {
     }
 
     @Override
-    public List<ApplicationSubmissionContext> getStoredSubmissionContexts()
-    throws IOException {
-      return new ArrayList<ApplicationSubmissionContext>();
-    }
-
-    @Override
     public NodeId getLastLoggedNodeId() throws IOException {
       return nodeId;
     }
 
     @Override
-    public List<ApplicationMaster> getStoredAMs() throws IOException {
-      return new ArrayList<ApplicationMaster>();
+    public Map<ApplicationId, ApplicationInfo> getStoredApplications()
+    throws IOException {
+      return new HashMap<ApplicationId, Store.ApplicationInfo>();
     }
   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Fri Apr 29 03:39:00 2011
@@ -19,18 +19,25 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
 
 
 public interface Store extends NodeStore, ApplicationStore {
+  public interface ApplicationInfo { // per-application value type of RMState#getStoredApplications()
+    public ApplicationMaster getApplicationMaster(); // NOTE(review): ZKStore's impl returns null until a master is set
+    public ApplicationSubmissionContext getApplicationSubmissionContext(); // submission context held for the app
+    public List<Container> getContainers(); // containers held for the app
+  }
   public interface RMState {
     public List<NodeManager> getStoredNodeManagers() throws IOException;
-    public List<ApplicationSubmissionContext> getStoredSubmissionContexts() throws IOException;
-    public List<ApplicationMaster> getStoredAMs() throws IOException;
+    public Map<ApplicationId, ApplicationInfo> getStoredApplications() throws IOException;
     public NodeId getLastLoggedNodeId() throws IOException;
   }
   public RMState restore() throws IOException;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Fri Apr 29 03:39:00 2011
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.re
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationMasterProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProto;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
@@ -63,7 +68,6 @@ public class ZKStore implements Store {
   private static final String ZK_PATH_SEPARATOR = "/";
   private static final String NODE_ID = "nodeid";
   private static final String APP_MASTER = "master";
-  private static final String LAST_CONTAINER_ID = "last_containerid";
   private final String ZK_ADDRESS;
   private final int ZK_TIMEOUT;
   
@@ -216,6 +220,23 @@ public class ZKStore implements Store {
       throw convertToIOException(ke);
     }
   }
+  // Sets the app znode's data to the serialized master. NOTE(review): getAppInfo reads the master from a "master" child - confirm.
+  @Override
+  public void updateApplicationState(ApplicationId applicationId,
+      ApplicationMaster master) throws IOException {
+    String appString = APPS + ConverterUtils.toString(applicationId);
+    ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
+    try {
+      zkClient.setData(appString, masterPBImpl.getProto().toByteArray(), -1); // -1: no znode version check
+    } catch(InterruptedException ie) {
+      LOG.info("Interrupted", ie);
+      throw new InterruptedIOException(ie.getMessage());
+    } catch(KeeperException ke) {
+      LOG.info("Keeper exception", ke);
+      throw convertToIOException(ke);
+    }
+  }
+
 
   @Override
   public synchronized void removeApplication(ApplicationId application) throws IOException {
@@ -237,13 +258,43 @@ public class ZKStore implements Store {
     return rmState;
   }  
   
+  private static class ApplicationInfoImpl implements ApplicationInfo {
+    // Holder for one application's restored state. Static: it never touches the
+    // enclosing ZKStore instance, so it should not keep a hidden reference to it.
+    private ApplicationMaster master; // null until setApplicationMaster is called
+    private final ApplicationSubmissionContext context;
+    private final List<Container> containers = new ArrayList<Container>();
+    
+    public ApplicationInfoImpl(ApplicationSubmissionContext context) {
+      this.context = context;
+    }
+    
+    public void setApplicationMaster(ApplicationMaster master) {
+      this.master = master;
+    }
+    
+    @Override
+    public ApplicationMaster getApplicationMaster() {
+      return this.master;
+    }
+    @Override
+    public ApplicationSubmissionContext getApplicationSubmissionContext() {
+      return this.context;
+    }
+    @Override
+    public List<Container> getContainers() {
+      return this.containers; // the live backing list, not a copy
+    }
+    
+    public void addContainer(Container container) {
+      containers.add(container);
+    }
+  }
+  
   private class ZKRMState implements RMState {
-    List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
-    List<ApplicationSubmissionContext> appSubmissionContexts = new 
-      ArrayList<ApplicationSubmissionContext>();
-    List<ApplicationMaster> masters = 
-      new ArrayList<ApplicationMaster>();
-    List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
+    private List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
+    private Map<ApplicationId, ApplicationInfo> applications = new 
+      HashMap<ApplicationId, ApplicationInfo>();
     
     public ZKRMState() {
       LOG.info("Restoring RM state from ZK");
@@ -252,6 +303,7 @@ public class ZKStore implements Store {
     private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
       /** get the list of nodes stored in zk **/
       //TODO PB
+      List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
       Stat stat = new Stat();
       try {
         List<String> children = zkClient.getChildren(NODES, false);
@@ -277,13 +329,8 @@ public class ZKStore implements Store {
     }
 
     @Override
-    public List<ApplicationSubmissionContext> getStoredSubmissionContexts() {
-      return appSubmissionContexts;
-    }
-
-    @Override
     public NodeId getLastLoggedNodeId() {
-      return null;
+      return nodeId;
     }
     
     private void readLastNodeId() throws IOException {
@@ -300,6 +347,36 @@ public class ZKStore implements Store {
       }
     }
     
+    private ApplicationInfo getAppInfo(String app) throws IOException {
+      // The app znode's data is the submission context; its "master" child is the
+      // ApplicationMaster and every other child is parsed as a Container.
+      ApplicationInfoImpl info = null;
+      Stat stat = new Stat();
+      try {
+        byte[] data = zkClient.getData(APPS + app, false, stat);
+        ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(
+            ApplicationSubmissionContextProto.parseFrom(data));
+        info = new ApplicationInfoImpl(context);
+        List<String> children = zkClient.getChildren(APPS + app, false, stat);
+        for (String child: children) {
+          byte[] childdata = zkClient.getData(APPS + app + ZK_PATH_SEPARATOR + child, false, stat);
+          if (APP_MASTER.equals(child)) {
+            info.setApplicationMaster(
+                new ApplicationMasterPBImpl(ApplicationMasterProto.parseFrom(childdata)));
+          } else {
+            // Decode the child's own payload, not the parent's submission-context bytes.
+            info.addContainer(new ContainerPBImpl(ContainerProto.parseFrom(childdata)));
+          }
+        }
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted", ie);
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        throw convertToIOException(ke);
+      }
+      return info;
+    }
+    
     private void load() throws IOException {
       List<NodeManagerInfo> nodeInfos = listStoredNodes();
       for (NodeManagerInfo node: nodeInfos) {
@@ -309,18 +386,26 @@ public class ZKStore implements Store {
         nodeManagers.add(nm);
       }
       readLastNodeId();
-      /* make sure we get all the containers */
-      
+      /* make sure we get all the applications */
+      List<String> apps = null;
+      try {
+        apps = zkClient.getChildren(APPS, false);
+      } catch(InterruptedException ie) {
+        LOG.info("Interrupted", ie);
+        throw new InterruptedIOException(ie.getMessage());
+      } catch(KeeperException ke) {
+        throw convertToIOException(ke);
+      }
+      for (String app: apps) {
+        ApplicationInfo info = getAppInfo(app);
+        applications.put(info.getApplicationMaster().getApplicationId(), info);
+      }
     }
+
     @Override
-    public List<ApplicationMaster> getStoredAMs() throws IOException {
-      return masters;
+    public Map<ApplicationId, ApplicationInfo> getStoredApplications()
+        throws IOException {
+      return applications;
     }
   }
-
-  @Override
-  public void updateApplicationState(ApplicationId applicationId,
-      ApplicationMaster master) throws IOException {
-    
-  }
 }
\ No newline at end of file