Posted to dev@spark.apache.org by lonely Feb <lo...@gmail.com> on 2015/03/16 04:08:14 UTC
broadcast hang out
Hi all, I've run into a problem where torrent broadcast hangs in my Spark
cluster (1.2, standalone). It is particularly serious when the driver and
executors are in different regions. Reading the broadcast code, I found
this synchronous block read:
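
    import java.nio.ByteBuffer
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration
    import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
    import org.apache.spark.network.shuffle.BlockFetchingListener

    // Member of the abstract class BlockTransferService (Spark 1.2).
    def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
      // A monitor for the thread to wait on.
      val result = Promise[ManagedBuffer]()
      fetchBlocks(host, port, execId, Array(blockId),
        new BlockFetchingListener {
          override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
            result.failure(exception)
          }
          override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
            // Copy the block into a heap buffer owned by the caller.
            val ret = ByteBuffer.allocate(data.size.toInt)
            ret.put(data.nioByteBuffer())
            ret.flip()
            result.success(new NioManagedBuffer(ret))
          }
        })
      // No upper bound: if neither callback ever fires, this call never returns.
      Await.result(result.future, Duration.Inf)
    }
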
It seems that fetchBlockSync has no timeout and waits forever. Can anybody
show me how to control the timeout here?
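A minimal, self-contained sketch of why the call above can hang, assuming the response is simply lost. The trait `FetchListener` and the function `silentFetch` are hypothetical stand-ins for Spark's `BlockFetchingListener` and `fetchBlocks`; `silentFetch` never invokes either callback, so nothing ever completes the Promise. With `Duration.Inf` the `Await.result` would block forever; with a finite bound it throws a `TimeoutException` instead.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

// Hypothetical stand-in for Spark's BlockFetchingListener.
trait FetchListener {
  def onSuccess(blockId: String, data: Array[Byte]): Unit
  def onFailure(blockId: String, e: Throwable): Unit
}

object HangDemo {
  // Hypothetical fetcher simulating a lost response: it never invokes the listener.
  def silentFetch(blockId: String, listener: FetchListener): Unit = ()

  // Same Promise bridge as fetchBlockSync, but the caller chooses how long to wait.
  def fetchSync(blockId: String, atMost: Duration): Array[Byte] = {
    val result = Promise[Array[Byte]]()
    silentFetch(blockId, new FetchListener {
      def onSuccess(id: String, data: Array[Byte]): Unit = result.success(data)
      def onFailure(id: String, e: Throwable): Unit = result.failure(e)
    })
    // Passing Duration.Inf here would block forever: nothing ever completes the Promise.
    Await.result(result.future, atMost)
  }

  def main(args: Array[String]): Unit =
    Try(fetchSync("broadcast_0_piece0", 200.millis)) match {
      case Failure(_: TimeoutException) => println("timed out instead of hanging")
      case other                        => println(s"unexpected: $other")
    }
}
```

Running `HangDemo.main` prints `timed out instead of hanging` after roughly 200 ms, where the `Duration.Inf` version would never return.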
Re: broadcast hang out
Posted by lonely Feb <lo...@gmail.com>.
Thanks. But this method is in Spark's BlockTransferService.scala, which I
cannot replace without rewriting core code. I wonder whether this is
already handled somewhere.
2015-03-16 11:27 GMT+08:00 Chester Chen <ch...@alpinenow.com>:
> Can you just replace "Duration.Inf" with a shorter duration? How about
>
>     import scala.concurrent.duration._
>     import akka.util.Timeout
>     val timeout = Timeout(10.seconds)
>     Await.result(result.future, timeout.duration)
>
> or
>
>     import java.util.concurrent.TimeUnit
>     import scala.concurrent.duration.FiniteDuration
>     val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
>     Await.result(result.future, timeout)
>
> or simply
>
>     import scala.concurrent.duration._
>     Await.result(result.future, 10.seconds)
Re: broadcast hang out
Posted by Chester Chen <ch...@alpinenow.com>.
Can you just replace "Duration.Inf" with a shorter duration? How about

    import scala.concurrent.duration._
    import akka.util.Timeout  // Akka's wrapper around a FiniteDuration
    val timeout = Timeout(10.seconds)
    Await.result(result.future, timeout.duration)

or

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration
    val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
    Await.result(result.future, timeout)

or simply

    import scala.concurrent.duration._
    Await.result(result.future, 10.seconds)
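Applied to the method shape from the original post, the suggestion might look like the following sketch. It is not Spark's code: `TimedFetch`, its `Listener` trait, and the `AsyncFetch` function type are hypothetical stand-ins for `BlockTransferService`, `BlockFetchingListener`, and `fetchBlocks`, and the timeout is assumed to be passed in by the caller.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object TimedFetch {
  // Hypothetical stand-in for Spark's BlockFetchingListener.
  trait Listener {
    def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit
    def onBlockFetchFailure(blockId: String, e: Throwable): Unit
  }

  // Hypothetical asynchronous fetch: (host, port, blockId, listener) => Unit.
  type AsyncFetch = (String, Int, String, Listener) => Unit

  // Waits at most `timeout` for one block; a timeout is rethrown naming the remote host and block.
  def fetchBlockSync(fetch: AsyncFetch, host: String, port: Int, blockId: String,
                     timeout: FiniteDuration): Array[Byte] = {
    val result = Promise[Array[Byte]]()
    fetch(host, port, blockId, new Listener {
      // trySuccess/tryFailure ignore a second callback instead of throwing.
      def onBlockFetchSuccess(id: String, data: Array[Byte]): Unit = result.trySuccess(data)
      def onBlockFetchFailure(id: String, e: Throwable): Unit = result.tryFailure(e)
    })
    try Await.result(result.future, timeout)
    catch {
      case _: TimeoutException =>
        throw new TimeoutException(s"Fetching $blockId from $host:$port took longer than $timeout")
    }
  }
}
```

For example, `TimedFetch.fetchBlockSync((_, _, id, l) => l.onBlockFetchSuccess(id, Array[Byte](1, 2, 3)), "host", 12345, "broadcast_0_piece0", 10.seconds)` returns the three bytes immediately. Naming the host and block in the exception is meant to make a cross-region stall easier to spot in executor logs. Note that the timeout only ends the wait; in this sketch it does not cancel the request already in flight.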
Re: broadcast hang out
Posted by Reynold Xin <rx...@databricks.com>.
It would be great to add a timeout. Do you mind submitting a pull request?
On Sun, Mar 15, 2015 at 10:41 PM, lonely Feb <lo...@gmail.com> wrote:
> Can anyone help? Thanks a lot!
>
> 2015-03-16 11:45 GMT+08:00 lonely Feb <lo...@gmail.com>:
>
> > yes
> >
> > 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan <mr...@gmail.com>:
> >
> >> Cross-region as in different data centers?
> >>
> >> - Mridul
> >
> >
>
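If a timeout were added as suggested, it would probably want to be configurable rather than hard-coded, since a cross-region link may need much more slack than a single data center. A sketch under stated assumptions: the key `example.blockFetch.timeoutSeconds` and the 120-second default are hypothetical (not existing Spark settings), and a plain `Map[String, String]` stands in for `SparkConf`.

```scala
import scala.concurrent.duration._
import scala.util.Try

object FetchTimeoutConf {
  // Hypothetical key and default; not existing Spark settings.
  val Key = "example.blockFetch.timeoutSeconds"
  val Default: FiniteDuration = 120.seconds

  // Reads a positive number of seconds, falling back to the default when the key is
  // missing, not a whole number, or not positive.
  def fetchTimeout(conf: Map[String, String]): FiniteDuration =
    conf.get(Key)
      .flatMap(v => Try(v.trim.toLong).toOption)
      .filter(_ > 0)
      .map(_.seconds)
      .getOrElse(Default)
}
```

Falling back to a finite default, rather than to `Duration.Inf`, keeps a misconfigured value from reintroducing the unbounded wait.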
Re: broadcast hang out
Posted by lonely Feb <lo...@gmail.com>.
Can anyone help? Thanks a lot!
2015-03-16 11:45 GMT+08:00 lonely Feb <lo...@gmail.com>:
> yes
>
> 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan <mr...@gmail.com>:
>
>> Cross-region as in different data centers?
>>
>> - Mridul
>
>
Re: broadcast hang out
Posted by lonely Feb <lo...@gmail.com>.
yes
2015-03-16 11:43 GMT+08:00 Mridul Muralidharan <mr...@gmail.com>:
> Cross-region as in different data centers?
>
> - Mridul
>
Re: broadcast hang out
Posted by Mridul Muralidharan <mr...@gmail.com>.
Cross-region as in different data centers?
- Mridul