You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-dev@hadoop.apache.org by "Chris Riccomini (Created) (JIRA)" <ji...@apache.org> on 2011/10/25 19:12:32 UTC

[jira] [Created] (MAPREDUCE-3261) AM unable to release containers

AM unable to release containers
-------------------------------

                 Key: MAPREDUCE-3261
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3261
             Project: Hadoop Map/Reduce
          Issue Type: Bug
          Components: applicationmaster
         Environment: [criccomi@criccomi-ld hadoop-trunk]$ svn info
Path: .
URL: http://svn.apache.org/repos/asf/hadoop/common/trunk
Repository Root: http://svn.apache.org/repos/asf
Repository UUID: 13f79535-47bb-0310-9956-ffa450edef68
Revision: 1185741
Node Kind: directory
Schedule: normal
Last Changed Author: acmurthy
Last Changed Rev: 1185488
Last Changed Date: 2011-10-17 22:30:32 -0700 (Mon, 17 Oct 2011)

            Reporter: Chris Riccomini
             Fix For: 0.23.0, 0.24.0


I'm probably doing something wrong here, but I can't figure it out.

My ApplicationMaster is sending an AllocateRequest with ContainerIds to release. My ResourceManager logs say:

2011-10-25 10:02:52,236 WARN  resourcemanager.RMAuditLogger (RMAuditLogger.java:logFailure(207)) - USER=criccomi	IP=127.0.0.1	OPERATION=AM Released Container	TARGET=FifoScheduler	RESULT=FAILURE	DESCRIPTION=Trying to release container not owned by app or with invalid id	PERMISSIONS=Unauthorized access or invalid container	APPID=application_1319485153554_0028	CONTAINERID=container_1319485153554_0028_01_000003

The container ID is valid, as is the app id:

[criccomi@criccomi-ld logs]$ pwd
/tmp/logs
[criccomi@criccomi-ld logs]$ find .
.
./application_1319485153554_0028
./application_1319485153554_0028/container_1319485153554_0028_01_000002
./application_1319485153554_0028/container_1319485153554_0028_01_000002/stderr
./application_1319485153554_0028/container_1319485153554_0028_01_000002/stdout
./application_1319485153554_0028/container_1319485153554_0028_01_000001
./application_1319485153554_0028/container_1319485153554_0028_01_000001/stderr
./application_1319485153554_0028/container_1319485153554_0028_01_000001/stdout
./application_1319485153554_0028/container_1319485153554_0028_01_000003
./application_1319485153554_0028/container_1319485153554_0028_01_000003/stderr
./application_1319485153554_0028/container_1319485153554_0028_01_000003/stdout
./application_1319485153554_0028/container_1319485153554_0028_01_000006
./application_1319485153554_0028/container_1319485153554_0028_01_000006/stderr
./application_1319485153554_0028/container_1319485153554_0028_01_000006/stdout

The containers are still running.

My code to start a container, and then to release it:

  // ugi = UserGroupInformation.getCurrentUser
  // security is not enabled
  /**
   * Starts `container` by sending a StartContainerRequest to the ContainerManager
   * running at the container's node address.
   *
   * The package at `packagePath` is attached to the launch context as an ARCHIVE
   * local resource with APPLICATION visibility under the key "package".
   *
   * @param packagePath path of the package to localize for the container
   * @param container   the container to start; supplies the node address, container
   *                    token, container id and resource
   * @param ugi         the user the container is launched for; also used for the RPC
   *                    connection when security is disabled
   * @param env         environment to set on the container launch context
   * @param cmds        commands to set on the container launch context
   */
  def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*): Unit = {
    info("%s starting container %s %s %s %s %s" format (appAttemptId, packagePath, container, ugi, env, cmds))
    // connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
    val contToken = container.getContainerToken
    val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort

    // When security is enabled the RPC is made as a remote user (named after the node
    // address) that carries the container token; otherwise the caller's ugi is used as-is.
    val rpcUser =
      if (UserGroupInformation.isSecurityEnabled) {
        debug("%s security is enabled" format (appAttemptId))
        val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
        val remoteUser = UserGroupInformation.createRemoteUser(address)
        remoteUser.addToken(hadoopToken)
        info("%s changed user to %s" format (appAttemptId, remoteUser))
        remoteUser
      } else {
        ugi
      }

    val containerManager = rpcUser.doAs(new PrivilegedAction[ContainerManager] {
      // the cast is kept from the original: the proxy is narrowed to ContainerManager here
      def run(): ContainerManager =
        YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
    })

    // set the local package so that the containers and app master are provisioned with it
    val packageResource = Records.newRecord(classOf[LocalResource])
    val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
    val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)

    packageResource.setResource(packageUrl)
    packageResource.setSize(fileStatus.getLen)
    packageResource.setTimestamp(fileStatus.getModificationTime)
    packageResource.setType(LocalResourceType.ARCHIVE)
    packageResource.setVisibility(LocalResourceVisibility.APPLICATION)

    // start the container
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    ctx.setEnvironment(env)
    ctx.setContainerId(container.getId())
    ctx.setResource(container.getResource())
    // Use the caller's ugi, not rpcUser: in secure mode rpcUser is named after the
    // node's "host:port" address and exists only to carry the container token.
    ctx.setUser(ugi.getShortUserName())
    ctx.setCommands(cmds.toList)
    ctx.setLocalResources(Collections.singletonMap("package", packageResource))

    debug("%s setting package to %s" format (appAttemptId, packageResource))
    debug("%s setting context to %s" format (appAttemptId, ctx))

    val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
    startContainerRequest.setContainerLaunchContext(ctx)
    containerManager.startContainer(startContainerRequest)
  }

-----

  /**
   * Builds an AllocateRequest carrying the given asks and releases, sends it to the
   * resource manager, and returns the AMResponse from the reply.
   *
   * The current `requestId` is used as the request's response id, and `requestId`
   * is incremented before the request is sent.
   *
   * @param requests resource requests to add to the allocate request as asks
   * @param release  container ids to add to the allocate request as releases
   * @return the AMResponse extracted from the resource manager's allocate reply
   */
  def sendResourceRequest(requests: List[ResourceRequest], release: List[ContainerId]): AMResponse = {
    info("%s sending resource request %s %s".format(appAttemptId, requests, release))

    // populate the allocate request for this application attempt
    val allocateRequest = Records.newRecord(classOf[AllocateRequest])
    allocateRequest.setResponseId(requestId)
    allocateRequest.setApplicationAttemptId(appAttemptId)
    allocateRequest.addAllAsks(requests)
    allocateRequest.addAllReleases(release)
    requestId += 1

    debug("%s RM resource request %s".format(appAttemptId, allocateRequest))

    val allocateResponse = resourceManager.allocate(allocateRequest)
    allocateResponse.getAMResponse
  }

I have double checked that my ContainerIds are accurate, and they are.

Any idea what I'm doing wrong here?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Resolved] (MAPREDUCE-3261) AM unable to release containers

Posted by "Chris Riccomini (Resolved) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-3261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini resolved MAPREDUCE-3261.
----------------------------------------

    Resolution: Fixed

Sigh. I checked the debug logs and everything looks fine in YARN. This appears to be a bug in my app master's "ask for more/less" logic.
                
> AM unable to release containers
> -------------------------------
>
>                 Key: MAPREDUCE-3261
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3261
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: applicationmaster
>         Environment: [criccomi@criccomi-ld hadoop-trunk]$ svn info
> Path: .
> URL: http://svn.apache.org/repos/asf/hadoop/common/trunk
> Repository Root: http://svn.apache.org/repos/asf
> Repository UUID: 13f79535-47bb-0310-9956-ffa450edef68
> Revision: 1185741
> Node Kind: directory
> Schedule: normal
> Last Changed Author: acmurthy
> Last Changed Rev: 1185488
> Last Changed Date: 2011-10-17 22:30:32 -0700 (Mon, 17 Oct 2011)
>            Reporter: Chris Riccomini
>             Fix For: 0.23.0, 0.24.0
>
>
> I'm probably doing something wrong here, but I can't figure it out.
> My ApplicationMaster is sending an AllocateRequest with ContainerIds to release. My ResourceManager logs say:
> 2011-10-25 10:02:52,236 WARN  resourcemanager.RMAuditLogger (RMAuditLogger.java:logFailure(207)) - USER=criccomi	IP=127.0.0.1	OPERATION=AM Released Container	TARGET=FifoScheduler	RESULT=FAILURE	DESCRIPTION=Trying to release container not owned by app or with invalid id	PERMISSIONS=Unauthorized access or invalid container	APPID=application_1319485153554_0028	CONTAINERID=container_1319485153554_0028_01_000003
> The container ID is valid, as is the app id:
> [criccomi@criccomi-ld logs]$ pwd
> /tmp/logs
> [criccomi@criccomi-ld logs]$ find .
> .
> ./application_1319485153554_0028
> ./application_1319485153554_0028/container_1319485153554_0028_01_000002
> ./application_1319485153554_0028/container_1319485153554_0028_01_000002/stderr
> ./application_1319485153554_0028/container_1319485153554_0028_01_000002/stdout
> ./application_1319485153554_0028/container_1319485153554_0028_01_000001
> ./application_1319485153554_0028/container_1319485153554_0028_01_000001/stderr
> ./application_1319485153554_0028/container_1319485153554_0028_01_000001/stdout
> ./application_1319485153554_0028/container_1319485153554_0028_01_000003
> ./application_1319485153554_0028/container_1319485153554_0028_01_000003/stderr
> ./application_1319485153554_0028/container_1319485153554_0028_01_000003/stdout
> ./application_1319485153554_0028/container_1319485153554_0028_01_000006
> ./application_1319485153554_0028/container_1319485153554_0028_01_000006/stderr
> ./application_1319485153554_0028/container_1319485153554_0028_01_000006/stdout
> The containers are still running.
> My code to start a container, and then to release it:
> {code}
>   // ugi = UserGroupInformation.getCurrentUser
>   // security is not enabled
>   def startContainer(packagePath: Path, container: Container, ugi: UserGroupInformation, env: Map[String, String], cmds: String*) {
>     info("%s starting container %s %s %s %s %s" format (appAttemptId, packagePath, container, ugi, env, cmds))
>     // connect to container manager (based on similar code in the ContainerLauncher in Hadoop MapReduce)
>     val contToken = container.getContainerToken
>     val address = container.getNodeId.getHost + ":" + container.getNodeId.getPort
>     var user = ugi
>     if (UserGroupInformation.isSecurityEnabled) {
>       debug("%s security is enabled" format (appAttemptId))
>       val hadoopToken = new Token[ContainerTokenIdentifier](contToken.getIdentifier.array, contToken.getPassword.array, new Text(contToken.getKind), new Text(contToken.getService))
>       user = UserGroupInformation.createRemoteUser(address)
>       user.addToken(hadoopToken)
>       info("%s changed user to %s" format (appAttemptId, user))
>     }
>     val containerManager = user.doAs(new PrivilegedAction[ContainerManager] {
>       def run(): ContainerManager = {
>         return YarnRPC.create(conf).getProxy(classOf[ContainerManager], NetUtils.createSocketAddr(address), conf).asInstanceOf[ContainerManager]
>       }
>     })
>     // set the local package so that the containers and app master are provisioned with it
>     val packageResource = Records.newRecord(classOf[LocalResource])
>     val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
>     val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
>     packageResource.setResource(packageUrl)
>     packageResource.setSize(fileStatus.getLen)
>     packageResource.setTimestamp(fileStatus.getModificationTime)
>     packageResource.setType(LocalResourceType.ARCHIVE)
>     packageResource.setVisibility(LocalResourceVisibility.APPLICATION)
>     // start the container
>     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
>     ctx.setEnvironment(env)
>     ctx.setContainerId(container.getId())
>     ctx.setResource(container.getResource())
>     ctx.setUser(user.getShortUserName())
>     ctx.setCommands(cmds.toList)
>     ctx.setLocalResources(Collections.singletonMap("package", packageResource))
>     debug("%s setting package to %s" format (appAttemptId, packageResource))
>     debug("%s setting context to %s" format (appAttemptId, ctx))
>     val startContainerRequest = Records.newRecord(classOf[StartContainerRequest])
>     startContainerRequest.setContainerLaunchContext(ctx)
>     containerManager.startContainer(startContainerRequest)
>   }
> {code}
> -----
> {code}
>   def sendResourceRequest(requests: List[ResourceRequest], release: List[ContainerId]): AMResponse = {
>     info("%s sending resource request %s %s" format (appAttemptId, requests, release))
>     val req = Records.newRecord(classOf[AllocateRequest])
>     req.setResponseId(requestId)
>     req.setApplicationAttemptId(appAttemptId)
>     req.addAllAsks(requests)
>     req.addAllReleases(release)
>     requestId += 1
>     debug("%s RM resource request %s" format (appAttemptId, req))
>     resourceManager.allocate(req).getAMResponse
>   }
> {code}
> I have double checked that my ContainerIds are accurate, and they are.
> Any idea what I'm doing wrong here?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira