Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/01/16 13:41:39 UTC

[jira] [Reopened] (SPARK-7703) Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast

     [ https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen reopened SPARK-7703:
------------------------------

> Task failure caused by block fetch failure in BlockManager.doGetRemote() when using TorrentBroadcast
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-7703
>                 URL: https://issues.apache.org/jira/browse/SPARK-7703
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.2.1, 1.3.1
>         Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
>            Reporter: Hailong Wen
>
> I am from the IBM Platform Symphony team, and we are working to integrate Spark with our EGO to provide a fine-grained resource manager with dynamic allocation.
> We found a defect in the current implementation of BlockManager.doGetRemote():
> {noformat}
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
>     require(blockId != null, "BlockId is null")
>     val locations = Random.shuffle(master.getLocations(blockId))  // Issue 2: locations may be out of date
>     for (loc <- locations) {
>       logDebug(s"Getting remote block $blockId from $loc")
>       val data = blockTransferService.fetchBlockSync(
>         loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()  // Issue 1: this call is not inside a try/catch
>       if (data != null) {
>         if (asBlockResult) {
>           return Some(new BlockResult(
>             dataDeserialize(blockId, data),
>             DataReadMethod.Network,
>             data.limit()))
>         } else {
>           return Some(data)
>         }
>       }
>       logDebug(s"The value of block $blockId is null")
>     }
>     logDebug(s"Block $blockId not found")
>     None
>   }
> {noformat}
> * Issue 1: Although the method uses a "for" loop to try all available locations, the fetch call is not guarded by a "Try" block. When an exception occurs, the method throws it immediately instead of trying the remaining block locations, and the uncaught exception causes the task to fail.
> * Issue 2: The list of locations is read once, before fetching starts; however, in a dynamic allocation environment the block locations may change while the loop is running.
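For illustration, here is a minimal, self-contained sketch of the behavior Issue 1 asks for: each fetch is wrapped in `Try`, and a failure moves on to the next location instead of failing the task. `Location`, `GuardedFetch`, and `fetchFromAny` are hypothetical names, and `fetch` stands in for `blockTransferService.fetchBlockSync(...).nioByteBuffer()`; this does not reproduce Spark's actual BlockManager code.

```scala
import scala.util.{Failure, Random, Success, Try}

// Hypothetical stand-in for BlockManagerId: only the fields the loop needs.
final case class Location(host: String, port: Int, executorId: String)

object GuardedFetch {
  /** Tries each location once, in random order, and returns the first
    * non-null payload. A failed fetch is logged and skipped rather than
    * thrown, so one dead executor no longer fails the whole task.
    * `fetch` is a hypothetical stand-in for the real block transfer call. */
  def fetchFromAny[T](
      locations: Seq[Location],
      fetch: Location => T,
      log: String => Unit = _ => ()): Option[T] = {
    val it = Random.shuffle(locations).iterator
    while (it.hasNext) {
      val loc = it.next()
      Try(fetch(loc)) match {
        case Success(data) if data != null => return Some(data)
        case Success(_) => log(s"Block data from $loc is null; trying next location")
        case Failure(e) => log(s"Fetch from $loc failed (${e.getMessage}); trying next location")
      }
    }
    None // every location failed or returned null
  }
}
```

On its own this does not address Issue 2: the location list is still a snapshot, so every dead executor the master has not yet removed is still attempted once.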
> We hit both issues in our use case, where executors exit once all of their assigned tasks are done. We *occasionally* get the following error (Issue 1):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0 as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0 from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks 
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
> 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> 	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> 	at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> 	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:599)
> 	at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:597)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:597)
> 	at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:591)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
> 	at scala.Option.orElse(Option.scala:257)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
> 	at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
> 	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1149)
> 	at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> 	at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> 	at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> 	at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> 	at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused: sparkbj01/9.111.254.195:37599
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> 	... 1 more
> 15/05/13 10:28:35 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
> 15/05/13 10:28:40 DEBUG TransportClientFactory: Creating new connection to sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:40 ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
> 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> 	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> 	at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> 	at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> 	at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> 	at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused: sparkbj01/9.111.254.195:37599
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
> 	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> 	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> 	... 1 more
> {noformat}
> We did send "ExecutorLost" messages so that BlockManagerMaster can remove the executor from its block location map. However, due to network latency, the "getLocations" call may happen before the removal.
> In our heavy-workload environment, some tasks may keep failing and eventually cause *job failure*.
> Using HttpBroadcast instead of the default TorrentBroadcast did resolve this problem, but we want better performance. So we added a Try block, but found that the "for" loop tries dozens of dead executors before finally fetching the block from the driver's BlockManager, because RetryingBlockFetcher retries each dead location before the loop moves on. This process takes *several minutes*.
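The "several minutes" is consistent with the retry behavior visible in the log above: RetryingBlockFetcher retries an unreachable location 3 times, waiting 5000 ms before each retry, so every dead executor costs at least 15 seconds before the loop moves on. The sketch below only does that arithmetic. `BroadcastFetchDelay` is a hypothetical helper, 24 dead locations is an illustrative assumption standing in for "dozens", and the 3 / 5000 ms values are, as far as we know, the defaults of `spark.shuffle.io.maxRetries` and `spark.shuffle.io.retryWait`.

```scala
object BroadcastFetchDelay {
  /** Minimum time lost on one unreachable location: `maxRetries` pauses of
    * `retryWaitMs` each. Time spent inside the failed connection attempts is
    * ignored, so the real cost can only be higher. */
  def perDeadLocationMs(maxRetries: Int, retryWaitMs: Long): Long =
    maxRetries * retryWaitMs

  /** Minimum time lost when the loop visits `deadLocations` dead executors
    * before it reaches a live BlockManager (for example, the driver's). */
  def totalMs(deadLocations: Int, maxRetries: Int, retryWaitMs: Long): Long =
    deadLocations * perDeadLocationMs(maxRetries, retryWaitMs)
}

// Retry values taken from the log: "Retrying fetch (1/3) ... after 5000 ms".
val perLocation = BroadcastFetchDelay.perDeadLocationMs(maxRetries = 3, retryWaitMs = 5000L) // 15000 ms
// 24 dead locations is an assumed example of "dozens".
val twoDozen = BroadcastFetchDelay.totalMs(deadLocations = 24, maxRetries = 3, retryWaitMs = 5000L) // 360000 ms = 6 minutes
```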
> We are currently working around this problem with the following fix:
> {noformat}
>   // Requires scala.util.{Failure, Random, Success, Try} to be imported in BlockManager.scala.
>   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
>     require(blockId != null, "BlockId is null")
>     var stopRetrying = false
>     while (!stopRetrying) {
>       // Re-read the locations on every attempt so that executors already removed
>       // by the master (after ExecutorLost) are no longer candidates.
>       val locations = Random.shuffle(master.getLocations(blockId))
>       if (locations.isEmpty) {
>         // Guard against NoSuchElementException from locations.head below.
>         logDebug(s"Block $blockId not found")
>         return None
>       }
>       val loc = locations.head
>       logDebug(s"Getting remote block $blockId from $loc")
>       val dataTry = Try(blockTransferService.fetchBlockSync(
>         loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer())
>
>       dataTry match {
>         case Success(data) =>
>           if (data != null) {
>             if (asBlockResult) {
>               return Some(new BlockResult(
>                 dataDeserialize(blockId, data),
>                 DataReadMethod.Network,
>                 data.limit()))
>             } else {
>               return Some(data)
>             }
>           }
>           logDebug(s"The value of block $blockId is null")
>         case Failure(e) =>
>           val retryNote = if (locations.size <= 1) "" else " Will update locations and retry."
>           logWarning(s"Failed to fetch block $blockId from ${loc.host}:${loc.port} " +
>             s"(executorId: ${loc.executorId}).$retryNote", e)
>       }
>       // If no more than one location remains (normally the driver), stop retrying and give up.
>       stopRetrying = locations.size <= 1
>     }
>     logDebug(s"Block $blockId not found")
>     None
>   }
> {noformat}
> This fix suppresses the exception when a fetch fails and re-reads the block locations before each attempt, which reduces further failures.
> We hope experts in the community can help us find a more thorough solution (e.g., could we try all available block locations in a random, rolling manner, instead of retrying the same location 4 times consecutively?)
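One way to read the "random rolling" suggestion is sketched here, purely as an illustration under stated assumptions: every location is attempted at most once, the location list is re-read after each failure (it may have shrunk once the master processes ExecutorLost), and the next candidate is picked at random among locations not yet tried. `BlockLoc`, `RollingFetch`, `lookup` (standing in for `master.getLocations`), and `fetchOnce` (assumed to make a single attempt with no internal retries) are all hypothetical names, not Spark APIs.

```scala
import scala.collection.mutable
import scala.util.{Random, Success, Try}

// Hypothetical stand-in for BlockManagerId.
final case class BlockLoc(host: String, port: Int, executorId: String)

object RollingFetch {
  /** Attempts each location at most once, in random order, refreshing the
    * candidate list after every failure. Returns the first non-null payload,
    * or None once no untried location remains. */
  def fetch[T](lookup: () => Seq[BlockLoc], fetchOnce: BlockLoc => T): Option[T] = {
    val failed = mutable.Set.empty[BlockLoc]
    var candidates = lookup()
    while (candidates.nonEmpty) {
      val loc = candidates(Random.nextInt(candidates.size))
      Try(fetchOnce(loc)) match {
        case Success(data) if data != null => return Some(data)
        case _ => failed += loc // failure or null payload: never try this location again
      }
      // Refresh: the master may have dropped dead executors in the meantime.
      candidates = lookup().filterNot(failed.contains)
    }
    None
  }
}
```

Remembering failed locations locally means a dead executor the master has not yet removed is skipped after its first failure. The trade-off is that a transient error on a live executor also excludes it for the rest of this call, leaving the remaining locations (normally including the driver) as the fallback.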



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
