Posted to hdfs-user@hadoop.apache.org by POUPON Kevin <ke...@altran.com> on 2014/09/10 12:29:27 UTC

MapReduce data decompression using a custom codec

Hello,

I developed a custom compression codec for Hadoop. Of course Hadoop is set to use my codec when compressing data.
For testing purposes, I use the following two commands:

Compression test command:
-----------------------------------
hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop//../hadoop-mapreduce/hadoop-streaming.jar -Dmapreduce.output.fileoutputformat.compress=true -input /originalFiles/ -output /compressedFiles/ -mapper cat -reducer cat


Decompression test command:
-----------------------------------
hadoop jar /opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop//../hadoop-mapreduce/hadoop-streaming.jar -Dmapreduce.output.fileoutputformat.compress=false -input /compressedFiles/ -output /decompressedFiles/ -mapper cat -reducer cat


As you can see, the two commands are quite similar: only the compression option and the input/output directories change.

The first command compresses the input data, then 'cat's it (the Linux command, of course) to the output.
The second one decompresses the input data (which is supposed to be compressed already) and 'cat's it to the output. As I understand it, Hadoop is supposed to auto-detect compressed input data and decompress it with the right codec.
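
As far as I understand, that auto-detection goes through CompressionCodecFactory: the codec has to be listed in io.compression.codecs and the compressed files have to carry the extension returned by its getDefaultExtension(). Here is a simplified sketch of that lookup, just to show what I mean (the class name com.example.MyCodec and the .mycodec extension are placeholders, not my actual code):

Codec lookup sketch:
-----------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecLookupTest {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The custom codec has to be registered here (class name is a placeholder).
        conf.set("io.compression.codecs",
                 "org.apache.hadoop.io.compress.DefaultCodec,com.example.MyCodec");
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // The factory matches the file suffix against each codec's getDefaultExtension().
        CompressionCodec codec = factory.getCodec(new Path("/compressedFiles/part-00000.mycodec"));
        System.out.println(codec == null ? "no codec matched" : codec.getClass().getName());
    }
}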

Both the compression and decompression tests work fine when Hadoop is set to use one of the default codecs, such as BZip2 or Snappy.

However, when using my custom codec, only the compression works: the decompression is sluggish and eventually fails with Java heap space errors:

packageJobJar: [] [/opt/cloudera/parcels/CDH-5.1.2-1.cdh5.1.2.p0.3/lib/hadoop-mapreduce/hadoop-streaming-2.3.0-cdh5.1.2.jar] /tmp/streamjob6475393520304432687.jar tmpDir=null
14/09/09 15:33:21 INFO client.RMProxy: Connecting to ResourceManager at bluga2/10.1.96.222:8032
14/09/09 15:33:22 INFO client.RMProxy: Connecting to ResourceManager at bluga2/10.1.96.222:8032
14/09/09 15:33:23 INFO mapred.FileInputFormat: Total input paths to process : 1
14/09/09 15:33:23 INFO mapreduce.JobSubmitter: number of splits:1
14/09/09 15:33:23 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1410264242020_0016
14/09/09 15:33:24 INFO impl.YarnClientImpl: Submitted application application_1410264242020_0016
14/09/09 15:33:24 INFO mapreduce.Job: The url to track the job: http://bluga2:8088/proxy/application_1410264242020_0016/
14/09/09 15:33:24 INFO mapreduce.Job: Running job: job_1410264242020_0016
14/09/09 15:33:30 INFO mapreduce.Job: Job job_1410264242020_0016 running in uber mode : false
14/09/09 15:33:30 INFO mapreduce.Job:  map 0% reduce 0%
14/09/09 15:35:12 INFO mapreduce.Job:  map 100% reduce 0%
14/09/09 15:35:13 INFO mapreduce.Job: Task Id : attempt_1410264242020_0016_m_000000_0, Status : FAILED
Error: Java heap space
14/09/09 15:35:14 INFO mapreduce.Job:  map 0% reduce 0%
14/09/09 15:35:41 INFO mapreduce.Job: Task Id : attempt_1410264242020_0016_m_000000_1, Status : FAILED
Error: Java heap space
14/09/09 15:36:02 INFO mapreduce.Job: Task Id : attempt_1410264242020_0016_m_000000_2, Status : FAILED
Error: Java heap space
14/09/09 15:36:49 INFO mapreduce.Job:  map 100% reduce 0%
14/09/09 15:36:50 INFO mapreduce.Job:  map 100% reduce 100%
14/09/09 15:36:56 INFO mapreduce.Job: Job job_1410264242020_0016 failed with state FAILED due to: Task failed task_1410264242020_0016_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

14/09/09 15:36:58 INFO mapreduce.Job: Counters: 9
       Job Counters
             Failed map tasks=4
             Launched map tasks=4
             Other local map tasks=3
             Data-local map tasks=1
             Total time spent by all maps in occupied slots (ms)=190606
             Total time spent by all reduces in occupied slots (ms)=0
             Total time spent by all map tasks (ms)=190606
             Total vcore-seconds taken by all map tasks=190606
             Total megabyte-seconds taken by all map tasks=195180544
14/09/09 15:36:58 ERROR streaming.StreamJob: Job not Successful!
Streaming Command Failed!

I already tried to increase the maximum map heap size (the mapreduce.map.java.opts.max.heap YARN property) from 1 GiB to 2 GiB, but the decompression still fails. By the way, I'm compressing and decompressing a small ~2 MB file and I'm running the latest Cloudera (CDH) release.
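
If it matters, the per-job equivalent on the streaming command line would be something like the following (values are just for illustration):

-Dmapreduce.map.memory.mb=2048 -Dmapreduce.map.java.opts=-Xmx1638m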

I built a quick standalone Java test to reproduce the way Hadoop calls the codec (instantiating the codec, creating a compression/decompression stream from it, and so on). I noticed that the decompression ends up in an infinite loop: only the first block of compressed data is decompressed, over and over again. That would explain the Java heap space errors above.
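
For what it's worth, the standalone test is roughly the following (a simplified sketch; the codec class name and file names are placeholders). If the decompressor never reports that it has reached the end of the stream, a read loop like this never sees -1, which would match what I observe:

Standalone decompression test sketch:
-----------------------------------
import java.io.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecDecompressTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // com.example.MyCodec is a placeholder for my custom codec class.
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
                Class.forName("com.example.MyCodec"), conf);
        try (InputStream in = codec.createInputStream(new FileInputStream("test.mycodec"));
             OutputStream out = new FileOutputStream("test.out")) {
            byte[] buffer = new byte[64 * 1024];
            int n;
            // If the decompressor never signals end of stream, read() never
            // returns -1 and this loop keeps producing the first block forever.
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
        }
    }
}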

What am I doing wrong, or what did I forget? What do I need to change so that my codec decompresses data correctly?

Thank you for your help!

Kévin Poupon