You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2011/04/29 10:35:56 UTC
svn commit: r1097727 [2/5] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/
yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/
yarn/yarn-common/src/main/ja...
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerHeartbeatResponsePBImpl.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,195 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerActionProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
+
+public class LocalizerHeartbeatResponsePBImpl
+ extends ProtoBase<LocalizerHeartbeatResponseProto>
+ implements LocalizerHeartbeatResponse {
+ // Protobuf-backed LocalizerHeartbeatResponse. Reads go to proto when viaProto is true, otherwise to builder.
+ LocalizerHeartbeatResponseProto proto =
+ LocalizerHeartbeatResponseProto.getDefaultInstance();
+ LocalizerHeartbeatResponseProto.Builder builder = null;
+ boolean viaProto = false;
+ // Local copy of the resource list; stays null until initResources() fills it from the current proto/builder.
+ private List<LocalResource> resources;
+ // Creates an empty record backed by a fresh builder.
+ public LocalizerHeartbeatResponsePBImpl() {
+ builder = LocalizerHeartbeatResponseProto.newBuilder();
+ }
+ // Wraps an existing message; builder stays null until maybeInitBuilder() creates it.
+ public LocalizerHeartbeatResponsePBImpl(LocalizerHeartbeatResponseProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+ // Merges the local resource list into the message and returns the built proto.
+ public LocalizerHeartbeatResponseProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build(); // mergeLocalToProto() left viaProto true, so this keeps the proto built there
+ viaProto = true;
+ return proto;
+ }
+ // Copies the local resource list into the builder, but only if the list has been materialized.
+ private void mergeLocalToBuilder() {
+ if (resources != null) {
+ addResourcesToProto();
+ }
+ }
+ // Rebuilds proto from the builder after merging local state, then marks proto as current.
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+ // Prepares for mutation: re-seeds builder from proto when proto is current (or builder is null), then marks builder as current.
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LocalizerHeartbeatResponseProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+ // Returns the stored action, or null when the action field is not set.
+ public LocalizerAction getLocalizerAction() {
+ LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasAction()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getAction());
+ }
+ // Returns the local list itself (not a copy); changes made to it are merged by the next getProto().
+ public List<LocalResource> getAllResources() {
+ initResources();
+ return this.resources;
+ }
+ // Returns the resource at position i of the local list.
+ public LocalResource getLocalResource(int i) {
+ initResources();
+ return this.resources.get(i);
+ }
+ // Sets the action on the builder; a null argument clears the field instead.
+ public void setLocalizerAction(LocalizerAction action) {
+ maybeInitBuilder();
+ if (action == null) {
+ builder.clearAction();
+ return;
+ }
+ builder.setAction(convertToProtoFormat(action));
+ }
+ // Lazily builds the local list by wrapping each LocalResourceProto of the current proto/builder; no-op once the list exists.
+ private void initResources() {
+ if (this.resources != null) {
+ return;
+ }
+ LocalizerHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<LocalResourceProto> list = p.getResourcesList();
+ this.resources = new ArrayList<LocalResource>();
+
+ for (LocalResourceProto c : list) {
+ this.resources.add(convertFromProtoFormat(c));
+ }
+ }
+ // Replaces the builder's resources with the local list; elements are converted to protos as the iterable is consumed.
+ private void addResourcesToProto() {
+ maybeInitBuilder();
+ builder.clearResources();
+ if (this.resources == null)
+ return;
+ Iterable<LocalResourceProto> iterable =
+ new Iterable<LocalResourceProto>() {
+ @Override
+ public Iterator<LocalResourceProto> iterator() {
+ return new Iterator<LocalResourceProto>() {
+
+ Iterator<LocalResource> iter = resources.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public LocalResourceProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllResources(iterable);
+ }
+ // Appends the given resources to the local list; a null argument is ignored.
+ public void addAllResources(List<LocalResource> resources) {
+ if (resources == null)
+ return;
+ initResources();
+ this.resources.addAll(resources);
+ }
+ // Appends one resource to the local list.
+ public void addResource(LocalResource resource) {
+ initResources();
+ this.resources.add(resource);
+ }
+ // Removes the resource at the given index from the local list.
+ public void removeResource(int index) {
+ initResources();
+ this.resources.remove(index);
+ }
+ // Empties the local list; the builder itself is only updated on the next getProto().
+ public void clearResources() {
+ initResources();
+ this.resources.clear();
+ }
+ // Wraps a resource message in a LocalResourcePBImpl.
+ private LocalResource convertFromProtoFormat(LocalResourceProto p) {
+ return new LocalResourcePBImpl(p);
+ }
+ // Extracts the message from a record; the cast requires the record to be a LocalResourcePBImpl.
+ private LocalResourceProto convertToProtoFormat(LocalResource s) {
+ return ((LocalResourcePBImpl)s).getProto();
+ }
+ // Maps the record enum to the proto enum by constant name.
+ private LocalizerActionProto convertToProtoFormat(LocalizerAction a) {
+ return LocalizerActionProto.valueOf(a.name());
+ }
+ // Maps the proto enum to the record enum by constant name.
+ private LocalizerAction convertFromProtoFormat(LocalizerActionProto a) {
+ return LocalizerAction.valueOf(a.name());
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerStatusPBImpl.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerStatusPBImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/LocalizerStatusPBImpl.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalResourceStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerServiceProtos.LocalizerStatusProtoOrBuilder;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
+import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
+
+public class LocalizerStatusPBImpl
+ extends ProtoBase<LocalizerStatusProto> implements LocalizerStatus {
+ // Protobuf-backed LocalizerStatus. Reads go to proto when viaProto is true, otherwise to builder.
+ LocalizerStatusProto proto =
+ LocalizerStatusProto.getDefaultInstance();
+ LocalizerStatusProto.Builder builder = null;
+ boolean viaProto = false;
+ // Local copy of the resource-status list; stays null until initResources() fills it from the current proto/builder.
+ private List<LocalResourceStatus> resources = null;
+ // Creates an empty record backed by a fresh builder.
+ public LocalizerStatusPBImpl() {
+ builder = LocalizerStatusProto.newBuilder();
+ }
+ // Wraps an existing message; builder stays null until maybeInitBuilder() creates it.
+ public LocalizerStatusPBImpl(LocalizerStatusProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+ // Merges the local resource-status list into the message and returns the built proto.
+ public LocalizerStatusProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build(); // mergeLocalToProto() left viaProto true, so this keeps the proto built there
+ viaProto = true;
+ return proto;
+ }
+ // Copies the local list into the builder, but only if the list has been materialized.
+ private void mergeLocalToBuilder() {
+ if (this.resources != null) {
+ addResourcesToProto();
+ }
+ }
+ // Rebuilds proto from the builder after merging local state, then marks proto as current.
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+ // Prepares for mutation: re-seeds builder from proto when proto is current (or builder is null), then marks builder as current.
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = LocalizerStatusProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+ // Returns the localizer id, or null when the field is not set.
+ @Override
+ public String getLocalizerId() {
+ LocalizerStatusProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasLocalizerId()) {
+ return null;
+ }
+ return (p.getLocalizerId());
+ }
+ // Returns the local list itself (not a copy); changes made to it are merged by the next getProto().
+ @Override
+ public List<LocalResourceStatus> getResources() {
+ initResources();
+ return this.resources;
+ }
+ // Sets the localizer id on the builder; a null argument clears the field instead.
+ @Override
+ public void setLocalizerId(String localizerId) {
+ maybeInitBuilder();
+ if (localizerId == null) {
+ builder.clearLocalizerId();
+ return;
+ }
+ builder.setLocalizerId(localizerId);
+ }
+ // Lazily builds the local list by wrapping each LocalResourceStatusProto of the current proto/builder; no-op once the list exists.
+ private void initResources() {
+ if (this.resources != null) {
+ return;
+ }
+ LocalizerStatusProtoOrBuilder p = viaProto ? proto : builder;
+ List<LocalResourceStatusProto> list = p.getResourcesList();
+ this.resources = new ArrayList<LocalResourceStatus>();
+
+ for (LocalResourceStatusProto c : list) {
+ this.resources.add(convertFromProtoFormat(c));
+ }
+ }
+ // Replaces the builder's resources with the local list; elements are converted to protos as the iterable is consumed.
+ private void addResourcesToProto() {
+ maybeInitBuilder();
+ builder.clearResources();
+ if (this.resources == null)
+ return;
+ Iterable<LocalResourceStatusProto> iterable =
+ new Iterable<LocalResourceStatusProto>() {
+ @Override
+ public Iterator<LocalResourceStatusProto> iterator() {
+ return new Iterator<LocalResourceStatusProto>() {
+
+ Iterator<LocalResourceStatus> iter = resources.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public LocalResourceStatusProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllResources(iterable);
+ }
+ // Appends the given statuses to the local list; a null argument is ignored.
+ @Override
+ public void addAllResources(List<LocalResourceStatus> resources) {
+ if (resources == null)
+ return;
+ initResources();
+ this.resources.addAll(resources);
+ }
+ // Returns the status at the given position of the local list.
+ @Override
+ public LocalResourceStatus getResourceStatus(int index) {
+ initResources();
+ return this.resources.get(index);
+ }
+ // Appends one status to the local list.
+ @Override
+ public void addResourceStatus(LocalResourceStatus resource) {
+ initResources();
+ this.resources.add(resource);
+ }
+ // Removes the status at the given index from the local list.
+ @Override
+ public void removeResource(int index) {
+ initResources();
+ this.resources.remove(index);
+ }
+ // Empties the local list; the builder itself is only updated on the next getProto().
+ @Override
+ public void clearResources() {
+ initResources();
+ this.resources.clear();
+ }
+ // Wraps a status message in a LocalResourceStatusPBImpl.
+ private LocalResourceStatus
+ convertFromProtoFormat(LocalResourceStatusProto p) {
+ return new LocalResourceStatusPBImpl(p);
+ }
+ // Extracts the message from a record; the cast requires the record to be a LocalResourceStatusPBImpl.
+ private LocalResourceStatusProto convertToProtoFormat(LocalResourceStatus s) {
+ return ((LocalResourceStatusPBImpl)s).getProto();
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalization.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalization.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalization.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,21 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+public interface ContainerLocalization { // No members are declared in this revision.
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalizationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalizationImpl.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalizationImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerLocalizationImpl.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,29 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+
+public class ContainerLocalizationImpl implements ContainerLocalization {
+ // Constructor accepts its collaborators but neither stores nor uses them (the body is empty).
+ public ContainerLocalizationImpl(Dispatcher dispatcher, Application app,
+ LocalizationProtocol localization) {
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Fri Apr 29 08:35:53 2011
@@ -74,7 +74,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -141,7 +141,7 @@ public class ContainerManagerImpl extend
new ContainerEventDispatcher());
dispatcher.register(ApplicationEventType.class,
new ApplicationEventDispatcher());
- dispatcher.register(LocalizerEventType.class, rsrcLocalizationSrvc);
+ dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
@@ -218,22 +218,20 @@ public class ContainerManagerImpl extend
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
Container container = new ContainerImpl(this.dispatcher, launchContext);
+ //ContainerID containerID = launchContext.id;
ContainerId containerID = launchContext.getContainerId();
+ //ApplicationID applicationID = containerID.appID;
ApplicationId applicationID = containerID.getAppId();
- if (this.context.getContainers().putIfAbsent(containerID, container) != null) {
+ if (context.getContainers().putIfAbsent(containerID, container) != null) {
throw RPCUtil.getRemoteException("Container " + containerID
+ " already is running on this node!!");
}
-// if (LOG.isDebugEnabled()) { TODO
- LOG.info("CONTAINER: " + launchContext);
-// }
// Create the application
- Application application = new ApplicationImpl(this.dispatcher,
- launchContext.getUser(), launchContext.getContainerId().getAppId(),
- launchContext.getAllEnv(), launchContext.getAllLocalResources(),
- launchContext.getContainerTokens());
- if (this.context.getApplications().putIfAbsent(applicationID, application) == null) {
+ Application application = new ApplicationImpl(dispatcher,
+ launchContext.getUser(), applicationID);
+ if (null ==
+ context.getApplications().putIfAbsent(applicationID, application)) {
LOG.info("Creating a new application reference for app "
+ applicationID);
}
@@ -332,8 +330,8 @@ public class ContainerManagerImpl extend
case FINISH_CONTAINERS:
CMgrCompletedContainersEvent containersFinishedEvent =
(CMgrCompletedContainersEvent) event;
- for (org.apache.hadoop.yarn.api.records.Container container : containersFinishedEvent
- .getContainersToCleanup()) {
+ for (org.apache.hadoop.yarn.api.records.Container container :
+ containersFinishedEvent.getContainersToCleanup()) {
this.dispatcher.getEventHandler().handle(
new ContainerDiagnosticsUpdateEvent(container.getId(),
"Container Killed by ResourceManager"));
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java Fri Apr 29 08:35:53 2011
@@ -18,27 +18,15 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
-import java.io.IOException;
import java.util.Map;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-
public interface Application extends EventHandler<ApplicationEvent> {
- Map<String,String> getEnvironment();
-
- Map<String,LocalResource> getResources(LocalResourceVisibility vis);
-
- Map<Path,String> getLocalizedResources();
-
String getUser();
Map<ContainerId, Container> getContainers();
@@ -47,5 +35,4 @@ public interface Application extends Eve
ApplicationState getApplicationState();
- Credentials getCredentials() throws IOException;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Fri Apr 29 08:35:53 2011
@@ -18,33 +18,23 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -57,44 +47,18 @@ public class ApplicationImpl implements
final Dispatcher dispatcher;
final String user;
final ApplicationId appId;
- final Map<String,String> env;
- final Map<String,LocalResource> resources;
- final ByteBuffer containerTokens;
- Map<Path, String> localizedResources;
+ Path logDir;
private static final Log LOG = LogFactory.getLog(Application.class);
Map<ContainerId, Container> containers =
new HashMap<ContainerId, Container>();
- // TODO check for suitability of symlink name
- static Map<String, LocalResource>
- filterResources(Map<String, LocalResource> resources,
- LocalResourceVisibility state) {
- Map<String, LocalResource> ret =
- new HashMap<String, LocalResource>();
- for (Map.Entry<String, LocalResource> rsrc : resources.entrySet()) {
- if (state.equals(rsrc.getValue().getVisibility())) {
- ret.put(rsrc.getKey(), rsrc.getValue());
- }
- }
- return ret;
- }
-
- public ApplicationImpl(Dispatcher dispatcher,
- String user,
- ApplicationId appId,
- Map<String,String> env,
- Map<String,LocalResource> resources,
- ByteBuffer containerTokens) {
+ public ApplicationImpl(Dispatcher dispatcher, String user,
+ ApplicationId appId) {
this.dispatcher = dispatcher;
this.user = user.toString();
this.appId = appId;
- this.env = env;
- this.resources = null == resources
- ? new HashMap<String,LocalResource>()
- : resources;
- this.containerTokens = containerTokens;
stateMachine = stateMachineFactory.make(this);
}
@@ -111,6 +75,7 @@ public class ApplicationImpl implements
@Override
public synchronized ApplicationState getApplicationState() {
// TODO: Synchro should be at statemachine level.
+ // This is only for tests?
return this.stateMachine.getCurrentState();
}
@@ -119,56 +84,13 @@ public class ApplicationImpl implements
return this.containers;
}
- @Override
- public Map<String, String> getEnvironment() {
- return env;
- }
-
- @Override
- public Map<String, LocalResource>
- getResources(LocalResourceVisibility vis) {
- final Map<String, LocalResource> ret;
- if (LocalResourceVisibility.PUBLIC.equals(vis)) {
- ret = filterResources(resources, LocalResourceVisibility.PUBLIC);
- } else {
- // TODO separate these
- ret = filterResources(resources, LocalResourceVisibility.PRIVATE);
- ret.putAll(filterResources(resources,
- LocalResourceVisibility.APPLICATION));
- }
- return Collections.unmodifiableMap(ret);
- }
-
- @Override
- public Map<Path, String> getLocalizedResources() {
- if (ApplicationState.RUNNING.equals(stateMachine.getCurrentState())) {
- return localizedResources;
- }
- throw new IllegalStateException(
- "Invalid request for " + stateMachine.getCurrentState());
- }
-
- @Override
- public Credentials getCredentials() throws IOException {
- Credentials ret = new Credentials();
- if (containerTokens != null) {
- DataInputByteBuffer buf = new DataInputByteBuffer();
- buf.reset(containerTokens);
- ret.readTokenStorageStream(buf);
- for (Token<? extends TokenIdentifier> tk : ret.getAllTokens()) {
- LOG.info(" In Nodemanager , token " + tk);
- }
- }
- return ret;
- }
-
private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
new ContainerDoneTransition();
- private static StateMachineFactory<ApplicationImpl, ApplicationState, ApplicationEventType, ApplicationEvent> stateMachineFactory =
- new StateMachineFactory
- <ApplicationImpl, ApplicationState, ApplicationEventType, ApplicationEvent>
- (ApplicationState.NEW)
+ private static StateMachineFactory<ApplicationImpl, ApplicationState,
+ ApplicationEventType, ApplicationEvent> stateMachineFactory =
+ new StateMachineFactory<ApplicationImpl, ApplicationState,
+ ApplicationEventType, ApplicationEvent>(ApplicationState.NEW)
// Transitions from NEW state
.addTransition(ApplicationState.NEW, ApplicationState.INITING,
@@ -216,13 +138,14 @@ public class ApplicationImpl implements
ApplicationState.FINISHED,
ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)
- // TODO failure transitions are completely broken
-
// create the topology tables
.installTopology();
private final StateMachine<ApplicationState, ApplicationEventType, ApplicationEvent> stateMachine;
+ /**
+ * Notify services of new application.
+ */
static class AppInitTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
@@ -231,14 +154,14 @@ public class ApplicationImpl implements
Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container);
app.dispatcher.getEventHandler().handle(
- new ContainerEvent(container.getContainerID(),
- ContainerEventType.INIT_CONTAINER));
- app.dispatcher.getEventHandler().handle(
- new ApplicationLocalizerEvent(
- LocalizerEventType.INIT_APPLICATION_RESOURCES, app));
+ new ApplicationLocalizationEvent(
+ LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
}
}
+ /**
+ * Absorb initialization events while the application initializes.
+ */
static class AppIsInitingTransition implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
@@ -246,9 +169,8 @@ public class ApplicationImpl implements
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
Container container = initEvent.getContainer();
app.containers.put(container.getContainerID(), container);
- app.dispatcher.getEventHandler().handle(
- new ContainerEvent(container.getContainerID(),
- ContainerEventType.INIT_CONTAINER));
+ LOG.info("Adding " + container.getContainerID()
+ + " to application " + app.toString());
}
}
@@ -256,18 +178,12 @@ public class ApplicationImpl implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
-
- ApplicationInitedEvent initedEvent = (ApplicationInitedEvent) event;
- app.localizedResources = initedEvent.getLocalizedResources();
// Start all the containers waiting for ApplicationInit
- Iterator<Container> it = app.containers.values().iterator();
- while (it.hasNext()) {
- Container container = it.next();
- if (container.getContainerState().equals(ContainerState.LOCALIZING)) {
- app.dispatcher.getEventHandler().handle(
- new ContainerEvent(container.getContainerID(),
- ContainerEventType.CONTAINER_RESOURCES_LOCALIZED));
- }
+ ApplicationInitedEvent initedEvent = (ApplicationInitedEvent) event;
+ app.logDir = initedEvent.getLogDir();
+ for (Container container : app.containers.values()) {
+ app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
+ container.getContainerID(), app.logDir));
}
}
}
@@ -277,12 +193,12 @@ public class ApplicationImpl implements
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
- ContainerId containerID = initEvent.getContainer().getContainerID();
- app.dispatcher.getEventHandler().handle(
- new ContainerEvent(containerID, ContainerEventType.INIT_CONTAINER));
- app.dispatcher.getEventHandler().handle(
- new ContainerEvent(containerID,
- ContainerEventType.CONTAINER_RESOURCES_LOCALIZED));
+ Container container = initEvent.getContainer();
+ app.containers.put(container.getContainerID(), container);
+ LOG.info("Adding " + container.getContainerID()
+ + " to application " + app.toString());
+ app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
+ container.getContainerID(), app.logDir));
}
}
@@ -292,17 +208,21 @@ public class ApplicationImpl implements
public void transition(ApplicationImpl app, ApplicationEvent event) {
ApplicationContainerFinishedEvent containerEvent =
(ApplicationContainerFinishedEvent) event;
- LOG.info("Removing " + containerEvent.getContainerID()
- + " from application " + app.toString());
- app.containers.remove(containerEvent.getContainerID());
+ if (null == app.containers.remove(containerEvent.getContainerID())) {
+ LOG.warn("Removing unknown " + containerEvent.getContainerID() +
+ " from application " + app.toString());
+ } else {
+ LOG.info("Removing " + containerEvent.getContainerID() +
+ " from application " + app.toString());
+ }
}
}
void handleAppFinishWithContainersCleanedup() {
// Delete Application level resources
this.dispatcher.getEventHandler().handle(
- new ApplicationLocalizerEvent(
- LocalizerEventType.DESTROY_APPLICATION_RESOURCES, this));
+ new ApplicationLocalizationEvent(
+ LocalizationEventType.DESTROY_APPLICATION_RESOURCES, this));
// TODO: Trigger the LogsManager
}
@@ -334,9 +254,8 @@ public class ApplicationImpl implements
}
}
- static class AppFinishTransition
- implements
- MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
+ static class AppFinishTransition implements
+ MultipleArcTransition<ApplicationImpl, ApplicationEvent, ApplicationState> {
@Override
public ApplicationState transition(ApplicationImpl app,
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java Fri Apr 29 08:35:53 2011
@@ -15,31 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
-import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
public class ApplicationInitedEvent extends ApplicationEvent {
- private final Path workdir;
- private final Map<Path,String> localizedResources;
+ private final Path logDir;
- public ApplicationInitedEvent(ApplicationId appID,
- Map<Path,String> localizedResources, Path workdir) {
+ public ApplicationInitedEvent(ApplicationId appID, Path logDir) {
super(appID, ApplicationEventType.APPLICATION_INITED);
- this.workdir = workdir;
- this.localizedResources = localizedResources;
- }
-
- public Map<Path,String> getLocalizedResources() {
- return localizedResources;
+ this.logDir = logDir;
}
- public Path getWorkDirectory() {
- return workdir;
+ public Path getLogDir() {
+ return logDir;
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Fri Apr 29 08:35:53 2011
@@ -18,11 +18,14 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.event.EventHandler;
-
public interface Container extends EventHandler<ContainerEvent> {
org.apache.hadoop.yarn.api.records.ContainerId getContainerID();
@@ -33,6 +36,10 @@ public interface Container extends Event
ContainerLaunchContext getLaunchContext();
+ Credentials getCredentials();
+
+ Map<Path,String> getLocalizedResources();
+
org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer();
ContainerStatus cloneAndGetContainerStatus();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java Fri Apr 29 08:35:53 2011
@@ -28,7 +28,8 @@ public enum ContainerEventType {
// DownloadManager
CONTAINER_INITED,
- CONTAINER_RESOURCES_LOCALIZED,
+ RESOURCE_LOCALIZED,
+ RESOURCE_FAILED,
CONTAINER_RESOURCES_CLEANEDUP,
// Producer: ContainersLauncher
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Fri Apr 29 08:35:53 2011
@@ -18,14 +18,28 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+import java.io.IOException;
+
+import java.net.URISyntaxException;
+
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -34,11 +48,14 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -47,19 +64,24 @@ import org.apache.hadoop.yarn.util.Conve
public class ContainerImpl implements Container {
private final Dispatcher dispatcher;
+ private final Credentials credentials;
private final ContainerLaunchContext launchContext;
private int exitCode;
private final StringBuilder diagnostics;
private static final Log LOG = LogFactory.getLog(Container.class);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private final Map<LocalResourceRequest,String> pendingResources =
+ new HashMap<LocalResourceRequest,String>();
+ private final Map<Path,String> localizedResources =
+ new HashMap<Path,String>();
public ContainerImpl(Dispatcher dispatcher,
ContainerLaunchContext launchContext) {
this.dispatcher = dispatcher;
this.launchContext = launchContext;
this.diagnostics = new StringBuilder();
-
+ this.credentials = new Credentials();
stateMachine = stateMachineFactory.make(this);
}
@@ -75,9 +97,10 @@ public class ContainerImpl implements Co
stateMachineFactory =
new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
// From NEW State
- .addTransition(ContainerState.NEW, ContainerState.LOCALIZING,
- ContainerEventType.INIT_CONTAINER)
- .addTransition(ContainerState.NEW, ContainerState.NEW,
+ .addTransition(ContainerState.NEW,
+ EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
+ ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
+ .addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE,
@@ -85,14 +108,15 @@ public class ContainerImpl implements Co
// From LOCALIZING State
.addTransition(ContainerState.LOCALIZING,
- ContainerState.LOCALIZED,
- ContainerEventType.CONTAINER_RESOURCES_LOCALIZED,
- new LocalizedTransition())
- .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
+ EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
+ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
+ .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+ ContainerEventType.RESOURCE_FAILED,
+ new KillDuringLocalizationTransition()) // TODO update diagnostics
+ .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
- .addTransition(ContainerState.LOCALIZING,
- ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
+ .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER,
new KillDuringLocalizationTransition())
@@ -102,9 +126,11 @@ public class ContainerImpl implements Co
.addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition())
- .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
- ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
- UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
// From RUNNING State
.addTransition(ContainerState.RUNNING,
@@ -115,23 +141,23 @@ public class ContainerImpl implements Co
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition())
- .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
- ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
- UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RUNNING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
- ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
- CONTAINER_DONE_TRANSITION)
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
+ CONTAINER_DONE_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
- ContainerState.EXITED_WITH_SUCCESS,
- ContainerEventType.KILL_CONTAINER)
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.KILL_CONTAINER)
// From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -150,9 +176,9 @@ public class ContainerImpl implements Co
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new ContainerKilledTransition())
- .addTransition(ContainerState.KILLING, ContainerState.KILLING,
- ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
- UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER)
.addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
@@ -171,13 +197,16 @@ public class ContainerImpl implements Co
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerEventType.KILL_CONTAINER)
// From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, CONTAINER_DONE_TRANSITION)
- .addTransition(ContainerState.DONE, ContainerState.DONE,
- ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
- UPDATE_DIAGNOSTICS_TRANSITION)
+ .addTransition(ContainerState.DONE, ContainerState.DONE,
+ ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+ UPDATE_DIAGNOSTICS_TRANSITION)
// create the topology tables
.installTopology();
@@ -215,13 +244,27 @@ public class ContainerImpl implements Co
}
@Override
+ public Map<Path,String> getLocalizedResources() {
+ assert ContainerState.LOCALIZED == getContainerState();
+ return localizedResources;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ @Override
public ContainerState getContainerState() {
return stateMachine.getCurrentState();
}
@Override
- public synchronized org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
- org.apache.hadoop.yarn.api.records.Container c = recordFactory.newRecordInstance(org.apache.hadoop.yarn.api.records.Container.class);
+ public synchronized
+ org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
+ org.apache.hadoop.yarn.api.records.Container c =
+ recordFactory.newRecordInstance(
+ org.apache.hadoop.yarn.api.records.Container.class);
c.setId(this.launchContext.getContainerId());
c.setResource(this.launchContext.getResource());
c.setState(getCurrentState());
@@ -253,15 +296,42 @@ public class ContainerImpl implements Co
}
- static class LocalizedTransition extends ContainerTransition {
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class RequestResourcesTransition implements
+ MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@Override
- public void transition(ContainerImpl container, ContainerEvent event) {
- // XXX This needs to be container-oriented
+ public ContainerState transition(ContainerImpl container,
+ ContainerEvent event) {
+ final ContainerLaunchContext ctxt = container.getLaunchContext();
+
+ // parse credentials
+ ByteBuffer creds = ctxt.getContainerTokens();
+ if (creds != null) {
+ try {
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ creds.rewind();
+ buf.reset(creds);
+ container.credentials.readTokenStorageStream(buf);
+ if (LOG.isDebugEnabled()) {
+ for (Token<? extends TokenIdentifier> tk :
+ container.credentials.getAllTokens()) {
+ LOG.debug(tk.getService() + " = " + tk.toString());
+ }
+ }
+ } catch (IOException e) {
+ // invalid credentials
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationEvent(
+ LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ return ContainerState.LOCALIZING;
+ }
+ }
+
// Inform the AuxServices about the opaque serviceData
- ContainerLaunchContext ctxt = container.getLaunchContext();
Map<String,ByteBuffer> csd = ctxt.getAllServiceData();
if (csd != null) {
// TODO: Isn't this supposed to happen only once per Application?
+ // ^ each container may have distinct service data
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
container.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
@@ -269,12 +339,92 @@ public class ContainerImpl implements Co
service.getKey().toString(), service.getValue()));
}
}
+
+ // Send requests for public, private resources
+ Map<String,LocalResource> cntrRsrc = ctxt.getAllLocalResources();
+ if (!cntrRsrc.isEmpty()) {
+ ArrayList<LocalResourceRequest> publicRsrc =
+ new ArrayList<LocalResourceRequest>();
+ ArrayList<LocalResourceRequest> privateRsrc =
+ new ArrayList<LocalResourceRequest>();
+ ArrayList<LocalResourceRequest> appRsrc =
+ new ArrayList<LocalResourceRequest>();
+ try {
+ for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
+ LocalResourceRequest req =
+ new LocalResourceRequest(rsrc.getValue());
+ container.pendingResources.put(req, rsrc.getKey());
+ switch (rsrc.getValue().getVisibility()) {
+ case PUBLIC:
+ publicRsrc.add(req);
+ break;
+ case PRIVATE:
+ privateRsrc.add(req);
+ break;
+ case APPLICATION:
+ appRsrc.add(req);
+ break;
+ }
+ }
+ } catch (URISyntaxException e) {
+ // malformed resource; abort container launch
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationEvent(
+ LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ return ContainerState.LOCALIZING;
+ }
+ if (!publicRsrc.isEmpty()) {
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(
+ container, publicRsrc, LocalResourceVisibility.PUBLIC));
+ }
+ if (!privateRsrc.isEmpty()) {
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(
+ container, privateRsrc, LocalResourceVisibility.PRIVATE));
+ }
+ if (!appRsrc.isEmpty()) {
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizationRequestEvent(
+ container, appRsrc, LocalResourceVisibility.APPLICATION));
+ }
+ return ContainerState.LOCALIZING;
+ } else {
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.LAUNCH_CONTAINER));
+ return ContainerState.LOCALIZED;
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ static class LocalizedTransition implements
+ MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
+ @Override
+ public ContainerState transition(ContainerImpl container,
+ ContainerEvent event) {
+ ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
+ String sym = container.pendingResources.remove(rsrcEvent.getResource());
+ if (null == sym) {
+ LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
+ " for container " + container.getContainerID());
+ assert false;
+ // fail container?
+ return ContainerState.LOCALIZING;
+ }
+ container.localizedResources.put(rsrcEvent.getLocation(), sym);
+ if (!container.pendingResources.isEmpty()) {
+ return ContainerState.LOCALIZING;
+ }
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.LAUNCH_CONTAINER));
+ return ContainerState.LOCALIZED;
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class LaunchTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@@ -289,6 +439,7 @@ public class ContainerImpl implements Co
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithSuccessTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@@ -297,11 +448,12 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
- new ContainerLocalizerEvent(
- LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ new ContainerLocalizationEvent(
+ LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class ExitedWithFailureTransition extends ContainerTransition {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
@@ -314,11 +466,12 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
- new ContainerLocalizerEvent(
- LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ new ContainerLocalizationEvent(
+ LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class KillDuringLocalizationTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -326,12 +479,13 @@ public class ContainerImpl implements Co
// Inform the localizer to decrement reference counts and cleanup
// resources.
container.dispatcher.getEventHandler().handle(
- new ContainerLocalizerEvent(
- LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ new ContainerLocalizationEvent(
+ LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class KillTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -343,6 +497,7 @@ public class ContainerImpl implements Co
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerKilledTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -353,11 +508,12 @@ public class ContainerImpl implements Co
// The process/process-grp is killed. Decrement reference counts and
// cleanup resources
container.dispatcher.getEventHandler().handle(
- new ContainerLocalizerEvent(
- LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ new ContainerLocalizationEvent(
+ LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
}
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
static class ContainerDoneTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
@@ -406,4 +562,5 @@ public class ContainerImpl implements Co
public String toString() {
return ConverterUtils.toString(launchContext.getContainerId());
}
+
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class ContainerInitEvent extends ContainerEvent {
+
+ private final Path logDir;
+
+ public ContainerInitEvent(ContainerId c, Path logDir) {
+ super(c, ContainerEventType.INIT_CONTAINER);
+ this.logDir = logDir;
+ }
+
+ public Path getLogDir() {
+ return logDir;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,37 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+public class ContainerResourceEvent extends ContainerEvent {
+
+ private final LocalResourceRequest rsrc;
+
+ public ContainerResourceEvent(ContainerId container,
+ ContainerEventType type, LocalResourceRequest rsrc) {
+ super(container, type);
+ this.rsrc = rsrc;
+ }
+
+ public LocalResourceRequest getResource() {
+ return rsrc;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceFailedEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceFailedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceFailedEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+/**
+ * Container resource event of type RESOURCE_FAILED that additionally
+ * carries the exception supplied by whoever raised the event.
+ */
+public class ContainerResourceFailedEvent extends ContainerResourceEvent {
+
+ private final Exception exception;
+
+ /**
+ * @param container container id forwarded to the parent event
+ * @param rsrc resource request forwarded to the parent event
+ * @param cause exception associated with the failure; stored as given
+ */
+ public ContainerResourceFailedEvent(ContainerId container,
+ LocalResourceRequest rsrc, Exception cause) {
+ super(container, ContainerEventType.RESOURCE_FAILED, rsrc);
+ this.exception = cause;
+ }
+
+ /** @return the exception passed to the constructor */
+ public Exception getCause() {
+ return exception;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceLocalizedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceLocalizedEvent.java?rev=1097727&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceLocalizedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResourceLocalizedEvent.java Fri Apr 29 08:35:53 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+
+public class ContainerResourceLocalizedEvent extends ContainerResourceEvent {
+ private final Path location;
+
+ /** Builds a RESOURCE_LOCALIZED event for the given container, resource and path. */
+ public ContainerResourceLocalizedEvent(ContainerId container,
+ LocalResourceRequest rsrc, Path loc) {
+ super(container, ContainerEventType.RESOURCE_LOCALIZED, rsrc);
+ location = loc;
+ }
+
+ /** Returns the path handed to the constructor. */
+ public Path getLocation() {
+ return location;
+ }
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Fri Apr 29 08:35:53 2011
@@ -31,11 +31,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Shell.ExitCodeException;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -46,7 +43,8 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> {
@@ -72,40 +70,34 @@ public class ContainerLaunch implements
}
@Override
+ @SuppressWarnings("unchecked") // dispatcher not typed
public Integer call() {
final ContainerLaunchContext launchContext = container.getLaunchContext();
- final Map<Path,String> localizedResources = app.getLocalizedResources();
+ final Map<Path,String> localResources = container.getLocalizedResources();
final String user = launchContext.getUser();
final Map<String,String> env = launchContext.getAllEnv();
final List<String> command = launchContext.getCommandList();
int ret = -1;
try {
FileContext lfs = FileContext.getLocalFSFileContext();
- Path launchSysDir = new Path(sysDir, container.toString());
- lfs.mkdir(launchSysDir, null, false);
+ Path launchSysDir =
+ new Path(sysDir, ConverterUtils.toString(container.getContainerID()));
+ lfs.mkdir(launchSysDir, null, true);
Path launchPath = new Path(launchSysDir, CONTAINER_SCRIPT);
Path tokensPath =
- new Path(launchSysDir, ApplicationLocalizer.APPTOKEN_FILE);
+ new Path(launchSysDir, String.format(
+ ContainerLocalizer.TOKEN_FILE_FMT,
+ ConverterUtils.toString(container.getContainerID())));
DataOutputStream launchOut = null;
DataOutputStream tokensOut = null;
try {
launchOut = lfs.create(launchPath, EnumSet.of(CREATE, OVERWRITE));
- ApplicationLocalizer.writeLaunchEnv(launchOut, env, localizedResources,
+ ContainerLocalizer.writeLaunchEnv(launchOut, env, localResources,
command, appDirs);
tokensOut = lfs.create(tokensPath, EnumSet.of(CREATE, OVERWRITE));
- Credentials creds = new Credentials();
- if (container.getLaunchContext().getContainerTokens() != null) {
- // TODO: Is the conditional the correct way of checking?
- DataInputByteBuffer buf = new DataInputByteBuffer();
- container.getLaunchContext().getContainerTokens().rewind();
- buf.reset(container.getLaunchContext().getContainerTokens());
- creds.readTokenStorageStream(buf);
- for (Token<? extends TokenIdentifier> tk : creds.getAllTokens()) {
- LOG.debug(tk.getService() + " = " + tk.toString());
- }
- }
+ Credentials creds = container.getCredentials();
creds.writeTokenStorageToStream(tokensOut);
} finally {
IOUtils.cleanup(LOG, launchOut, tokensOut);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1097727&r1=1097726&r2=1097727&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Fri Apr 29 08:35:53 2011
@@ -45,9 +45,10 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* The launcher for the containers. This service should be started only after
@@ -137,12 +138,13 @@ public class ContainersLauncher extends
context.getApplications().get(containerId.getAppId());
List<Path> appDirs = new ArrayList<Path>(localDirs.size());
for (Path p : localDirs) {
- Path usersdir = new Path(p, ApplicationLocalizer.USERCACHE);
+ Path usersdir = new Path(p, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usersdir, userName);
- Path appsdir = new Path(userdir, ApplicationLocalizer.APPCACHE);
+ Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
appDirs.add(new Path(appsdir, app.toString()));
}
- Path appSysDir = new Path(sysDirs.get(0), app.toString());
+ Path appSysDir =
+ new Path(sysDirs.get(0), ConverterUtils.toString(app.getAppId()));
// TODO set in Application
//Path appLogDir = new Path(logDirs.get(0), app.toString());
ContainerLaunch launch =