You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by lonely Feb <lo...@gmail.com> on 2015/03/16 04:08:14 UTC

broadcast hang out

Hi all, i meet up with a problem that torrent broadcast hang out in my
spark cluster (1.2, standalone) , particularly serious when driver and
executors are cross-region. when i read the code of broadcast i found that
a sync block read here:

  def fetchBlockSync(host: String, port: Int, execId: String, blockId:
String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception:
Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data:
ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }

it seems that fetchBlockSync method does not have a timeout limit but wait
forever ? Anybody can show me how to control the timeout here?

Re: broadcast hang out

Posted by lonely Feb <lo...@gmail.com>.
Thx. But this method is in BlockTransferService.scala of spark which i can
not replace unless i rewrite the core code. I wonder if it is handled
somewhere already.

2015-03-16 11:27 GMT+08:00 Chester Chen <ch...@alpinenow.com>:

> can you just replace "Duration.Inf" with a shorter duration  ? how about
>
>       import scala.concurrent.duration._
>       val timeout = new Timeout(10 seconds)
>       Await.result(result.future, timeout.duration)
>
>       or
>
>       val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
>       Await.result(result.future, timeout)
>
>       or simply
>       import scala.concurrent.duration._
>       Await.result(result.future, 10 seconds)
>
>
>
> On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lo...@gmail.com> wrote:
>
>> Hi all, i meet up with a problem that torrent broadcast hang out in my
>> spark cluster (1.2, standalone) , particularly serious when driver and
>> executors are cross-region. when i read the code of broadcast i found that
>> a sync block read here:
>>
>>   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
>> String): ManagedBuffer = {
>>     // A monitor for the thread to wait on.
>>     val result = Promise[ManagedBuffer]()
>>     fetchBlocks(host, port, execId, Array(blockId),
>>       new BlockFetchingListener {
>>         override def onBlockFetchFailure(blockId: String, exception:
>> Throwable): Unit = {
>>           result.failure(exception)
>>         }
>>         override def onBlockFetchSuccess(blockId: String, data:
>> ManagedBuffer): Unit = {
>>           val ret = ByteBuffer.allocate(data.size.toInt)
>>           ret.put(data.nioByteBuffer())
>>           ret.flip()
>>           result.success(new NioManagedBuffer(ret))
>>         }
>>       })
>>
>>     Await.result(result.future, Duration.Inf)
>>   }
>>
>> it seems that fetchBlockSync method does not have a timeout limit but wait
>> forever ? Anybody can show me how to control the timeout here?
>>
>
>

Re: broadcast hang out

Posted by Chester Chen <ch...@alpinenow.com>.
can you just replace "Duration.Inf" with a shorter duration  ? how about

      import scala.concurrent.duration._
      val timeout = new Timeout(10 seconds)
      Await.result(result.future, timeout.duration)

      or

      val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
      Await.result(result.future, timeout)

      or simply
      import scala.concurrent.duration._
      Await.result(result.future, 10 seconds)



On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lo...@gmail.com> wrote:

> Hi all, i meet up with a problem that torrent broadcast hang out in my
> spark cluster (1.2, standalone) , particularly serious when driver and
> executors are cross-region. when i read the code of broadcast i found that
> a sync block read here:
>
>   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
> String): ManagedBuffer = {
>     // A monitor for the thread to wait on.
>     val result = Promise[ManagedBuffer]()
>     fetchBlocks(host, port, execId, Array(blockId),
>       new BlockFetchingListener {
>         override def onBlockFetchFailure(blockId: String, exception:
> Throwable): Unit = {
>           result.failure(exception)
>         }
>         override def onBlockFetchSuccess(blockId: String, data:
> ManagedBuffer): Unit = {
>           val ret = ByteBuffer.allocate(data.size.toInt)
>           ret.put(data.nioByteBuffer())
>           ret.flip()
>           result.success(new NioManagedBuffer(ret))
>         }
>       })
>
>     Await.result(result.future, Duration.Inf)
>   }
>
> it seems that fetchBlockSync method does not have a timeout limit but wait
> forever ? Anybody can show me how to control the timeout here?
>

Re: broadcast hang out

Posted by Reynold Xin <rx...@databricks.com>.
It would be great to add a timeout. Do you mind submitting a pull request?


On Sun, Mar 15, 2015 at 10:41 PM, lonely Feb <lo...@gmail.com> wrote:

> Anyone can help? Thanks a lot !
>
> 2015-03-16 11:45 GMT+08:00 lonely Feb <lo...@gmail.com>:
>
> > yes
> >
> > 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan <mr...@gmail.com>:
> >
> >> Cross region as in different data centers ?
> >>
> >> - Mridul
> >>
> >> On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lo...@gmail.com>
> wrote:
> >> > Hi all, i meet up with a problem that torrent broadcast hang out in my
> >> > spark cluster (1.2, standalone) , particularly serious when driver and
> >> > executors are cross-region. when i read the code of broadcast i found
> >> that
> >> > a sync block read here:
> >> >
> >> >   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
> >> > String): ManagedBuffer = {
> >> >     // A monitor for the thread to wait on.
> >> >     val result = Promise[ManagedBuffer]()
> >> >     fetchBlocks(host, port, execId, Array(blockId),
> >> >       new BlockFetchingListener {
> >> >         override def onBlockFetchFailure(blockId: String, exception:
> >> > Throwable): Unit = {
> >> >           result.failure(exception)
> >> >         }
> >> >         override def onBlockFetchSuccess(blockId: String, data:
> >> > ManagedBuffer): Unit = {
> >> >           val ret = ByteBuffer.allocate(data.size.toInt)
> >> >           ret.put(data.nioByteBuffer())
> >> >           ret.flip()
> >> >           result.success(new NioManagedBuffer(ret))
> >> >         }
> >> >       })
> >> >
> >> >     Await.result(result.future, Duration.Inf)
> >> >   }
> >> >
> >> > it seems that fetchBlockSync method does not have a timeout limit but
> >> wait
> >> > forever ? Anybody can show me how to control the timeout here?
> >>
> >
> >
>

Re: broadcast hang out

Posted by lonely Feb <lo...@gmail.com>.
Anyone can help? Thanks a lot !

2015-03-16 11:45 GMT+08:00 lonely Feb <lo...@gmail.com>:

> yes
>
> 2015-03-16 11:43 GMT+08:00 Mridul Muralidharan <mr...@gmail.com>:
>
>> Cross region as in different data centers ?
>>
>> - Mridul
>>
>> On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lo...@gmail.com> wrote:
>> > Hi all, i meet up with a problem that torrent broadcast hang out in my
>> > spark cluster (1.2, standalone) , particularly serious when driver and
>> > executors are cross-region. when i read the code of broadcast i found
>> that
>> > a sync block read here:
>> >
>> >   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
>> > String): ManagedBuffer = {
>> >     // A monitor for the thread to wait on.
>> >     val result = Promise[ManagedBuffer]()
>> >     fetchBlocks(host, port, execId, Array(blockId),
>> >       new BlockFetchingListener {
>> >         override def onBlockFetchFailure(blockId: String, exception:
>> > Throwable): Unit = {
>> >           result.failure(exception)
>> >         }
>> >         override def onBlockFetchSuccess(blockId: String, data:
>> > ManagedBuffer): Unit = {
>> >           val ret = ByteBuffer.allocate(data.size.toInt)
>> >           ret.put(data.nioByteBuffer())
>> >           ret.flip()
>> >           result.success(new NioManagedBuffer(ret))
>> >         }
>> >       })
>> >
>> >     Await.result(result.future, Duration.Inf)
>> >   }
>> >
>> > it seems that fetchBlockSync method does not have a timeout limit but
>> wait
>> > forever ? Anybody can show me how to control the timeout here?
>>
>
>

Re: broadcast hang out

Posted by lonely Feb <lo...@gmail.com>.
yes

2015-03-16 11:43 GMT+08:00 Mridul Muralidharan <mr...@gmail.com>:

> Cross region as in different data centers ?
>
> - Mridul
>
> On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lo...@gmail.com> wrote:
> > Hi all, i meet up with a problem that torrent broadcast hang out in my
> > spark cluster (1.2, standalone) , particularly serious when driver and
> > executors are cross-region. when i read the code of broadcast i found
> that
> > a sync block read here:
> >
> >   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
> > String): ManagedBuffer = {
> >     // A monitor for the thread to wait on.
> >     val result = Promise[ManagedBuffer]()
> >     fetchBlocks(host, port, execId, Array(blockId),
> >       new BlockFetchingListener {
> >         override def onBlockFetchFailure(blockId: String, exception:
> > Throwable): Unit = {
> >           result.failure(exception)
> >         }
> >         override def onBlockFetchSuccess(blockId: String, data:
> > ManagedBuffer): Unit = {
> >           val ret = ByteBuffer.allocate(data.size.toInt)
> >           ret.put(data.nioByteBuffer())
> >           ret.flip()
> >           result.success(new NioManagedBuffer(ret))
> >         }
> >       })
> >
> >     Await.result(result.future, Duration.Inf)
> >   }
> >
> > it seems that fetchBlockSync method does not have a timeout limit but
> wait
> > forever ? Anybody can show me how to control the timeout here?
>

Re: broadcast hang out

Posted by Mridul Muralidharan <mr...@gmail.com>.
Cross region as in different data centers ?

- Mridul

On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb <lo...@gmail.com> wrote:
> Hi all, i meet up with a problem that torrent broadcast hang out in my
> spark cluster (1.2, standalone) , particularly serious when driver and
> executors are cross-region. when i read the code of broadcast i found that
> a sync block read here:
>
>   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
> String): ManagedBuffer = {
>     // A monitor for the thread to wait on.
>     val result = Promise[ManagedBuffer]()
>     fetchBlocks(host, port, execId, Array(blockId),
>       new BlockFetchingListener {
>         override def onBlockFetchFailure(blockId: String, exception:
> Throwable): Unit = {
>           result.failure(exception)
>         }
>         override def onBlockFetchSuccess(blockId: String, data:
> ManagedBuffer): Unit = {
>           val ret = ByteBuffer.allocate(data.size.toInt)
>           ret.put(data.nioByteBuffer())
>           ret.flip()
>           result.success(new NioManagedBuffer(ret))
>         }
>       })
>
>     Await.result(result.future, Duration.Inf)
>   }
>
> it seems that fetchBlockSync method does not have a timeout limit but wait
> forever ? Anybody can show me how to control the timeout here?

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org