Posted to issues@spark.apache.org by "ZhanxiongWang (Jira)" <ji...@apache.org> on 2019/09/17 04:02:00 UTC

[jira] [Created] (SPARK-29114) Dataset.coalesce(10) throw ChunkFetchFailureException when original Dataset size is big

ZhanxiongWang created SPARK-29114:
-------------------------------------

             Summary: Dataset<Row>.coalesce(10) throw ChunkFetchFailureException when original Dataset size is big
                 Key: SPARK-29114
                 URL: https://issues.apache.org/jira/browse/SPARK-29114
             Project: Spark
          Issue Type: Bug
          Components: Block Manager
    Affects Versions: 2.3.0
            Reporter: ZhanxiongWang


I created a Dataset<Row> df with 200 partitions and requested 100 executors for the job, each with 1 core and 16G of memory; driver memory is 8G. I call df.cache() before df.coalesce(10). When the {color:#de350b}Dataset<Row> size is small{color}, the program works well. But when I {color:#de350b}increase{color} the size of the Dataset<Row>, {color:#de350b}df.coalesce(10){color} throws ChunkFetchFailureException.

19/09/17 08:26:44 INFO CoarseGrainedExecutorBackend: Got assigned task 210
19/09/17 08:26:44 INFO Executor: Running task 0.0 in stage 3.0 (TID 210)
19/09/17 08:26:44 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
19/09/17 08:26:44 INFO TorrentBroadcast: Started reading broadcast variable 1003
19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 3.8 GB)
19/09/17 08:26:44 INFO TorrentBroadcast: Reading broadcast variable 1003 took 7 ms
19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 3.8 GB)
19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_0 locally
19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_1 locally
19/09/17 08:26:44 INFO TransportClientFactory: Successfully created connection to /100.76.29.130:54238 after 1 ms (0 ms spent in bootstraps)
19/09/17 08:26:46 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_18, and will not retry (0 retries)
org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
 at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
 at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 at java.lang.Thread.run(Thread.java:745)
19/09/17 08:26:46 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
org.apache.spark.SparkException: Exception thrown in awaitResult: 
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
 at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
 at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:691)
 at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:634)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:747)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:802)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
 at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
 at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
 at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
 at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 ... 1 more
19/09/17 08:26:46 INFO NewHadoopRDD: Input split: 9.46.2.233:0935a68ad8000000,0935d7fb180999FF
19/09/17 08:26:46 INFO TorrentBroadcast: Started reading broadcast variable 93
19/09/17 08:26:46 INFO MemoryStore: Block broadcast_93_piece0 stored as bytes in memory (estimated size 32.5 KB, free 3.8 GB)
19/09/17 08:26:46 INFO TorrentBroadcast: Reading broadcast variable 93 took 8 ms
19/09/17 08:26:47 INFO MemoryStore: Block broadcast_93 stored as values in memory (estimated size 372.0 KB, free 3.8 GB)
19/09/17 08:26:47 INFO RecoverableZooKeeper: Process identifier=hconnection-0x1aa852f0 connecting to ZooKeeper ensemble=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181
19/09/17 08:26:47 INFO ZooKeeper: Initiating client connection, connectString=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181 sessionTimeout=90000 watcher=hconnection-0x1aa852f0, quorum=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181, baseZNode=/hbase-qq-mp-ss-slave
19/09/17 08:26:47 INFO ClientCnxn: Opening socket connection to server 10.254.82.84/10.254.82.84:2181. Will not attempt to authenticate using SASL (unknown error)
19/09/17 08:26:47 INFO ClientCnxn: Socket connection established to 10.254.82.84/10.254.82.84:2181, initiating session
19/09/17 08:26:47 INFO ClientCnxn: Session establishment complete on server 10.254.82.84/10.254.82.84:2181, sessionid = 0x36c7f371e67307f, negotiated timeout = 90000
19/09/17 08:26:47 INFO TableInputFormatBase: Input split length: 19.8 G bytes.
19/09/17 08:41:37 INFO HConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x36c7f371e67307f
19/09/17 08:41:38 INFO ZooKeeper: Session: 0x36c7f371e67307f closed
19/09/17 08:41:38 INFO ClientCnxn: EventThread shut down
19/09/17 08:41:38 INFO MemoryStore: Block rdd_1005_18 stored as values in memory (estimated size 1822.9 MB, free 2025.1 MB)
19/09/17 08:41:38 INFO TransportClientFactory: Successfully created connection to /9.10.29.145:37002 after 0 ms (0 ms spent in bootstraps)
19/09/17 08:41:39 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_32, and will not retry (0 retries)
org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1800856515000, chunkIndex=0}: readerIndex: 0, writerIndex: -2138342822 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2138342822))
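
A note on the negative writerIndex values in the exceptions above: they are consistent with a byte count slightly above 2 GiB that wrapped around a signed 32-bit integer. This is an interpretation of the log, not something stated in the report. The following Java sketch (class and method names are hypothetical) reinterprets the two logged values as unsigned 32-bit numbers:

```java
final class WriterIndexCheck {
    // Reinterpret a negative signed 32-bit value as the unsigned
    // byte count it may have wrapped around from.
    static long asUnsigned(int wrapped) {
        return Integer.toUnsignedLong(wrapped);
    }

    public static void main(String[] args) {
        // writerIndex values copied from the two ChunkFetchFailureException messages.
        int[] observed = {-2137154997, -2138342822};
        for (int w : observed) {
            long bytes = asUnsigned(w);
            double gib = bytes / (1024.0 * 1024 * 1024);
            System.out.printf("%d -> %d bytes (%.3f GiB)%n", w, bytes, gib);
        }
    }
}
```

Both values come out just above Integer.MAX_VALUE (2147483647), about 2.01 GiB per block, which would fit a failure that only appears once cached partitions grow past that size.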

 

Here are more details of the experiment.

{color:#de350b}When df size is small:{color}
 * *Storage Level:* Memory Deserialized 1x Replicated
 * *Cached Partitions:* 200
 * *Total Partitions:* 200
 * *Memory Size:* 356.2 GB
 * *Disk Size:* 0.0 B
h4. Data Distribution on 100 Executors

{color:#de350b}*Storage for the 10 executors that cached the most data in memory:*{color}

on heap memory usage : 5.3 GB (3.8 GB Remaining)

off heap memory usage : 0.0 B (0.0 B Remaining)

disk usage : 0.0 B

{color:#de350b}*Storage for the remaining executors:*{color}

on heap memory usage : 3.6 GB (5.6 GB Remaining)

off heap memory usage : 0.0 B (0.0 B Remaining)

disk usage : 0.0 B

 

The log of the successful task is as follows:

 

19/09/17 09:33:17 INFO CoarseGrainedExecutorBackend: Got assigned task 211
19/09/17 09:33:17 INFO Executor: Running task 3.0 in stage 3.0 (TID 211)
19/09/17 09:33:17 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
19/09/17 09:33:17 INFO TorrentBroadcast: Started reading broadcast variable 1003
19/09/17 09:33:17 INFO TransportClientFactory: Successfully created connection to /9.10.19.210:51072 after 8 ms (0 ms spent in bootstraps)
19/09/17 09:33:17 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 5.5 GB)
19/09/17 09:33:17 INFO TorrentBroadcast: Reading broadcast variable 1003 took 36 ms
19/09/17 09:33:18 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 5.5 GB)
19/09/17 09:33:18 INFO BlockManager: Found block rdd_1005_6 locally
19/09/17 09:33:18 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:37220 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:33:36 INFO BlockManager: Found block rdd_1005_33 remotely
19/09/17 09:33:37 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:32935 after 10 ms (0 ms spent in bootstraps)
19/09/17 09:33:50 INFO BlockManager: Found block rdd_1005_40 remotely
19/09/17 09:33:52 INFO TransportClientFactory: Successfully created connection to /100.76.25.87:45875 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:34:06 INFO BlockManager: Found block rdd_1005_46 remotely
19/09/17 09:34:08 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:35134 after 32 ms (0 ms spent in bootstraps)
19/09/17 09:34:18 INFO BlockManager: Found block rdd_1005_48 remotely
19/09/17 09:34:20 INFO TransportClientFactory: Successfully created connection to /9.47.25.185:47504 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:34:42 INFO BlockManager: Found block rdd_1005_49 remotely
19/09/17 09:34:44 INFO TransportClientFactory: Successfully created connection to /100.76.33.91:35365 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:34:59 INFO BlockManager: Found block rdd_1005_51 remotely
19/09/17 09:35:01 INFO TransportClientFactory: Successfully created connection to /9.10.7.26:49383 after 3 ms (0 ms spent in bootstraps)
19/09/17 09:35:16 INFO BlockManager: Found block rdd_1005_71 remotely
19/09/17 09:35:18 INFO TransportClientFactory: Successfully created connection to /100.76.72.246:51684 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:35:28 INFO BlockManager: Found block rdd_1005_75 remotely
19/09/17 09:35:30 INFO TransportClientFactory: Successfully created connection to /9.47.30.46:51291 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:35:45 INFO BlockManager: Found block rdd_1005_98 remotely
19/09/17 09:35:47 INFO TransportClientFactory: Successfully created connection to /9.10.137.17:56554 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:36:00 INFO BlockManager: Found block rdd_1005_116 remotely
19/09/17 09:36:02 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:58951 after 2 ms (0 ms spent in bootstraps)
19/09/17 09:36:16 INFO BlockManager: Found block rdd_1005_121 remotely
19/09/17 09:36:19 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:50992 after 1 ms (0 ms spent in bootstraps)
19/09/17 09:36:27 INFO BlockManager: Found block rdd_1005_128 remotely
19/09/17 09:36:39 INFO BlockManager: Found block rdd_1005_134 remotely
19/09/17 09:36:42 INFO TransportClientFactory: Successfully created connection to /9.10.7.92:41607 after 73 ms (0 ms spent in bootstraps)
19/09/17 09:36:54 INFO BlockManager: Found block rdd_1005_153 remotely
19/09/17 09:37:06 INFO BlockManager: Found block rdd_1005_167 remotely
19/09/17 09:37:08 INFO BlockManager: Found block rdd_1005_174 locally
19/09/17 09:37:08 INFO TransportClientFactory: Successfully created connection to /9.10.29.150:43709 after 10 ms (0 ms spent in bootstraps)
19/09/17 09:37:20 INFO BlockManager: Found block rdd_1005_182 remotely
19/09/17 09:37:22 INFO TransportClientFactory: Successfully created connection to /9.10.8.84:55958 after 14 ms (0 ms spent in bootstraps)
19/09/17 09:37:32 INFO BlockManager: Found block rdd_1005_189 remotely
19/09/17 09:37:34 INFO Executor: Finished task 3.0 in stage 3.0 (TID 211). 1752 bytes result sent to driver

 

When I{color:#de350b} increase the size of the Dataset<Row>{color}:
 * *Storage Level:* Disk Serialized 1x Replicated
 * *Cached Partitions:* 200
 * *Total Partitions:* 200
 * *Memory Size:* 390.8 GB
 * *Disk Size:* 166.8 GB
h4. Data Distribution on 100 Executors

{color:#de350b}*Storage for the 10 executors that cached the most data in memory:*{color}

on heap memory usage : 7.2 GB (2008.3 MB Remaining)

off heap memory usage : 0.0 B (0.0 B Remaining)

disk usage : 18.1 GB

{color:#de350b}*Storage for the remaining executors:*{color}

on heap memory usage : 3.6 GB (5.6 GB Remaining)

off heap memory usage : 0.0 B (0.0 B Remaining)

disk usage : 0.0 B

In this situation, 10 executors show disk usage: because df.coalesce(10) threw ChunkFetchFailureException, those 10 executors fetched the data from the original data source again and cached it in the new Dataset<Row>.
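
The negative writerIndex values in the failing log suggest an integer overflow, so it may help to compare the average cached partition size in the two runs against Integer.MAX_VALUE bytes. The Java sketch below is a rough estimate only: it assumes the "Memory Size" figures use GB = 2^30 bytes, that data is spread evenly over the 200 partitions, and that a 32-bit limit is what matters. The class and method names are hypothetical.

```java
final class PartitionSizeEstimate {
    static final long GIB = 1L << 30;

    // Average bytes per cached partition for a total given in GB (taken as 2^30 bytes).
    static long averagePartitionBytes(double totalGb, int partitions) {
        return Math.round(totalGb * GIB / partitions);
    }

    // Bytes left before a block of this size no longer fits in a signed 32-bit int.
    static long headroom(long bytes) {
        return Integer.MAX_VALUE - bytes;
    }

    public static void main(String[] args) {
        // "Memory Size" values reported for the small and the large run.
        double[] memorySizeGb = {356.2, 390.8};
        for (double gb : memorySizeGb) {
            long avg = averagePartitionBytes(gb, 200);
            System.out.printf("%.1f GB / 200 partitions -> avg %d bytes, headroom %d bytes%n",
                    gb, avg, headroom(avg));
        }
    }
}
```

Under these assumptions the average partition is roughly 1.78 GB in the small run and 1.95 GB in the large run, so the larger run leaves far less room below 2 GiB and any above-average partition could cross it. The estimate ignores skew and the difference between deserialized and serialized sizes.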


