Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2021/04/23 10:49:00 UTC

[jira] [Commented] (SPARK-35199) Tasks are failing with zstd default of spark.shuffle.mapStatus.compression.codec

    [ https://issues.apache.org/jira/browse/SPARK-35199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330377#comment-17330377 ] 

Hyukjin Kwon commented on SPARK-35199:
--------------------------------------

cc [~dongjoon] FYI

> Tasks are failing with zstd default of spark.shuffle.mapStatus.compression.codec
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-35199
>                 URL: https://issues.apache.org/jira/browse/SPARK-35199
>             Project: Spark
>          Issue Type: Task
>          Components: PySpark
>    Affects Versions: 3.0.1
>            Reporter: Leonard Lausen
>            Priority: Major
>
> In Spark 3.0.1, tasks fail with the default setting {{spark.shuffle.mapStatus.compression.codec=zstd}}, but succeed after changing it to {{spark.shuffle.mapStatus.compression.codec=lz4}}.
> Example backtrace:
>  
> {code:java}
> java.io.IOException: Decompression error: Version not supported
>     at com.github.luben.zstd.ZstdInputStream.readInternal(ZstdInputStream.java:164)
>     at com.github.luben.zstd.ZstdInputStream.read(ZstdInputStream.java:120)
>     at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>     at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>     at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>     at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2781)
>     at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2797)
>     at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3274)
>     at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:934)
>     at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
>     at org.apache.spark.MapOutputTracker$.deserializeObject$1(MapOutputTracker.scala:954)
>     at org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:964)
>     at org.apache.spark.MapOutputTrackerWorker.$anonfun$getStatuses$2(MapOutputTracker.scala:856)
>     at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
>     at org.apache.spark.MapOutputTrackerWorker.getStatuses(MapOutputTracker.scala:851)
>     at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:808)
>     at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:128)
>     at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:185)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:127)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> Example code to reproduce the issue:
> {code:python}
> import pyspark.sql.functions as F
>
> # Read ~300 GB of compressed text files; `spark` is the active SparkSession.
> df = spark.read.text("s3://my-bucket-with-300GB-compressed-text-files")
> # A global random sort forces a full shuffle, which triggers the failure.
> df_rand = df.orderBy(F.rand(1))
> df_rand.write.text('s3://shuffled-output')
> {code}
> See [https://stackoverflow.com/questions/64876463/spark-3-0-1-tasks-are-failing-when-using-zstd-compression-codec] for another report of this issue and the workaround.
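> For reference, a minimal sketch of applying the workaround at session creation; the config key and value are the ones above, while the app name is illustrative:
> {code:python}
> from pyspark.sql import SparkSession
>
> # Work around SPARK-35199: switch the MapStatus codec from the zstd
> # default back to lz4, which does not hit the decompression error.
> spark = (
>     SparkSession.builder
>     .appName("zstd-mapstatus-workaround")  # illustrative name
>     .config("spark.shuffle.mapStatus.compression.codec", "lz4")
>     .getOrCreate()
> )
> {code}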



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org