Posted to issues@spark.apache.org by "ZhanxiongWang (Jira)" <ji...@apache.org> on 2019/09/17 10:31:00 UTC

[jira] [Updated] (SPARK-29114) Dataset.coalesce(10) throws ChunkFetchFailureException when the original Dataset partition size is big

     [ https://issues.apache.org/jira/browse/SPARK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhanxiongWang updated SPARK-29114:
----------------------------------
    Summary: Dataset<Row>.coalesce(10) throws ChunkFetchFailureException when the original Dataset partition size is big  (was: Dataset<Row>.coalesce(10) throw ChunkFetchFailureException when original Dataset size is big)

> Dataset<Row>.coalesce(10) throws ChunkFetchFailureException when the original Dataset partition size is big
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29114
>                 URL: https://issues.apache.org/jira/browse/SPARK-29114
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 2.3.0
>            Reporter: ZhanxiongWang
>            Priority: Major
>
> I create a Dataset<Row> df with 200 partitions. I requested 100 executors for my job, each with 1 core; driver memory is 8 GB and executor memory is 16 GB. I call df.cache() before df.coalesce(10). When each Dataset<Row> partition is small, the program works well, but when I increase the size of the Dataset<Row>, df.coalesce(10) throws a ChunkFetchFailureException.
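> For reference, here is a minimal sketch of the failing pattern (the paths and the parquet source are placeholders; my real job reads HBase regions via TableInputFormat):
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder().appName("coalesce-repro").getOrCreate()
>
> // Placeholder input; the real source is an HBase table with ~19.8 GB splits.
> val df = spark.read.parquet("/path/to/large/input").repartition(200)
>
> df.cache()
> df.count()  // materialize the cache across the 100 executors
>
> // Each of the 10 coalesced tasks must pull ~20 cached partitions, most of
> // them from remote executors. This works while partitions are small but
> // fails with ChunkFetchFailureException once they grow large.
> df.coalesce(10).write.parquet("/path/to/output")
> {code}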
> 19/09/17 08:26:44 INFO CoarseGrainedExecutorBackend: Got assigned task 210
> 19/09/17 08:26:44 INFO Executor: Running task 0.0 in stage 3.0 (TID 210)
> 19/09/17 08:26:44 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
> 19/09/17 08:26:44 INFO TorrentBroadcast: Started reading broadcast variable 1003
> 19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 3.8 GB)
> 19/09/17 08:26:44 INFO TorrentBroadcast: Reading broadcast variable 1003 took 7 ms
> 19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 3.8 GB)
> 19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_0 locally
> 19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_1 locally
> 19/09/17 08:26:44 INFO TransportClientFactory: Successfully created connection to /100.76.29.130:54238 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 08:26:46 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_18, and will not retry (0 retries)
> org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
>  at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
>  at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
>  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>  at java.lang.Thread.run(Thread.java:745)
> 19/09/17 08:26:46 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
> org.apache.spark.SparkException: Exception thrown in awaitResult: 
>  at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>  at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
>  at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:691)
>  at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:634)
>  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:747)
>  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:802)
>  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
>  at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
>  at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
>  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
>  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
>  at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
>  at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
>  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
>  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>  ... 1 more
> 19/09/17 08:26:46 INFO NewHadoopRDD: Input split: 9.46.2.233:0935a68ad8000000,0935d7fb180999FF
> 19/09/17 08:26:46 INFO TorrentBroadcast: Started reading broadcast variable 93
> 19/09/17 08:26:46 INFO MemoryStore: Block broadcast_93_piece0 stored as bytes in memory (estimated size 32.5 KB, free 3.8 GB)
> 19/09/17 08:26:46 INFO TorrentBroadcast: Reading broadcast variable 93 took 8 ms
> 19/09/17 08:26:47 INFO MemoryStore: Block broadcast_93 stored as values in memory (estimated size 372.0 KB, free 3.8 GB)
> 19/09/17 08:26:47 INFO RecoverableZooKeeper: Process identifier=hconnection-0x1aa852f0 connecting to ZooKeeper ensemble=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181
> 19/09/17 08:26:47 INFO ZooKeeper: Initiating client connection, connectString=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181 sessionTimeout=90000 watcher=hconnection-0x1aa852f0, quorum=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181, baseZNode=/hbase-qq-mp-ss-slave
> 19/09/17 08:26:47 INFO ClientCnxn: Opening socket connection to server 10.254.82.84/10.254.82.84:2181. Will not attempt to authenticate using SASL (unknown error)
> 19/09/17 08:26:47 INFO ClientCnxn: Socket connection established to 10.254.82.84/10.254.82.84:2181, initiating session
> 19/09/17 08:26:47 INFO ClientCnxn: Session establishment complete on server 10.254.82.84/10.254.82.84:2181, sessionid = 0x36c7f371e67307f, negotiated timeout = 90000
> 19/09/17 08:26:47 INFO TableInputFormatBase: Input split length: 19.8 G bytes.
> 19/09/17 08:41:37 INFO HConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x36c7f371e67307f
> 19/09/17 08:41:38 INFO ZooKeeper: Session: 0x36c7f371e67307f closed
> 19/09/17 08:41:38 INFO ClientCnxn: EventThread shut down
> 19/09/17 08:41:38 INFO MemoryStore: Block rdd_1005_18 stored as values in memory (estimated size 1822.9 MB, free 2025.1 MB)
> 19/09/17 08:41:38 INFO TransportClientFactory: Successfully created connection to /9.10.29.145:37002 after 0 ms (0 ms spent in bootstraps)
> 19/09/17 08:41:39 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_32, and will not retry (0 retries)
> org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1800856515000, chunkIndex=0}: readerIndex: 0, writerIndex: -2138342822 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2138342822))
>  
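> For what it's worth, the corrupt writerIndex looks like a 32-bit overflow; decoding it as an unsigned value puts the fetched chunk just past 2 GiB, i.e. past Int.MaxValue (my interpretation, not confirmed):
> {code:scala}
> // Both reported writerIndex values decode to just over Int.MaxValue bytes:
> -2137154997L + (1L << 32)  // = 2157812299 bytes ~= 2.01 GiB (rdd_1005_18)
> -2138342822L + (1L << 32)  // = 2156624474 bytes ~= 2.01 GiB (rdd_1005_32)
> {code}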
> Let me give more experimental details.
> When the df size is small:
>  * *Storage Level:* Memory Deserialized 1x Replicated
>  * *Cached Partitions:* 200
>  * *Total Partitions:* 200
>  * *Memory Size:* 356.2 GB
>  * *Disk Size:* 0.0 B
> h4. Data Distribution on 100 Executors
> *The storage situation for the 10 executors caching the most memory:*
> on heap memory usage : 5.3 GB (3.8 GB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 0.0 B
> *The storage situation for the remaining executors:*
> on heap memory usage : 3.6 GB (5.6 GB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 0.0 B
>  
> The log of the successful task is as follows:
>  
> 19/09/17 09:33:17 INFO CoarseGrainedExecutorBackend: Got assigned task 211
> 19/09/17 09:33:17 INFO Executor: Running task 3.0 in stage 3.0 (TID 211)
> 19/09/17 09:33:17 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
> 19/09/17 09:33:17 INFO TorrentBroadcast: Started reading broadcast variable 1003
> 19/09/17 09:33:17 INFO TransportClientFactory: Successfully created connection to /9.10.19.210:51072 after 8 ms (0 ms spent in bootstraps)
> 19/09/17 09:33:17 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 5.5 GB)
> 19/09/17 09:33:17 INFO TorrentBroadcast: Reading broadcast variable 1003 took 36 ms
> 19/09/17 09:33:18 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 5.5 GB)
> 19/09/17 09:33:18 INFO BlockManager: Found block rdd_1005_6 locally
> 19/09/17 09:33:18 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:37220 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:33:36 INFO BlockManager: Found block rdd_1005_33 remotely
> 19/09/17 09:33:37 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:32935 after 10 ms (0 ms spent in bootstraps)
> 19/09/17 09:33:50 INFO BlockManager: Found block rdd_1005_40 remotely
> 19/09/17 09:33:52 INFO TransportClientFactory: Successfully created connection to /100.76.25.87:45875 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:06 INFO BlockManager: Found block rdd_1005_46 remotely
> 19/09/17 09:34:08 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:35134 after 32 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:18 INFO BlockManager: Found block rdd_1005_48 remotely
> 19/09/17 09:34:20 INFO TransportClientFactory: Successfully created connection to /9.47.25.185:47504 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:42 INFO BlockManager: Found block rdd_1005_49 remotely
> 19/09/17 09:34:44 INFO TransportClientFactory: Successfully created connection to /100.76.33.91:35365 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:34:59 INFO BlockManager: Found block rdd_1005_51 remotely
> 19/09/17 09:35:01 INFO TransportClientFactory: Successfully created connection to /9.10.7.26:49383 after 3 ms (0 ms spent in bootstraps)
> 19/09/17 09:35:16 INFO BlockManager: Found block rdd_1005_71 remotely
> 19/09/17 09:35:18 INFO TransportClientFactory: Successfully created connection to /100.76.72.246:51684 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:35:28 INFO BlockManager: Found block rdd_1005_75 remotely
> 19/09/17 09:35:30 INFO TransportClientFactory: Successfully created connection to /9.47.30.46:51291 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:35:45 INFO BlockManager: Found block rdd_1005_98 remotely
> 19/09/17 09:35:47 INFO TransportClientFactory: Successfully created connection to /9.10.137.17:56554 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:00 INFO BlockManager: Found block rdd_1005_116 remotely
> 19/09/17 09:36:02 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:58951 after 2 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:16 INFO BlockManager: Found block rdd_1005_121 remotely
> 19/09/17 09:36:19 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:50992 after 1 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:27 INFO BlockManager: Found block rdd_1005_128 remotely
> 19/09/17 09:36:39 INFO BlockManager: Found block rdd_1005_134 remotely
> 19/09/17 09:36:42 INFO TransportClientFactory: Successfully created connection to /9.10.7.92:41607 after 73 ms (0 ms spent in bootstraps)
> 19/09/17 09:36:54 INFO BlockManager: Found block rdd_1005_153 remotely
> 19/09/17 09:37:06 INFO BlockManager: Found block rdd_1005_167 remotely
> 19/09/17 09:37:08 INFO BlockManager: Found block rdd_1005_174 locally
> 19/09/17 09:37:08 INFO TransportClientFactory: Successfully created connection to /9.10.29.150:43709 after 10 ms (0 ms spent in bootstraps)
> 19/09/17 09:37:20 INFO BlockManager: Found block rdd_1005_182 remotely
> 19/09/17 09:37:22 INFO TransportClientFactory: Successfully created connection to /9.10.8.84:55958 after 14 ms (0 ms spent in bootstraps)
> 19/09/17 09:37:32 INFO BlockManager: Found block rdd_1005_189 remotely
> 19/09/17 09:37:34 INFO Executor: Finished task 3.0 in stage 3.0 (TID 211). 1752 bytes result sent to driver
>  
> When I increase the size of the Dataset<Row>:
>  * *Storage Level:* Disk Serialized 1x Replicated
>  * *Cached Partitions:* 200
>  * *Total Partitions:* 200
>  * *Memory Size:* 390.8 GB
>  * *Disk Size:* 166.8 GB
> h4. Data Distribution on 100 Executors
> *The storage situation for the 10 executors caching the most memory:*
> on heap memory usage : 7.2 GB (2008.3 MB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 18.1 GB
> *The storage situation for the remaining executors:*
> on heap memory usage : 3.6 GB (5.6 GB Remaining)
> off heap memory usage : 0.0 B (0.0 B Remaining)
> disk usage : 0.0 B
> In this situation, 10 executors show disk usage. Because df.coalesce(10) throws ChunkFetchFailureException, those 10 executors just fetch the data from the original data source again and cache it in the new Dataset<Row>.
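> A possible workaround sketch (my own suggestion, not verified here; source stands for the original Dataset<Row>): keep every cached block under the ~2 GiB a single fetch can carry, or avoid whole-block remote fetches entirely:
> {code:scala}
> // Option 1: cache with more partitions so no single cached block
> // serializes past ~2 GiB (400 is a placeholder; size it to the data).
> val cached = source.repartition(400).cache()
>
> // Option 2: use repartition(10) instead of coalesce(10); the extra
> // shuffle reads each cached partition locally and streams it to the
> // 10 reducers in small chunks instead of fetching whole remote blocks.
> val out = cached.repartition(10)
> {code}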


