You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2011/04/29 05:39:01 UTC
svn commit: r1097677 - in /hadoop/mapreduce/branches/MR-279: ./
yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
Author: mahadev
Date: Fri Apr 29 03:39:00 2011
New Revision: 1097677
URL: http://svn.apache.org/viewvc?rev=1097677&view=rev
Log:
Completing the ZooKeeper Store for ResourceManager state. (mahadev)
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri Apr 29 03:39:00 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ Completing the ZooKeeper Store for ResourceManager state. (mahadev)
+
Added support High-RAM applications in CapacityScheduler. (acmurthy)
Recovery of MR Application Master from failures. (sharad)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemStore.java Fri Apr 29 03:39:00 2011
@@ -1,7 +1,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
@@ -90,19 +92,14 @@ public class MemStore implements Store {
}
+ /** Returns this state's {@code nodeId} value. */
@Override
- public List<ApplicationSubmissionContext> getStoredSubmissionContexts()
- throws IOException {
- return new ArrayList<ApplicationSubmissionContext>();
- }
-
- @Override
public NodeId getLastLoggedNodeId() throws IOException {
return nodeId;
}
+ /** Always returns a new, empty map; no applications are restored here. */
@Override
- public List<ApplicationMaster> getStoredAMs() throws IOException {
- return new ArrayList<ApplicationMaster>();
+ public Map<ApplicationId, ApplicationInfo> getStoredApplications()
+ throws IOException {
+ return new HashMap<ApplicationId, Store.ApplicationInfo>();
}
}
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/Store.java Fri Apr 29 03:39:00 2011
@@ -19,18 +19,25 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationMaster;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
public interface Store extends NodeStore, ApplicationStore {
+ /**
+ * Recovered state of a single application, as exposed by
+ * {@link RMState#getStoredApplications()}.
+ */
+ public interface ApplicationInfo {
+ /**
+ * @return the application's master. May be null when the store has no
+ * master recorded for the application (the ZooKeeper store only sets it
+ * when a master node exists).
+ */
+ public ApplicationMaster getApplicationMaster();
+ /** @return the context the application was submitted with. */
+ public ApplicationSubmissionContext getApplicationSubmissionContext();
+ /** @return the containers recovered for this application. */
+ public List<Container> getContainers();
+ }
+ /** ResourceManager state read back by {@link Store#restore()}. */
public interface RMState {
public List<NodeManager> getStoredNodeManagers() throws IOException;
- public List<ApplicationSubmissionContext> getStoredSubmissionContexts() throws IOException;
- public List<ApplicationMaster> getStoredAMs() throws IOException;
+ /** @return the stored applications, keyed by application id. */
+ public Map<ApplicationId, ApplicationInfo> getStoredApplications() throws IOException;
public NodeId getLastLoggedNodeId() throws IOException;
}
public RMState restore() throws IOException;
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java?rev=1097677&r1=1097676&r2=1097677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKStore.java Fri Apr 29 03:39:00 2011
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,6 +42,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationMasterProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationSubmissionContextProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeManagerInfoProto;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
@@ -63,7 +68,6 @@ public class ZKStore implements Store {
private static final String ZK_PATH_SEPARATOR = "/";
private static final String NODE_ID = "nodeid";
private static final String APP_MASTER = "master";
- private static final String LAST_CONTAINER_ID = "last_containerid";
private final String ZK_ADDRESS;
private final int ZK_TIMEOUT;
@@ -216,6 +220,23 @@ public class ZKStore implements Store {
throw convertToIOException(ke);
}
}
+
+ /**
+ * Persists the latest state of an application's master.
+ *
+ * The application's own znode ({@code APPS + id}) holds the serialized
+ * submission context and is read back as such on restore, so the master
+ * is written to the {@code APP_MASTER} child node instead of overwriting
+ * the parent.
+ *
+ * @param applicationId the application whose master state changed
+ * @param master the new master state; must be an ApplicationMasterPBImpl
+ * @throws IOException if the ZooKeeper update fails or is interrupted
+ */
+ @Override
+ public synchronized void updateApplicationState(ApplicationId applicationId,
+ ApplicationMaster master) throws IOException {
+ String appString = APPS + ConverterUtils.toString(applicationId);
+ String appMasterString = appString + ZK_PATH_SEPARATOR + APP_MASTER;
+ ApplicationMasterPBImpl masterPBImpl = (ApplicationMasterPBImpl) master;
+ try {
+ zkClient.setData(appMasterString, masterPBImpl.getProto().toByteArray(), -1);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
@Override
public synchronized void removeApplication(ApplicationId application) throws IOException {
@@ -237,13 +258,43 @@ public class ZKStore implements Store {
return rmState;
}
+ /**
+ * Holder for one application's state as read back from ZooKeeper: its
+ * submission context, its application master (if one was stored) and its
+ * containers.
+ *
+ * Declared static because it uses no ZKStore instance state; a non-static
+ * inner class would keep a hidden reference to the enclosing store.
+ */
+ private static class ApplicationInfoImpl implements ApplicationInfo {
+ /** Null until {@link #setApplicationMaster} is called. */
+ private ApplicationMaster master;
+ private final ApplicationSubmissionContext context;
+ private final List<Container> containers = new ArrayList<Container>();
+
+ public ApplicationInfoImpl(ApplicationSubmissionContext context) {
+ this.context = context;
+ }
+
+ public void setApplicationMaster(ApplicationMaster master) {
+ this.master = master;
+ }
+
+ @Override
+ public ApplicationMaster getApplicationMaster() {
+ return this.master;
+ }
+
+ @Override
+ public ApplicationSubmissionContext getApplicationSubmissionContext() {
+ return this.context;
+ }
+
+ /** Returns the live list that {@link #addContainer} appends to. */
+ @Override
+ public List<Container> getContainers() {
+ return this.containers;
+ }
+
+ public void addContainer(Container container) {
+ containers.add(container);
+ }
+ }
+
private class ZKRMState implements RMState {
- List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
- List<ApplicationSubmissionContext> appSubmissionContexts = new
- ArrayList<ApplicationSubmissionContext>();
- List<ApplicationMaster> masters =
- new ArrayList<ApplicationMaster>();
- List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
+ private List<NodeManager> nodeManagers = new ArrayList<NodeManager>();
+ private Map<ApplicationId, ApplicationInfo> applications = new
+ HashMap<ApplicationId, ApplicationInfo>();
public ZKRMState() {
LOG.info("Restoring RM state from ZK");
@@ -252,6 +303,7 @@ public class ZKStore implements Store {
private synchronized List<NodeManagerInfo> listStoredNodes() throws IOException {
/** get the list of nodes stored in zk **/
//TODO PB
+ List<NodeManagerInfo> nodes = new ArrayList<NodeManagerInfo>();
Stat stat = new Stat();
try {
List<String> children = zkClient.getChildren(NODES, false);
@@ -277,13 +329,8 @@ public class ZKStore implements Store {
}
+ /**
+ * Returns the last logged node id.
+ * NOTE(review): presumably {@code nodeId} is populated by readLastNodeId()
+ * during load(); confirm.
+ */
@Override
- public List<ApplicationSubmissionContext> getStoredSubmissionContexts() {
- return appSubmissionContexts;
- }
-
- @Override
public NodeId getLastLoggedNodeId() {
- return null;
+ return nodeId;
}
private void readLastNodeId() throws IOException {
@@ -300,6 +347,36 @@ public class ZKStore implements Store {
}
}
+ /**
+ * Reads one application's persisted state from ZooKeeper.
+ *
+ * The application znode's data is the serialized submission context.
+ * Each child is either the {@code APP_MASTER} node (the application
+ * master) or a container node; every child carries its own serialized
+ * proto, which must be parsed from that child's bytes.
+ *
+ * @param app name of the application's znode under APPS
+ * @return the reconstructed application info
+ * @throws IOException if reading from ZooKeeper fails or is interrupted
+ */
+ private ApplicationInfo getAppInfo(String app) throws IOException {
+ String appPath = APPS + app;
+ Stat stat = new Stat();
+ try {
+ byte[] data = zkClient.getData(appPath, false, stat);
+ ApplicationInfoImpl info = new ApplicationInfoImpl(
+ new ApplicationSubmissionContextPBImpl(
+ ApplicationSubmissionContextProto.parseFrom(data)));
+ List<String> children = zkClient.getChildren(appPath, false, stat);
+ for (String child: children) {
+ byte[] childdata = zkClient.getData(appPath + ZK_PATH_SEPARATOR + child, false, stat);
+ if (APP_MASTER.equals(child)) {
+ info.setApplicationMaster(new ApplicationMasterPBImpl(
+ ApplicationMasterProto.parseFrom(childdata)));
+ } else {
+ // Any other child is a container: parse the child's bytes, not the
+ // parent's submission-context bytes.
+ info.addContainer(new ContainerPBImpl(ContainerProto.parseFrom(childdata)));
+ }
+ }
+ return info;
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ LOG.info("Keeper exception", ke);
+ throw convertToIOException(ke);
+ }
+ }
+
+ /**
+ * Populates this state from ZooKeeper: the stored node managers, the last
+ * logged node id, and every application stored under APPS.
+ */
private void load() throws IOException {
List<NodeManagerInfo> nodeInfos = listStoredNodes();
for (NodeManagerInfo node: nodeInfos) {
@@ -309,18 +386,26 @@ public class ZKStore implements Store {
nodeManagers.add(nm);
}
readLastNodeId();
- /* make sure we get all the containers */
-
+ /* make sure we get all the applications */
+ List<String> apps = null;
+ try {
+ apps = zkClient.getChildren(APPS, false);
+ } catch(InterruptedException ie) {
+ LOG.info("Interrupted", ie);
+ throw new InterruptedIOException(ie.getMessage());
+ } catch(KeeperException ke) {
+ throw convertToIOException(ke);
+ }
+ for (String app: apps) {
+ ApplicationInfo info = getAppInfo(app);
+ // Key by the submission context, which every stored app has; the
+ // master child node is optional, so getApplicationMaster() may be null.
+ applications.put(info.getApplicationSubmissionContext().getApplicationId(), info);
+ }
}
+
+ /**
+ * Returns the applications collected by load(), keyed by application id.
+ * This is the live internal map, not a copy.
+ */
@Override
- public List<ApplicationMaster> getStoredAMs() throws IOException {
- return masters;
+ public Map<ApplicationId, ApplicationInfo> getStoredApplications()
+ throws IOException {
+ return applications;
}
}
-
- @Override
- public void updateApplicationState(ApplicationId applicationId,
- ApplicationMaster master) throws IOException {
-
- }
}
\ No newline at end of file