Posted to user@spark.apache.org by Terje Berg-Hansen <te...@axenna.com> on 2014/05/18 22:38:35 UTC

unsubscribe


Andre Bois-Crettez <an...@kelkoo.com> wrote:

>We never saw your exception when reading bzip2 files with Spark.
>
>But when we mistakenly compiled Spark against an older version of Hadoop
>(the default in Spark), we ended up with sequential reading of the bzip2
>file, not taking advantage of block splits to work in parallel.
>Once we compiled Spark with SPARK_HADOOP_VERSION=2.2.0, files were read
>in parallel, as expected with a recent Hadoop.
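>
>A quick way to confirm that splitting is happening (a sketch from the
>spark-shell; the path below is illustrative): a splittable bzip2 read
>should yield more than one partition for a file larger than a single
>HDFS block.
>
>  val rdd = sc.textFile("hdfs:///some/large.bz2") // illustrative path
>  println(rdd.partitions.length) // > 1 once block splits are honored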
>
>http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions
>
>Make sure Spark is compiled against Hadoop v2
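>
>You can also check at runtime which Hadoop version Spark was actually
>compiled against (a sketch; org.apache.hadoop.util.VersionInfo is a
>standard Hadoop utility class):
>
>  println(org.apache.hadoop.util.VersionInfo.getVersion)
>  // should print 2.2.0 (or newer) after the rebuild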
>
>André
>
>On 2014-05-13 18:08, Xiangrui Meng wrote:
>> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>> the problem you described, but it does contain several fixes to the
>> bzip2 format. -Xiangrui
>>
>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <an...@andrewash.com> wrote:
>>> Hi all,
>>>
>>> Is anyone successfully reading and writing .bz2 files stored in HDFS
>>> from Spark?
>>>
>>>
>>> I'm finding the following results on a recent commit (756c96, from 24
>>> hours ago) and CDH 4.4.0:
>>>
>>> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>>> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) => s + "| ").count
>>>
>>> Specifically, I'm getting an exception out of the bzip2 libraries (see
>>> the stack traces below), which is unusual because I'm able to read that
>>> file without issue using the same libraries via Pig. The file was
>>> originally created with Pig as well.
>>>
>>> Digging a little deeper, I found this line in the .bz2 decompressor's
>>> javadoc for CBZip2InputStream:
>>>
>>> "Instances of this class are not threadsafe." [source]
>>>
>>>
>>> My current working theory is that Spark runs at a much higher level of
>>> parallelism than Pig/Hadoop does, and thus I hit these wild
>>> IndexOutOfBounds exceptions much more frequently (to the point that I
>>> can't finish a run over a small 2M-row file), versus hardly at all in
>>> other tools.
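>>>
>>> If that theory holds, a standalone stress sketch like the following
>>> (untested; the path is illustrative) should reproduce the failure
>>> outside Spark by decoding the same .bz2 file from several threads at
>>> once. If the unsafety is strictly per-instance rather than shared
>>> static state, this would pass:
>>>
>>> import org.apache.hadoop.conf.Configuration
>>> import org.apache.hadoop.fs.Path
>>> import org.apache.hadoop.io.compress.CompressionCodecFactory
>>>
>>> val hconf = new Configuration()
>>> val path = new Path("/user/aa/myfile.bz2")
>>> // the factory resolves BZip2Codec from the .bz2 extension
>>> val codec = new CompressionCodecFactory(hconf).getCodec(path)
>>> (1 to 8).par.foreach { _ =>
>>>   val in = codec.createInputStream(path.getFileSystem(hconf).open(path))
>>>   val buf = new Array[Byte](64 * 1024)
>>>   // drain the stream; an ArrayIndexOutOfBoundsException here would
>>>   // confirm a decoder race
>>>   while (in.read(buf) != -1) {}
>>>   in.close()
>>> }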
>>>
>>> The only other reference I could find to this issue was on presto-users,
>>> but the recommendation to switch from .bz2 to .lzo doesn't help when I
>>> actually do want the higher compression of .bz2.
>>>
>>>
>>> Would love to hear whether I have some kind of configuration issue,
>>> whether there's a bzip2 bug that's fixed in later versions of CDH, or
>>> generally any other thoughts on the issue.
>>>
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>>
>>> Below are examples of some exceptions I'm getting:
>>>
>>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
>>> java.lang.ArrayIndexOutOfBoundsException: 65535
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: 900000
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: -921878509
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>>         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>>         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>>         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>>         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>>
>>>
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: -1321104434
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>>         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>>         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>>         at java.io.InputStream.read(InputStream.java:101)
>>>         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>>         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>>         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>>         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>
>
>--
>André Bois-Crettez
>
>Software Architect
>Big Data Developer
>http://www.kelkoo.com/
>