You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/19 19:04:11 UTC
svn commit: r1124996 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/
yarn/ya...
Author: vinodkv
Date: Thu May 19 17:04:11 2011
New Revision: 1124996
URL: http://svn.apache.org/viewvc?rev=1124996&view=rev
Log:
Distributed cache bug fix to pass Terasort. Contributed by Vinod Kumar Vavilapalli.
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Thu May 19 17:04:11 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ Distributed cache bug fix to pass Terasort. (vinodkv)
+
Add junit jar to lib in assembly (mahadev and luke)
MAPREDUCE-2509. Fix NPE in UI for pending attempts. (luke lu via mahadev)
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu May 19 17:04:11 2011
@@ -510,7 +510,7 @@ public abstract class TaskAttemptImpl im
// //////////// End of JobConf setup
// Setup DistributedCache
- setupDistributedCache(conf, container);
+ setupDistributedCache(remoteFS, conf, container);
// Setup up tokens
Credentials taskCredentials = new Credentials();
@@ -586,11 +586,11 @@ public abstract class TaskAttemptImpl im
return container;
}
- private void setupDistributedCache(Configuration conf,
+ private void setupDistributedCache(FileContext remoteFS, Configuration conf,
ContainerLaunchContext container) throws IOException {
// Cache archives
- parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE,
+ parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.ARCHIVE,
DistributedCache.getCacheArchives(conf),
DistributedCache.getArchiveTimestamps(conf),
getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
@@ -598,7 +598,7 @@ public abstract class TaskAttemptImpl im
DistributedCache.getArchiveClassPaths(conf));
// Cache files
- parseDistributedCacheArtifacts(container, LocalResourceType.FILE,
+ parseDistributedCacheArtifacts(remoteFS, container, LocalResourceType.FILE,
DistributedCache.getCacheFiles(conf),
DistributedCache.getFileTimestamps(conf),
getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
@@ -609,8 +609,8 @@ public abstract class TaskAttemptImpl im
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType)
- private static void parseDistributedCacheArtifacts(
- ContainerLaunchContext container, LocalResourceType type,
+ private void parseDistributedCacheArtifacts(
+ FileContext remoteFS, ContainerLaunchContext container, LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] classpaths) throws IOException {
@@ -634,7 +634,11 @@ public abstract class TaskAttemptImpl im
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
- Path p = new Path(u.toString());
+ Path p = new Path(u);
+ if (!p.isAbsolute()) {
+ p = p.makeQualified(remoteFS.getDefaultFileSystem()
+ .getUri(), remoteFS.getWorkingDirectory());
+ }
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
@@ -645,7 +649,7 @@ public abstract class TaskAttemptImpl im
container.setLocalResource(
name.toUri().getPath(),
BuilderUtils.newLocalResource(recordFactory,
- uris[i], type,
+ p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu May 19 17:04:11 2011
@@ -82,7 +82,7 @@ public class ResourceMgrDelegate {
private ApplicationId applicationId;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
- public ResourceMgrDelegate(Configuration conf) throws UnsupportedFileSystemException {
+ public ResourceMgrDelegate(Configuration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress =
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu May 19 17:04:11 2011
@@ -95,19 +95,20 @@ public class YARNRunner implements Clien
private ResourceMgrDelegate resMgrDelegate;
private ClientServiceDelegate clientServiceDelegate;
private YarnConfiguration conf;
+ private final FileContext defaultFileContext;
/**
* Yarn runner incapsulates the client interface of
* yarn
* @param conf the configuration object for the client
*/
- public YARNRunner(Configuration conf)
- throws YarnRemoteException {
+ public YARNRunner(Configuration conf) {
this.conf = new YarnConfiguration(conf);
try {
this.resMgrDelegate = new ResourceMgrDelegate(conf);
this.clientServiceDelegate = new ClientServiceDelegate(conf,
resMgrDelegate);
+ this.defaultFileContext = FileContext.getFileContext(conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
}
@@ -269,26 +270,23 @@ public class YARNRunner implements Clien
LOG.info("AppMaster capability = " + capability);
appContext.setMasterCapability(capability);
- FileContext defaultFS = FileContext.getFileContext(conf);
Path jobConfPath =
new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
URL yarnUrlForJobSubmitDir =
- ConverterUtils.getYarnUrlFromPath(defaultFS.makeQualified(new Path(
+ ConverterUtils.getYarnUrlFromPath(defaultFileContext.makeQualified(new Path(
jobSubmitDir)));
-// appContext.resources = new HashMap<CharSequence, URL>();
LOG.debug("Creating setup context, jobSubmitDir url is "
+ yarnUrlForJobSubmitDir);
appContext.setResource(YARNApplicationConstants.JOB_SUBMIT_DIR,
yarnUrlForJobSubmitDir);
-// appContext.resources_todo = new HashMap<CharSequence,LocalResource>();
appContext.setResourceTodo(YARNApplicationConstants.JOB_CONF_FILE,
- createApplicationResource(defaultFS,
+ createApplicationResource(defaultFileContext,
jobConfPath));
appContext.setResourceTodo(YARNApplicationConstants.JOB_JAR,
- createApplicationResource(defaultFS,
+ createApplicationResource(defaultFileContext,
new Path(jobSubmitDir, YARNApplicationConstants.JOB_JAR)));
// TODO gross hack
@@ -296,7 +294,7 @@ public class YARNRunner implements Clien
YarnConfiguration.APPLICATION_TOKENS_FILE }) {
appContext.setResourceTodo(
YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s,
- createApplicationResource(defaultFS, new Path(jobSubmitDir, s)));
+ createApplicationResource(defaultFileContext, new Path(jobSubmitDir, s)));
}
// TODO: Only if security is on.
@@ -385,7 +383,7 @@ public class YARNRunner implements Clien
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
- private static void parseDistributedCacheArtifacts(
+ private void parseDistributedCacheArtifacts(
ApplicationSubmissionContext container, LocalResourceType type,
URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
Path[] classpaths) throws IOException {
@@ -410,7 +408,11 @@ public class YARNRunner implements Clien
}
for (int i = 0; i < uris.length; ++i) {
URI u = uris[i];
- Path p = new Path(u.toString());
+ Path p = new Path(u);
+ if (!p.isAbsolute()) {
+ p = p.makeQualified(this.defaultFileContext.getDefaultFileSystem()
+ .getUri(), this.defaultFileContext.getWorkingDirectory());
+ }
// Add URI fragment or just the filename
Path name = new Path((null == u.getFragment())
? p.getName()
@@ -420,8 +422,8 @@ public class YARNRunner implements Clien
}
container.setResourceTodo(
name.toUri().getPath(),
- getLocalResource(
- uris[i], type,
+ createLocalResource(
+ p.toUri(), type,
visibilities[i]
? LocalResourceVisibility.PUBLIC
: LocalResourceVisibility.PRIVATE,
@@ -449,7 +451,7 @@ public class YARNRunner implements Clien
return result;
}
- private static LocalResource getLocalResource(URI uri,
+ private static LocalResource createLocalResource(URI uri,
LocalResourceType type, LocalResourceVisibility visibility,
long size, long timestamp) {
LocalResource resource = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(LocalResource.class);
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Thu May 19 17:04:11 2011
@@ -58,8 +58,7 @@ public class ConverterUtils {
String authority = url.getHost() != null ? url.getHost() + ":" + url.getPort()
: "";
return new Path(
- (new URI(scheme, authority, url.getFile(), null, null))
- .normalize());
+ (new URI(scheme, authority, url.getFile(), null, null)).normalize());
}
/**
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu May 19 17:04:11 2011
@@ -234,7 +234,11 @@ public class ContainerManagerImpl extend
@Override
public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
ContainerLaunchContext launchContext = request.getContainerLaunchContext();
-
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" container is " + request);
+ }
+
// parse credentials
ByteBuffer tokens = launchContext.getContainerTokens();
Credentials credentials = new Credentials();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Thu May 19 17:04:11 2011
@@ -24,6 +24,9 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -59,6 +62,8 @@ import org.apache.hadoop.yarn.util.Conve
public class ContainerImpl implements Container {
+ private final Lock readLock;
+ private final Lock writeLock;
private final Dispatcher dispatcher;
private final Credentials credentials;
private final ContainerLaunchContext launchContext;
@@ -79,6 +84,10 @@ public class ContainerImpl implements Co
this.diagnostics = new StringBuilder();
this.credentials = creds;
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+
stateMachine = stateMachineFactory.make(this);
}
@@ -95,7 +104,8 @@ public class ContainerImpl implements Co
new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
// From NEW State
.addTransition(ContainerState.NEW,
- EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
+ EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
+ ContainerState.LOCALIZATION_FAILED),
ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
.addTransition(ContainerState.NEW, ContainerState.NEW,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -107,7 +117,8 @@ public class ContainerImpl implements Co
.addTransition(ContainerState.LOCALIZING,
EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED),
ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
- .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+ .addTransition(ContainerState.LOCALIZING,
+ ContainerState.LOCALIZATION_FAILED,
ContainerEventType.RESOURCE_FAILED,
new KillDuringLocalizationTransition()) // TODO update diagnostics
.addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
@@ -117,6 +128,12 @@ public class ContainerImpl implements Co
ContainerEventType.KILL_CONTAINER,
new KillDuringLocalizationTransition())
+ // From LOCALIZATION_FAILED State
+ .addTransition(ContainerState.LOCALIZATION_FAILED,
+ ContainerState.DONE,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
+ CONTAINER_DONE_TRANSITION)
+
// From LOCALIZED State
.addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
@@ -211,10 +228,11 @@ public class ContainerImpl implements Co
private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
stateMachine;
- private synchronized org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+ private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:
+ case LOCALIZATION_FAILED:
case LOCALIZED:
return org.apache.hadoop.yarn.api.records.ContainerState.INITIALIZING;
case RUNNING:
@@ -232,55 +250,96 @@ public class ContainerImpl implements Co
@Override
public ContainerId getContainerID() {
- return this.launchContext.getContainerId();
+ this.readLock.lock();
+ try {
+ return this.launchContext.getContainerId();
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public String getUser() {
- return this.launchContext.getUser();
+ this.readLock.lock();
+ try {
+ return this.launchContext.getUser();
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public Map<Path,String> getLocalizedResources() {
- assert ContainerState.LOCALIZED == getContainerState();
+ this.readLock.lock();
+ try {
+ assert ContainerState.LOCALIZED == getContainerState(); // TODO: FIXME!!
return localizedResources;
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public Credentials getCredentials() {
- return credentials;
+ this.readLock.lock();
+ try {
+ return credentials;
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public ContainerState getContainerState() {
- return stateMachine.getCurrentState();
+ this.readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
- public synchronized
+ public
org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
- org.apache.hadoop.yarn.api.records.Container c =
- recordFactory.newRecordInstance(
- org.apache.hadoop.yarn.api.records.Container.class);
- c.setId(this.launchContext.getContainerId());
- c.setResource(this.launchContext.getResource());
- c.setState(getCurrentState());
- return c;
+ this.readLock.lock();
+ try {
+ org.apache.hadoop.yarn.api.records.Container c =
+ recordFactory.newRecordInstance(
+ org.apache.hadoop.yarn.api.records.Container.class);
+ c.setId(this.launchContext.getContainerId());
+ c.setResource(this.launchContext.getResource());
+ c.setState(getCurrentState());
+ return c;
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public ContainerLaunchContext getLaunchContext() {
- return launchContext;
+ this.readLock.lock();
+ try {
+ return launchContext;
+ } finally {
+ this.readLock.unlock();
+ }
}
@Override
public ContainerStatus cloneAndGetContainerStatus() {
- ContainerStatus containerStatus = recordFactory.newRecordInstance(ContainerStatus.class);
- containerStatus.setState(getCurrentState());
- containerStatus.setContainerId(this.launchContext.getContainerId());
- containerStatus.setDiagnostics(diagnostics.toString());
- containerStatus.setExitStatus(String.valueOf(exitCode));
- return containerStatus;
+ this.readLock.lock();
+ try {
+ ContainerStatus containerStatus =
+ recordFactory.newRecordInstance(ContainerStatus.class);
+ containerStatus.setState(getCurrentState());
+ containerStatus.setContainerId(this.launchContext.getContainerId());
+ containerStatus.setDiagnostics(diagnostics.toString());
+ containerStatus.setExitStatus(String.valueOf(exitCode));
+ return containerStatus;
+ } finally {
+ this.readLock.unlock();
+ }
}
static class ContainerTransition implements
@@ -304,8 +363,8 @@ public class ContainerImpl implements Co
// Inform the AuxServices about the opaque serviceData
Map<String,ByteBuffer> csd = ctxt.getAllServiceData();
if (csd != null) {
- // TODO: Isn't this supposed to happen only once per Application?
- // ^ each container may have distinct service data
+ // This can happen more than once per Application as each container may
+ // have distinct service data
for (Map.Entry<String,ByteBuffer> service : csd.entrySet()) {
container.dispatcher.getEventHandler().handle(
new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
@@ -325,6 +384,7 @@ public class ContainerImpl implements Co
new ArrayList<LocalResourceRequest>();
try {
for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
+ try {
LocalResourceRequest req =
new LocalResourceRequest(rsrc.getValue());
container.pendingResources.put(req, rsrc.getKey());
@@ -339,13 +399,19 @@ public class ContainerImpl implements Co
appRsrc.add(req);
break;
}
+ } catch (URISyntaxException e) {
+ LOG.info("Got exception parsing " + rsrc.getKey()
+ + " and value " + rsrc.getValue());
+ throw e;
+ }
}
} catch (URISyntaxException e) {
// malformed resource; abort container launch
+ LOG.warn("Failed to parse resource-request", e);
container.dispatcher.getEventHandler().handle(
new ContainerLocalizationEvent(
LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
- return ContainerState.LOCALIZING;
+ return ContainerState.LOCALIZATION_FAILED;
}
if (!publicRsrc.isEmpty()) {
container.dispatcher.getEventHandler().handle(
@@ -516,28 +582,39 @@ public class ContainerImpl implements Co
}
@Override
- public synchronized void handle(ContainerEvent event) {
+ public void handle(ContainerEvent event) {
+ try {
+ this.writeLock.lock();
- ContainerId containerID = event.getContainerID();
- LOG.info("Processing " + containerID + " of type " + event.getType());
+ ContainerId containerID = event.getContainerID();
+ LOG.info("Processing " + containerID + " of type " + event.getType());
- ContainerState oldState = stateMachine.getCurrentState();
- ContainerState newState = null;
- try {
- newState =
- stateMachine.doTransition(event.getType(), event);
- } catch (InvalidStateTransitonException e) {
- LOG.warn("Can't handle this event at current state", e);
- }
- if (oldState != newState) {
- LOG.info("Container " + containerID + " transitioned from " + oldState
- + " to " + newState);
+ ContainerState oldState = stateMachine.getCurrentState();
+ ContainerState newState = null;
+ try {
+ newState =
+ stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.warn("Can't handle this event at current state", e);
+ }
+ if (oldState != newState) {
+ LOG.info("Container " + containerID + " transitioned from "
+ + oldState
+ + " to " + newState);
+ }
+ } finally {
+ this.writeLock.unlock();
}
}
@Override
public String toString() {
- return ConverterUtils.toString(launchContext.getContainerId());
+ this.readLock.lock();
+ try {
+ return ConverterUtils.toString(launchContext.getContainerId());
+ } finally {
+ this.readLock.unlock();
+ }
}
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java Thu May 19 17:04:11 2011
@@ -19,7 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState {
- NEW, LOCALIZING, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS,
+ NEW, LOCALIZING, LOCALIZATION_FAILED, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS,
EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL,
CONTAINER_RESOURCES_CLEANINGUP, DONE
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Thu May 19 17:04:11 2011
@@ -22,6 +22,9 @@ import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,6 +39,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
@@ -59,6 +63,9 @@ public class LocalizedResource implement
final Semaphore sem = new Semaphore(1);
final Queue<ContainerId> ref; // Queue of containers using this localized
// resource
+ private final Lock readLock;
+ private final Lock writeLock;
+
final AtomicLong timestamp = new AtomicLong(currentTime());
private static final StateMachineFactory<LocalizedResource,ResourceState,
@@ -96,6 +103,11 @@ public class LocalizedResource implement
this.rsrc = rsrc;
this.dispatcher = dispatcher;
this.ref = new LinkedList<ContainerId>();
+
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ this.readLock = readWriteLock.readLock();
+ this.writeLock = readWriteLock.writeLock();
+
this.stateMachine = stateMachineFactory.make(this);
}
@@ -112,21 +124,26 @@ public class LocalizedResource implement
return sb.toString();
}
- public void release(ContainerId container) {
+ private void release(ContainerId container) {
if (!ref.remove(container)) {
LOG.info("Attempt to release claim on " + this +
" from unregistered container " + container);
- assert false;
+ assert false; // TODO: FIX
}
timestamp.set(currentTime());
}
- long currentTime() {
+ private long currentTime() {
return System.nanoTime();
}
public ResourceState getState() {
- return stateMachine.getCurrentState();
+ this.readLock.lock();
+ try {
+ return stateMachine.getCurrentState();
+ } finally {
+ this.readLock.unlock();
+ }
}
public LocalResourceRequest getRequest() {
@@ -142,9 +159,28 @@ public class LocalizedResource implement
}
@Override
- public synchronized void handle(ResourceEvent event) {
- stateMachine.doTransition(event.getType(), event);
- // TODO: Invalid transitions?
+ public void handle(ResourceEvent event) {
+ try {
+ this.writeLock.lock();
+
+ Path resourcePath = event.getLocalResourceRequest().getPath();
+ LOG.info("Processing " + resourcePath + " of type " + event.getType());
+
+ ResourceState oldState = this.stateMachine.getCurrentState();
+ ResourceState newState = null;
+ try {
+ newState = this.stateMachine.doTransition(event.getType(), event);
+ } catch (InvalidStateTransitonException e) {
+ LOG.warn("Can't handle this event at current state", e);
+ }
+ if (oldState != newState) {
+ LOG.info("Resource " + resourcePath + " transitioned from "
+ + oldState
+ + " to " + newState);
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
}
static abstract class ResourceTransition implements
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1124996&r1=1124995&r2=1124996&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu May 19 17:04:11 2011
@@ -534,6 +534,8 @@ public class ResourceLocalizationService
LOG.info("DEBUG: FAILED " + req, stat.getException());
assoc.getResource().unlock();
response.setLocalizerAction(LocalizerAction.DIE);
+ // TODO: Why is this event going directly to the container. Why not
+ // the resource itself? What happens to the resource? Is it removed?
dispatcher.getEventHandler().handle(
new ContainerResourceFailedEvent(context.getContainerId(),
req, stat.getException()));