Posted to issues@spark.apache.org by "ZhanxiongWang (Jira)" <ji...@apache.org> on 2019/11/15 11:15:00 UTC

[jira] [Updated] (SPARK-29114) Dataset.coalesce(10) throw ChunkFetchFailureException when original Dataset partition size is big

     [ https://issues.apache.org/jira/browse/SPARK-29114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ZhanxiongWang updated SPARK-29114:
----------------------------------
    Description: 
Updated: 15/Nov/19
I discussed this issue with my colleagues today. We think Spark hits an out-of-bounds (integer overflow) problem while doing the shuffle.

The problem may be in the sort-based shuffle stage. When a map-task partition is too large and the index variable is stored as an int, the index can overflow. If that is the case, {color:#de350b}changing the index variable from int to long{color} should solve the current problem.
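A minimal sketch of the arithmetic behind this suspicion (my own illustration, not Spark source code): the negative writerIndex -2137154997 from the log below, read back as an unsigned 32-bit value, corresponds to a block of roughly 2.01 GB, i.e. just over Integer.MAX_VALUE, and narrowing such a length to an int wraps it to exactly that negative number.

{code:java}
public class IntOverflowSketch {
    public static void main(String[] args) {
        // The negative writerIndex from the log, reinterpreted as an
        // unsigned 32-bit value: 2157812299 bytes, roughly 2.01 GB.
        long blockSizeBytes = (-2137154997L) & 0xFFFFFFFFL;

        System.out.println("block size as long : " + blockSizeBytes);    // 2157812299
        System.out.println("Integer.MAX_VALUE  : " + Integer.MAX_VALUE); // 2147483647

        // Narrowing the length to an int (as an int-typed index/offset would)
        // wraps it to the negative value seen in the ChunkFetchFailureException.
        int truncated = (int) blockSizeBytes;
        System.out.println("narrowed to int    : " + truncated);         // -2137154997
    }
}
{code}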

--------------------------------------------------------

I create a Dataset<Row> df with 200 partitions. I requested 100 executors for the job, each with 1 core; driver memory is 8G and executor memory is 16G. I call df.cache() before df.coalesce(10). When the {color:#de350b}Dataset<Row> partition size is small{color}, the program works well. But when I {color:#de350b}increase{color} the size of the Dataset<Row> partitions, {color:#de350b}df.coalesce(10){color} throws ChunkFetchFailureException.
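For reference, a minimal sketch of the repro in Java (the real job builds the Dataset<Row> from an HBase table via TableInputFormat/NewHadoopRDD, as the log shows; the parquet path and partition sizing here are placeholders):

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CoalesceRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("coalesce-repro")
                .getOrCreate();

        // Placeholder source; the real job reads an HBase table and ends up
        // with a Dataset<Row> of 200 partitions.
        Dataset<Row> df = spark.read().parquet("/path/to/input").repartition(200);

        df.cache();   // cache the 200 partitions first
        df.count();   // materialize the cache

        // Fine while each cached partition is small; throws
        // ChunkFetchFailureException once a cached partition is large enough.
        Dataset<Row> coalesced = df.coalesce(10);
        System.out.println(coalesced.count());

        spark.stop();
    }
}
{code}

The failing task's executor log is below: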

19/09/17 08:26:44 INFO CoarseGrainedExecutorBackend: Got assigned task 210
 19/09/17 08:26:44 INFO Executor: Running task 0.0 in stage 3.0 (TID 210)
 19/09/17 08:26:44 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
 19/09/17 08:26:44 INFO TorrentBroadcast: Started reading broadcast variable 1003
 19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 3.8 GB)
 19/09/17 08:26:44 INFO TorrentBroadcast: Reading broadcast variable 1003 took 7 ms
 19/09/17 08:26:44 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 3.8 GB)
 19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_0 locally
 19/09/17 08:26:44 INFO BlockManager: Found block rdd_1005_1 locally
 19/09/17 08:26:44 INFO TransportClientFactory: Successfully created connection to /100.76.29.130:54238 after 1 ms (0 ms spent in bootstraps)
 19/09/17 08:26:46 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_18, and will not retry (0 retries)
 org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
 at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
 at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 at java.lang.Thread.run(Thread.java:745)
 19/09/17 08:26:46 WARN BlockManager: Failed to fetch block after 1 fetch failures. Most recent failure cause:
 org.apache.spark.SparkException: Exception thrown in awaitResult: 
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
 at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:115)
 at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:691)
 at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:634)
 at org.apache.spark.storage.BlockManager.get(BlockManager.scala:747)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:802)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
 at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
 at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
 at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
 at org.apache.spark.scheduler.Task.run(Task.scala:109)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=69368607002, chunkIndex=0}: readerIndex: 0, writerIndex: -2137154997 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2137154997))
 at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:182)
 at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
 at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 ... 1 more
 19/09/17 08:26:46 INFO NewHadoopRDD: Input split: 9.46.2.233:0935a68ad8000000,0935d7fb180999FF
 19/09/17 08:26:46 INFO TorrentBroadcast: Started reading broadcast variable 93
 19/09/17 08:26:46 INFO MemoryStore: Block broadcast_93_piece0 stored as bytes in memory (estimated size 32.5 KB, free 3.8 GB)
 19/09/17 08:26:46 INFO TorrentBroadcast: Reading broadcast variable 93 took 8 ms
 19/09/17 08:26:47 INFO MemoryStore: Block broadcast_93 stored as values in memory (estimated size 372.0 KB, free 3.8 GB)
 19/09/17 08:26:47 INFO RecoverableZooKeeper: Process identifier=hconnection-0x1aa852f0 connecting to ZooKeeper ensemble=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181
 19/09/17 08:26:47 INFO ZooKeeper: Initiating client connection, connectString=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181 sessionTimeout=90000 watcher=hconnection-0x1aa852f0, quorum=ss-hbase-zk-4:2181,ss-hbase-zk-3:2181,ss-hbase-zk-2:2181,ss-hbase-zk-1:2181,ss-hbase-zk-5:2181, baseZNode=/hbase-qq-mp-ss-slave
 19/09/17 08:26:47 INFO ClientCnxn: Opening socket connection to server 10.254.82.84/10.254.82.84:2181. Will not attempt to authenticate using SASL (unknown error)
 19/09/17 08:26:47 INFO ClientCnxn: Socket connection established to 10.254.82.84/10.254.82.84:2181, initiating session
 19/09/17 08:26:47 INFO ClientCnxn: Session establishment complete on server 10.254.82.84/10.254.82.84:2181, sessionid = 0x36c7f371e67307f, negotiated timeout = 90000
 19/09/17 08:26:47 INFO TableInputFormatBase: Input split length: 19.8 G bytes.
 19/09/17 08:41:37 INFO HConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x36c7f371e67307f
 19/09/17 08:41:38 INFO ZooKeeper: Session: 0x36c7f371e67307f closed
 19/09/17 08:41:38 INFO ClientCnxn: EventThread shut down
 19/09/17 08:41:38 INFO MemoryStore: Block rdd_1005_18 stored as values in memory (estimated size 1822.9 MB, free 2025.1 MB)
 19/09/17 08:41:38 INFO TransportClientFactory: Successfully created connection to /9.10.29.145:37002 after 0 ms (0 ms spent in bootstraps)
 19/09/17 08:41:39 ERROR RetryingBlockFetcher: Failed to fetch block rdd_1005_32, and will not retry (0 retries)
 org.apache.spark.network.client.ChunkFetchFailureException: Failure while fetching StreamChunkId{streamId=1800856515000, chunkIndex=0}: readerIndex: 0, writerIndex: -2138342822 (expected: 0 <= readerIndex <= writerIndex <= capacity(-2138342822))

 

Let me explain more experimental details.

{color:#de350b}When each partition size is small:{color}
 * *Storage Level:* Memory Deserialized 1x Replicated
 * *Cached Partitions:* 200
 * *Total Partitions:* 200
 * *Memory Size:* 356.2 GB
 * *Disk Size:* 0.0 B
h4. Data Distribution on 100 Executors

 

The log of the successful task is as follows:

19/09/17 09:33:17 INFO CoarseGrainedExecutorBackend: Got assigned task 211
 19/09/17 09:33:17 INFO Executor: Running task 3.0 in stage 3.0 (TID 211)
 19/09/17 09:33:17 INFO MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
 19/09/17 09:33:17 INFO TorrentBroadcast: Started reading broadcast variable 1003
 19/09/17 09:33:17 INFO TransportClientFactory: Successfully created connection to /9.10.19.210:51072 after 8 ms (0 ms spent in bootstraps)
 19/09/17 09:33:17 INFO MemoryStore: Block broadcast_1003_piece0 stored as bytes in memory (estimated size 49.4 KB, free 5.5 GB)
 19/09/17 09:33:17 INFO TorrentBroadcast: Reading broadcast variable 1003 took 36 ms
 19/09/17 09:33:18 INFO MemoryStore: Block broadcast_1003 stored as values in memory (estimated size 154.5 KB, free 5.5 GB)
 19/09/17 09:33:18 INFO BlockManager: Found block rdd_1005_6 locally
 19/09/17 09:33:18 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:37220 after 1 ms (0 ms spent in bootstraps)
 19/09/17 09:33:36 INFO BlockManager: Found block rdd_1005_33 remotely
 19/09/17 09:33:37 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:32935 after 10 ms (0 ms spent in bootstraps)
 19/09/17 09:33:50 INFO BlockManager: Found block rdd_1005_40 remotely
 19/09/17 09:33:52 INFO TransportClientFactory: Successfully created connection to /100.76.25.87:45875 after 2 ms (0 ms spent in bootstraps)
 19/09/17 09:34:06 INFO BlockManager: Found block rdd_1005_46 remotely
 19/09/17 09:34:08 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:35134 after 32 ms (0 ms spent in bootstraps)
 19/09/17 09:34:18 INFO BlockManager: Found block rdd_1005_48 remotely
 19/09/17 09:34:20 INFO TransportClientFactory: Successfully created connection to /9.47.25.185:47504 after 1 ms (0 ms spent in bootstraps)
 19/09/17 09:34:42 INFO BlockManager: Found block rdd_1005_49 remotely
 19/09/17 09:34:44 INFO TransportClientFactory: Successfully created connection to /100.76.33.91:35365 after 1 ms (0 ms spent in bootstraps)
 19/09/17 09:34:59 INFO BlockManager: Found block rdd_1005_51 remotely
 19/09/17 09:35:01 INFO TransportClientFactory: Successfully created connection to /9.10.7.26:49383 after 3 ms (0 ms spent in bootstraps)
 19/09/17 09:35:16 INFO BlockManager: Found block rdd_1005_71 remotely
 19/09/17 09:35:18 INFO TransportClientFactory: Successfully created connection to /100.76.72.246:51684 after 2 ms (0 ms spent in bootstraps)
 19/09/17 09:35:28 INFO BlockManager: Found block rdd_1005_75 remotely
 19/09/17 09:35:30 INFO TransportClientFactory: Successfully created connection to /9.47.30.46:51291 after 1 ms (0 ms spent in bootstraps)
 19/09/17 09:35:45 INFO BlockManager: Found block rdd_1005_98 remotely
 19/09/17 09:35:47 INFO TransportClientFactory: Successfully created connection to /9.10.137.17:56554 after 2 ms (0 ms spent in bootstraps)
 19/09/17 09:36:00 INFO BlockManager: Found block rdd_1005_116 remotely
 19/09/17 09:36:02 INFO TransportClientFactory: Successfully created connection to /100.76.35.94:58951 after 2 ms (0 ms spent in bootstraps)
 19/09/17 09:36:16 INFO BlockManager: Found block rdd_1005_121 remotely
 19/09/17 09:36:19 INFO TransportClientFactory: Successfully created connection to /9.10.36.96:50992 after 1 ms (0 ms spent in bootstraps)
 19/09/17 09:36:27 INFO BlockManager: Found block rdd_1005_128 remotely
 19/09/17 09:36:39 INFO BlockManager: Found block rdd_1005_134 remotely
 19/09/17 09:36:42 INFO TransportClientFactory: Successfully created connection to /9.10.7.92:41607 after 73 ms (0 ms spent in bootstraps)
 19/09/17 09:36:54 INFO BlockManager: Found block rdd_1005_153 remotely
 19/09/17 09:37:06 INFO BlockManager: Found block rdd_1005_167 remotely
 19/09/17 09:37:08 INFO BlockManager: Found block rdd_1005_174 locally
 19/09/17 09:37:08 INFO TransportClientFactory: Successfully created connection to /9.10.29.150:43709 after 10 ms (0 ms spent in bootstraps)
 19/09/17 09:37:20 INFO BlockManager: Found block rdd_1005_182 remotely
 19/09/17 09:37:22 INFO TransportClientFactory: Successfully created connection to /9.10.8.84:55958 after 14 ms (0 ms spent in bootstraps)
 19/09/17 09:37:32 INFO BlockManager: Found block rdd_1005_189 remotely
 19/09/17 09:37:34 INFO Executor: Finished task 3.0 in stage 3.0 (TID 211). 1752 bytes result sent to driver

 

When I {color:#de350b}increase the size of each partition{color}:
 * *Storage Level:* Disk Serialized 1x Replicated
 * *Cached Partitions:* 200
 * *Total Partitions:* 200
 * *Memory Size:* 390.8 GB
 * *Disk Size:* 166.8 GB
h4. Data Distribution on 100 Executors

 

In this situation 10 executors show disk usage. Because df.coalesce(10) throws ChunkFetchFailureException, those 10 executors just fetch the data from the original data source again and cache it in the new Dataset<Row>.
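If the root cause is indeed an int overflow around the ~2 GB per-block limit, one workaround to try (an untested sketch under that assumption, with the partition count chosen only for illustration) is to keep each cached partition well under 2 GB before coalescing:

{code:java}
import org.apache.spark.storage.StorageLevel;

// Same df as in the repro sketch above; 400 is only an illustrative partition
// count chosen so that every cached block stays well below 2 GB.
Dataset<Row> smaller = df.repartition(400);
smaller.persist(StorageLevel.MEMORY_AND_DISK_SER());
smaller.count();                         // materialize the cache
Dataset<Row> coalesced = smaller.coalesce(10);
{code}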

> Dataset<Row>.coalesce(10) throw ChunkFetchFailureException when original Dataset partition size is big
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-29114
>                 URL: https://issues.apache.org/jira/browse/SPARK-29114
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 2.3.0
>            Reporter: ZhanxiongWang
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org