Posted to user@hadoop.apache.org by Marcelo Valle <ma...@ktech.com> on 2019/10/23 13:21:05 UTC

inputformat - how to list all active trackers?

I am creating a custom input format that extends FileInputFormat
(org.apache.hadoop.mapreduce.lib.input.FileInputFormat). I am using the new
Hadoop API (hadoop-mapreduce-client-core 2.8.3) and I am running this on AWS
EMR.

My intention is to spread the input files among the hosts in such a way that
each host gets a similar total of file sizes. In my case, processing time is
directly proportional to file size, so the more uniform the distribution of
bytes among the slave nodes, the better.

Please find below my Scala code for `getSplits` that does this.

My problem: I was expecting the function `listAllHosts` to get the full list
of active trackers from the block locations, but to my surprise, at run time
I got a list with a single element: "*".

How could I possibly get the full list of active trackers from inside the
input format?
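
To be concrete about what I mean by "full list of active trackers": I am after
the set of hosts that something like the YarnClient sketch below would return.
This is only a hypothetical illustration (it assumes the ResourceManager is
reachable from wherever `getSplits` runs, which I have not verified on EMR):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.NodeState
import org.apache.hadoop.yarn.client.api.YarnClient

import scala.collection.JavaConverters._

// Hypothetical sketch: ask the ResourceManager for the hosts of all RUNNING
// NodeManagers, instead of deriving hosts from block locations.
def listRunningNodeHosts(conf: Configuration): List[String] = {
  val yarnClient = YarnClient.createYarnClient()
  yarnClient.init(conf)
  yarnClient.start()
  try {
    yarnClient
      .getNodeReports(NodeState.RUNNING) // only nodes currently able to run containers
      .asScala
      .map(_.getNodeId.getHost)
      .distinct
      .toList
  } finally {
    yarnClient.stop()
  }
}
```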

My code:
```
override def getSplits(job: JobContext): java.util.List[InputSplit] = {
  val fileStatusList: List[FileStatus] = listStatus(job).toList

  logger.info(s"all files to process = ${fileStatusList.map(status =>
    (status.getPath.getName, status.getLen)).mkString(",")}")

  val allHosts = listAllHosts(job.getConfiguration, fileStatusList)
  logger.info(s"all hosts on cluster = $allHosts")

  def lenOf(fileStatus: FileStatus): Long = fileStatus.getLen

  val distributedFiles =
    FileHostLoadBalancer.distributeFilesOnHosts[FileStatus](allHosts, fileStatusList, lenOf)

  // 1 split per file, 1 file per host
  distributedFiles.map {
    case (status, hosts) => new FileSplit(status.getPath, 0, status.getLen, hosts.toArray)
  }.toList
}

private def listAllHosts(conf: Configuration, statusList: List[FileStatus]): List[String] = {
  val blockLocationArrays: List[Array[BlockLocation]] = statusList.map {
    case locatedFileStatus: LocatedFileStatus =>
      locatedFileStatus.getBlockLocations
    case other: FileStatus =>
      other.getPath.getFileSystem(conf).getFileBlockLocations(other, 0, other.getLen)
  }
  blockLocationArrays.flatten.flatMap(_.getHosts).distinct
}
```
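
For context, `FileHostLoadBalancer.distributeFilesOnHosts` is not shown above.
A minimal sketch of one way to do that kind of balancing (illustration only,
not the actual class) would be:

```
// Sketch of greedy balancing (illustration only, not the real FileHostLoadBalancer):
// take items largest first and assign each one to the host with the smallest
// accumulated size so far, returning one preferred host per item.
def distributeFilesOnHostsSketch[T](hosts: List[String],
                                    items: List[T],
                                    sizeOf: T => Long): List[(T, List[String])] = {
  val totals = scala.collection.mutable.Map(hosts.map(_ -> 0L): _*)
  items.sortBy(item => -sizeOf(item)).map { item =>
    val (leastLoaded, _) = totals.minBy(_._2)
    totals(leastLoaded) += sizeOf(item)
    (item, List(leastLoaded))
  }
}
```

Assigning the largest files first keeps the per-host byte totals close
together even when file sizes vary a lot.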

Best regards,
Marcelo.

