Posted to issues@flink.apache.org by "Sebastian Neef (JIRA)" <ji...@apache.org> on 2016/03/03 12:23:18 UTC

[jira] [Created] (FLINK-3568) Hadoop's Bzip decompression is not thread safe

Sebastian Neef created FLINK-3568:
-------------------------------------

             Summary: Hadoop's Bzip decompression is not thread safe
                 Key: FLINK-3568
                 URL: https://issues.apache.org/jira/browse/FLINK-3568
             Project: Flink
          Issue Type: Bug
          Components: Hadoop Compatibility
    Affects Versions: 0.10.1
            Reporter: Sebastian Neef


Hi,
this is my first bug report for Apache Flink. If you need any additional information, please let me know.

h1. Background
I was trying to process [Wikipedia's XML dumps|https://dumps.wikimedia.org/enwiki/20160204/] with Apache Flink. To save disk space, I decided to use the bzip2-compressed versions.

Apache Flink is compatible with [Hadoop's InputFormats|https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/hadoop_compatibility.html], and Hadoop's TextInputFormat [supports compressed files|https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.html#line.5]. [Bzip2 files are splittable|http://comphadoop.weebly.com/index.html] and thus well suited for parallel processing.

h1. Problem
I started to test the decompression with a simple Job based on the Apache Flink Quickstart code:

{code}
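    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    // the "mapred" TextInputFormat, matching the classes in the stack trace below
    import org.apache.hadoop.mapred.TextInputFormat;

    public class Job {

        public static void main(String[] args) throws Exception {

            if (args.length != 2) {
                System.err.println("USAGE: Job <wikipediadump.xml.bz2> <output.txt>");
                return;
            }

            // set up the execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // read the bzip2-compressed dump line by line as (byte offset, line) pairs
            DataSet<Tuple2<LongWritable, Text>> input =
                    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, args[0]);

            input.writeAsText(args[1]);

            // execute program
            env.execute("Bzip compression test");
        }
    }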
{code}
When starting the job, I get the following exception:
{noformat}
02/29/2016 11:59:50 CHAIN DataSource (at createInput(ExecutionEnvironment.java:508) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Map (Map at main(Job.java:67))(1/2) switched to FAILED 
java.lang.ArrayIndexOutOfBoundsException: 18002
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:730)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:801)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:504)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
    at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:399)
    at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:483)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.readLine(CompressedSplitLineReader.java:159)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.fetchNext(HadoopInputFormatBase.java:185)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.reachedEnd(HadoopInputFormatBase.java:179)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:166)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
This does not happen with "-p 1", only with a parallelism greater than 1.

h1. Research
Googling the error message leads to several Spark/Hadoop mailing list threads, and it looks like the "compress.bzip2.CBZip2InputStream" class in use is not thread-safe:

- [Link one|https://issues.apache.org/jira/browse/HADOOP-10614]
- [Link two|http://stackoverflow.com/questions/5159602/processing-a-bzip-string-file-in-scala]
- [Link three|http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ArrayIndexOutOfBoundsException-using-sc-textFile-on-BZ2-compressed-files-td22905.html]
- [Link four|https://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3C1402318634.7682.YahooMailNeo@web190401.mail.sg3.yahoo.com%3E]

Link one is the most interesting, because the Hadoop project resolved the issue there:

{quote}
Hadoop uses CBZip2InputStream to decode bzip2 files. However, the implementation is not threadsafe. This is not a really problem for Hadoop MapReduce because Hadoop runs each task in a separate JVM. But for other libraries that utilize multithreading and use Hadoop's InputFormat, e.g., Spark, it will cause exceptions like the following:
{quote}
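
To illustrate why one JVM per task can hide this kind of bug, here is a minimal sketch. It is not Hadoop's code: the class names are hypothetical, and it assumes the problem comes from state that is shared between decoder instances inside one JVM (modelled here as a static read position). The two decoders are interleaved in a single thread, so the result is reproducible and only imitates what two task threads might do.

```java
// Hypothetical stand-ins for a stream decoder; NOT Hadoop's CBZip2InputStream.
class SharedStateSketch {

    interface Decoder {
        boolean hasNext();
        char next();
    }

    // Broken variant: the read position is static, so every instance
    // living in the same JVM reads and advances the same counter.
    static final class StaticPosDecoder implements Decoder {
        static int pos = 0;
        private final String data;
        StaticPosDecoder(String data) { this.data = data; }
        public boolean hasNext() { return pos < data.length(); }
        public char next() { return data.charAt(pos++); }
    }

    // Safe variant: each instance owns its read position.
    static final class InstancePosDecoder implements Decoder {
        private int pos = 0;
        private final String data;
        InstancePosDecoder(String data) { this.data = data; }
        public boolean hasNext() { return pos < data.length(); }
        public char next() { return data.charAt(pos++); }
    }

    // Alternates between two decoders, imitating one possible scheduling
    // of two parallel tasks that run inside the same JVM.
    static String[] decodeInterleaved(Decoder a, Decoder b) {
        StringBuilder outA = new StringBuilder();
        StringBuilder outB = new StringBuilder();
        while (a.hasNext() || b.hasNext()) {
            if (a.hasNext()) outA.append(a.next());
            if (b.hasNext()) outB.append(b.next());
        }
        return new String[] { outA.toString(), outB.toString() };
    }

    public static void main(String[] args) {
        StaticPosDecoder.pos = 0;
        String[] broken = decodeInterleaved(
                new StaticPosDecoder("abcd"), new StaticPosDecoder("wxyz"));
        String[] safe = decodeInterleaved(
                new InstancePosDecoder("abcd"), new InstancePosDecoder("wxyz"));
        System.out.println("shared state:   " + broken[0] + " / " + broken[1]); // ac / xz
        System.out.println("instance state: " + safe[0] + " / " + safe[1]);     // abcd / wxyz
    }
}
```

With one JVM per task, each decoder would get its own copy of the static field, so the broken variant would behave like the safe one. That would match the observation that the job works with "-p 1" and fails with a higher parallelism.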

My guess is that Apache Flink needs to update or patch the CBZip2InputStream class to resolve the problem.

All the best,
Sebastian


