Posted to user@flink.apache.org by Mark Zitnik <ma...@gmail.com> on 2020/07/04 19:48:06 UTC

Fwd: Asynchronous I/O poor performance

Hi,

In my Flink application I need to enrich data using
AsyncDataStream.unorderedWait, but I am getting poor performance. At the
beginning I was working with plain HTTP calls, but I have since switched to
gRPC. I am running on an 8-core node and getting a total of 3200 events per
second. The service I am calling is not fully utilized and can handle up to
10000 req/sec.

Flink job flow:
Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write to
Kafka

Using Akka gRPC code written in Scala.

Thanks

Re: Asynchronous I/O poor performance

Posted by Arvid Heise <ar...@ververica.com>.
Hi Mark,

As I already explained, this latency occurs only because of the shuffle step
before async I/O (i.e., the data is sent over the network).

If you replace

val x : DataStream[String] = someIntegers.map( _ =>
s"${System.currentTimeMillis()}")

with

val x : DataStream[String] = someIntegers.shuffle.map( _ =>
s"${System.currentTimeMillis()}")

You can see that latency between map and async IO becomes 0.

Throughput and latency are not directly related. You can have very high
throughput but also high latency if you have many shuffle steps. Latency
is pretty much determined by how long a record lives in all the network
buffers plus the processing time of all involved operators. To understand
this, consider the following program:

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {
  override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    // Skip the 30k-character prefix; the rest is the creation timestamp.
    val start = str.substring(30000).toLong
    // Time between record creation in the map and arrival in asyncInvoke.
    val delta = System.currentTimeMillis() - start
    resultFuture.complete(Iterable((start.toString, s"${delta}")))
  }
}

object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //env.enableCheckpointing(10)
    env.setParallelism(1)

    val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
    //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => System.currentTimeMillis()-s}.print()
    // Pad every record so that only about one record fits into a network buffer.
    val prefix = "*" * 30000
    val x: DataStream[String] = someIntegers.map(_ => s"$prefix${System.currentTimeMillis()}")
    val resultStream: DataStream[(String, String)] =
      AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, TimeUnit.MILLISECONDS, 100) //.setParallelism(16)
    //AsyncDataStream.unorderedWait(data , new AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
    resultStream.print()
    println(env.getConfig.getAutoWatermarkInterval)
    env.execute("Flink Scala API Skeleton")
  }
}

It's your program in condensed form; however, each record is prefixed with
30k '*' characters. Thus, only 1-2 records fit into one network buffer, and
the latency is now very small (1-2 ms on my machine) because a record does
not live very long in the network buffers.

However, if you compare how long it takes to process 1M records, you will
see that your initial version is much faster (i.e., has higher throughput),
because each buffer fits 3.2k records instead of 1-2.
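
To make the buffer arithmetic concrete, here is a rough sketch. It assumes a
32 KiB network buffer (Flink's default memory segment size) and about 10 bytes
per small serialized record; both numbers are assumptions for illustration,
and BufferFill / recordsPerBuffer are hypothetical names, not Flink APIs.

```scala
object BufferFill {
  // Assumed buffer size: 32 KiB (Flink's default memory segment size).
  val BufferBytes: Int = 32 * 1024

  /** Number of records of the given serialized size that fit into one buffer.
    * Simplification: a record larger than the buffer is counted as one record
    * per buffer, although in reality it would span several buffers. */
  def recordsPerBuffer(recordBytes: Int, bufferBytes: Int = BufferBytes): Int =
    math.max(1, bufferBytes / recordBytes)

  def main(args: Array[String]): Unit = {
    val smallRecord = 10          // assumed size of a serialized timestamp string
    val largeRecord = 30000 + 13  // 30k '*' prefix plus a 13-digit timestamp
    println(s"small records per buffer: ${recordsPerBuffer(smallRecord)}")
    println(s"large records per buffer: ${recordsPerBuffer(largeRecord)}")
  }
}
```

Under these assumptions a buffer holds roughly 3.2k small records but only a
single padded one, which is why the padded variant sees low latency and low
throughput at the same time.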

So once you have verified that latency is indeed not the issue here, please
evaluate the thread pool size of Akka.

On Thu, Jul 9, 2020 at 9:27 PM Mark Zitnik <ma...@gmail.com> wrote:

> Hi Arvid,
>
> The HTTP client is not my bottleneck. As I said before, I checked the async
> operator and there is a delay of about 80 ms before a record enters
> asyncInvoke. Can someone explain why the delay is so big? I attached
> sample code in my previous email.
>
> Thanks

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Asynchronous I/O poor performance

Posted by Mark Zitnik <ma...@gmail.com>.
Hi Arvid,

The HTTP client is not my bottleneck. As I said before, I checked the async
operator and there is a delay of about 80 ms before a record enters
asyncInvoke. Can someone explain why the delay is so big? I attached
sample code in my previous email.

Thanks

On Mon, 6 Jul 2020, 23:31 Arvid Heise, <ar...@ververica.com> wrote:

> Hi Mark,
>
> Async wait operators cannot be chained to sources so the messages go
> through the network stack. Thus, having some latency is normal and cannot
> be avoided. It can be tuned though, but I don't think that this is the
> issue at hand as it should mostly impact latency and affect throughput
> less. Since external I/O calls are much more heavy weight than our internal
> communication, both the drop of throughput and the increase in latency are
> usually dwarfed by the external I/O call costs.
>
> Please try to increase the thread pool for akka as written in my previous
> email and report back.

Re: Asynchronous I/O poor performance

Posted by Arvid Heise <ar...@ververica.com>.
Hi Mark,

Async wait operators cannot be chained to sources so the messages go
through the network stack. Thus, having some latency is normal and cannot
be avoided. It can be tuned though, but I don't think that this is the
issue at hand as it should mostly impact latency and affect throughput
less. Since external I/O calls are much more heavy weight than our internal
communication, both the drop of throughput and the increase in latency are
usually dwarfed by the external I/O call costs.
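
As a rough illustration of where this latency comes from: a record waits in
an outgoing network buffer until the buffer is full or the buffer timeout
expires, whichever happens first. Flink's default buffer timeout is 100 ms,
and it can be lowered with StreamExecutionEnvironment.setBufferTimeout. The
sketch below is only a toy model of that rule; BufferWait and its parameters
are hypothetical names, and the numbers are made up for illustration.

```scala
object BufferWait {
  /** Rough upper bound (ms) on how long the first record in a buffer waits
    * before the buffer is sent downstream. */
  def worstCaseWaitMillis(recordsPerSecond: Double,
                          recordsPerBuffer: Int,
                          bufferTimeoutMillis: Long): Double = {
    // Time needed to fill the buffer at the given input rate.
    val fillMillis = recordsPerBuffer / recordsPerSecond * 1000.0
    // The buffer is sent when it is full or when the timeout fires,
    // whichever comes first.
    math.min(fillMillis, bufferTimeoutMillis.toDouble)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical workload: 3276 small records per buffer, 1000 records/s.
    println(worstCaseWaitMillis(1000.0, 3276, 100L)) // the timeout dominates
    println(worstCaseWaitMillis(1000.0, 3276, 5L))   // lower timeout, lower wait
    println(worstCaseWaitMillis(1000.0, 1, 100L))    // one record fills the buffer
  }
}
```

In this model a slow stream of small records always waits for the timeout,
so a delay just under the default 100 ms is what one would expect to see.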

Please try to increase the thread pool for akka as written in my previous
email and report back.

On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik <ma...@gmail.com> wrote:

> Hi Benchao,
>
> I ran this in the code:
>
> println(env.getConfig.getAutoWatermarkInterval)
>
> and got 200. I do fully understand how watermarks and the AsyncOperator
> work, but I decided to write a simple test that measures the time it takes
> for a record to enter the asyncInvoke method. It looks like it takes about
> 80 ms, which is longer than the time it takes to get a response from my
> microservice.
>
> is this normal behavior?


Re: Asynchronous I/O poor performance

Posted by Mark Zitnik <ma...@gmail.com>.
Hi Benchao,

I ran this in the code:

println(env.getConfig.getAutoWatermarkInterval)

and got 200. I do fully understand how watermarks and the AsyncOperator
work, but I decided to write a simple test that measures the time it takes
for a record to enter the asyncInvoke method. It looks like it takes about
80 ms, which is longer than the time it takes to get a response from my
microservice.

Code below:

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {

  implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

  /*
  implicit val actorSystem = ActorSystem.apply("test", None, None, Some(executor))
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = actorSystem.dispatcher


  println(materializer.system.name)
  println("start")
  */
  // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com

  // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
  var actorSystem: ActorSystem = null
  var materializer: ActorMaterializer = null
  var executionContext: ExecutionContextExecutor = null
  //var akkaHttp: HttpExt = null

  override def open(parameters: Configuration): Unit = {
    actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, Some(ConfigFactory.load("application.conf")), None, Some(executor))
    materializer = ActorMaterializer()(actorSystem)
    executionContext = actorSystem.dispatcher
    //akkaHttp = Http(actorSystem)
  }

  override def close(): Unit = {
    actorSystem.terminate()
  }

  override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
    val start = str.toLong
    val delta = System.currentTimeMillis() - start
    resultFuture.complete(Iterable((str, s"${delta}")))
  }
}


object Job {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //env.enableCheckpointing(10)
    env.setParallelism(1)

    val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
    //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => System.currentTimeMillis()-s}.print()
    val x : DataStream[String] = someIntegers.map( _ => s"${System.currentTimeMillis()}")
    val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
    //AsyncDataStream.unorderedWait(data , new AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
    resultStream.print()
    println(env.getConfig.getAutoWatermarkInterval)
    env.execute("Flink Scala API Skeleton")
  }
}

is this normal behavior?


On Mon, Jul 6, 2020 at 2:45 PM Benchao Li <li...@apache.org> wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?

Re: Asynchronous I/O poor performance

Posted by Arvid Heise <ar...@ververica.com>.
Hi Mark,

Could you please check whether you can tune Akka? Usually with async I/O,
the client library uses a thread pool that becomes the actual bottleneck.

If you configure async I/O to use a capacity of 100 and a parallelism of 8
on one node, you also need to have ~800 threads in Akka (500 might be enough
because of overhead), or else async I/O gets blocked while waiting for Akka
threads to become available.
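
A quick back-of-the-envelope check of these numbers with Little's law
(requests in flight = throughput x latency) can be sketched as follows. The
inputs are the figures reported in this thread; the object and method names
are hypothetical and only serve the illustration.

```scala
object AsyncCapacityEstimate {
  /** Upper bound on throughput (req/s) for a given number of concurrent
    * requests and a given latency per request. */
  def maxThroughput(inFlight: Int, latencyMillis: Double): Double =
    inFlight * 1000.0 / latencyMillis

  /** Average number of requests in flight implied by an observed throughput. */
  def impliedInFlight(throughputPerSec: Double, latencyMillis: Double): Double =
    throughputPerSec * latencyMillis / 1000.0

  def main(args: Array[String]): Unit = {
    val capacity = 100     // async I/O capacity per subtask
    val parallelism = 8    // parallelism of the async operator
    val rpcMillis = 20.0   // time per RPC request
    val observed = 3200.0  // observed events per second (total)

    val slots = capacity * parallelism
    println(s"request slots:      $slots")
    println(s"throughput ceiling: ${maxThroughput(slots, rpcMillis)} req/s")
    println(s"implied in flight:  ${impliedInFlight(observed, rpcMillis)}")
  }
}
```

With 800 request slots and 20 ms per call, the operator configuration alone
would allow far more than the observed 3200 events per second. The observed
rate corresponds to only about 64 requests in flight on average, which is
consistent with something other than the async operator capacity (for
example the client thread pool) limiting concurrency.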

Best,

Arvid

On Mon, Jul 6, 2020 at 1:45 PM Benchao Li <li...@apache.org> wrote:

> Hi Mark,
>
> According to your data, I think the config of AsyncOperator is OK.
> There is one more config that might affect the throughput of
> AsyncOperator, it's watermark.
> Because unordered async operator still keeps the order between watermarks,
> did you use
> event time in your job, and if yes, what's the watermark interval in your
> job?



Re: Asynchronous I/O poor performance

Posted by Benchao Li <li...@apache.org>.
Hi Mark,

According to your data, I think the configuration of the AsyncOperator is OK.
There is one more setting that might affect the throughput of the
AsyncOperator: watermarks. The unordered async operator still preserves the
order between watermarks, so records cannot overtake a watermark. Did you
use event time in your job, and if so, what is the watermark interval?
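
The ordering rule can be illustrated with a small toy model. This is not
Flink's implementation, only a sketch of the rule that results may be emitted
in completion order within a segment delimited by watermarks, while segments
and watermarks keep their original order. All names and timings below are
hypothetical.

```scala
object UnorderedBetweenWatermarks {
  sealed trait Element
  final case class Record(id: String, completesAtMillis: Long) extends Element
  final case class Watermark(timestamp: Long) extends Element

  /** Emission order when records may overtake each other only within a
    * segment delimited by watermarks. */
  def emissionOrder(input: List[Element]): List[Element] = {
    val out = List.newBuilder[Element]
    var pending = Vector.empty[Record] // records seen since the last watermark

    // Emit the pending records in the order in which their requests complete.
    def flush(): Unit = {
      out ++= pending.sortBy(_.completesAtMillis)
      pending = Vector.empty
    }

    input.foreach {
      case r: Record    => pending :+= r
      case w: Watermark => flush(); out += w // a watermark closes the segment
    }
    flush()
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val input = List(
      Record("a", 30), Record("b", 10), Watermark(1),
      Record("c", 5), Record("d", 20))
    emissionOrder(input).foreach(println)
  }
}
```

In this example record "c" completes first, but it is still emitted after
"b", "a" and the watermark, because it arrived after the watermark. A slow
request in front of a watermark therefore holds back everything behind it.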

On Sun, Jul 5, 2020 at 7:44 PM Mark Zitnik <ma...@gmail.com> wrote:

> Hi Benchao
>
> The capacity is 100
> Parallelism is 8
> Rpc req is 20ms
>
> Thanks
>

-- 

Best,
Benchao Li

Re: Asynchronous I/O poor performance

Posted by Mark Zitnik <ma...@gmail.com>.
Hi Benchao

The capacity is 100
Parallelism is 8
Rpc req is 20ms

Thanks


On Sun, 5 Jul 2020, 6:16 Benchao Li, <li...@apache.org> wrote:

> Hi Mark,
>
> Could you give more details about your Flink job?
> - the capacity of AsyncDataStream
> - the parallelism of AsyncDataStream operator
> - the time per blocking RPC request

Re: Asynchronous I/O poor performance

Posted by Benchao Li <li...@apache.org>.
Hi Mark,

Could you give more details about your Flink job?
- the capacity of the AsyncDataStream operator
- the parallelism of the AsyncDataStream operator
- the time per blocking RPC request

On Sun, Jul 5, 2020 at 3:48 AM Mark Zitnik <ma...@gmail.com> wrote:

> Hi,
>
> In my Flink application I need to enrich data using
> AsyncDataStream.unorderedWait, but I am getting poor performance. At the
> beginning I was working with plain HTTP calls, but I have since switched to
> gRPC. I am running on an 8-core node and getting a total of 3200 events per
> second. The service I am calling is not fully utilized and can handle up to
> 10000 req/sec.
>
> Flink job flow:
> Reading from Kafka ~> some enrichment with unorderedWait ~> map ~> write to
> Kafka
>
> Using Akka gRPC code written in Scala.
>
> Thanks
>


-- 

Best,
Benchao Li