Posted to user@flink.apache.org by おぎばやしひろのり <og...@gmail.com> on 2016/02/26 11:23:10 UTC

Flink streaming throughput

Hello,

I have started evaluating Flink and ran a simple performance test.
The result was only about 4000 messages/sec with 300% CPU usage. I
think this is quite low and am wondering whether it is a reasonable result.
It would be great if someone could check it.

Here are the details:

[servers]
- 3 Kafka brokers with 3 partitions
- 3 Flink TaskManagers + 1 JobManager
- 1 Elasticsearch
All of them are separate VMs with 8 vCPUs and 8 GB of memory.

[test case]
The application counts access logs by URI within a 1-minute window and
sends the result to Elasticsearch. The actual code is below.
I passed the '-p 3' option to the flink run command, so the tasks were
distributed across the 3 TaskManagers.
In the test, I sent about 5000 logs/sec to Kafka.

[result]
- From the Elasticsearch records, the total access count for all URIs was
about 260,000/min = 4300/sec. This is the entire throughput.
- Kafka consumer lag kept growing.
- The CPU usage of each TaskManager machine was about 13-14%. According to
top, the Flink java process was using 100% (one full CPU).

So I thought the bottleneck here was CPU used by Flink Tasks.
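
As a quick sanity check on these numbers, here is a small Scala sketch of the arithmetic. The object and value names are illustrative only; the inputs are the figures reported above. The 8-vCPU share is included because one fully used core out of eight is 12.5%, which is close to the 13-14% machine-level figure.

```scala
object ThroughputCheck {
  // Figures reported in this mail; the names are illustrative only.
  val countedPerMinute: Long = 260000L // total count seen in Elasticsearch per minute
  val producedPerSecond: Long = 5000L  // approximate rate sent to Kafka
  val vcpusPerMachine: Int = 8

  // Per-minute count converted to a per-second rate (integer division).
  val consumedPerSecond: Long = countedPerMinute / 60

  // Records per second that are produced but not consumed, i.e. lag growth.
  val lagGrowthPerSecond: Long = producedPerSecond - consumedPerSecond

  // Machine-level CPU percentage that corresponds to one fully busy core.
  val oneCoreSharePercent: Double = 100.0 / vcpusPerMachine

  def main(args: Array[String]): Unit = {
    println(s"consumed/sec   = $consumedPerSecond")
    println(s"lag growth/sec = $lagGrowthPerSecond")
    println(s"one core share = $oneCoreSharePercent %")
  }
}
```

With these inputs the consumer falls behind by several hundred records every second, which matches the growing lag.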

Here is the application code.
---
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(1000) // checkpoint interval in milliseconds
...
    val stream = env
      // Read raw JSON strings from the "kafka.dummy" topic.
      .addSource(new FlinkKafkaConsumer082[String]("kafka.dummy", new SimpleStringSchema(), properties))
      // Parse each record into a Map.
      .map{ json => JSON.parseFull(json).get.asInstanceOf[Map[String, AnyRef]] }
      // Emit (uri, 1); records without a "uri" field are counted under "".
      .map{ x => x.get("uri") match {
        case Some(y) => (y.asInstanceOf[String],1)
        case None => ("", 1)
      }}
      .keyBy(0)
      .timeWindow(Time.of(1, TimeUnit.MINUTES))
      .sum(1)
      // Attach the emission time to each per-URI count.
      .map{ x => (System.currentTimeMillis(), x)}
      .addSink(new ElasticsearchSink(config, transports, new IndexRequestBuilder[Tuple2[Long, Tuple2[String, Int]]] {
        override def createIndexRequest(element: Tuple2[Long, Tuple2[String, Int]], ctx: RuntimeContext): IndexRequest = {
          val json = new HashMap[String, AnyRef]
          json.put("@timestamp", new Timestamp(element._1))
          json.put("uri", element._2._1)
          json.put("count", element._2._2: java.lang.Integer)
          println("SENDING: " + element) // printed for every element sent to the sink
          Requests.indexRequest.index("dummy2").`type`("my-type").source(json)
        }
      }))
---

Regards,
Hironori Ogibayashi

Re: Flink streaming throughput

Posted by おぎばやしひろのり <og...@gmail.com>.
Milinda,

Thanks. I will try.

Regards,
Hironori
2016/03/16 1:31 "Milinda Pathirage" <mp...@umail.iu.edu>:

> Hi Hironori,
>
> [1] and [2] describe the process of measuring Kafka performance. I think
> the perf test code is under the org.apache.kafka.tools package in 0.9, so you
> may have to change the commands in [2] to reflect that.
>
> Thanks
> Milinda
>
> [1]
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> [2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
>

Re: Flink streaming throughput

Posted by Milinda Pathirage <mp...@umail.iu.edu>.
Hi Hironori,

[1] and [2] describe the process of measuring Kafka performance. I think
the perf test code is under the org.apache.kafka.tools package in 0.9, so you
may have to change the commands in [2] to reflect that.

Thanks
Milinda

[1] https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
[2] https://gist.github.com/jkreps/c7ddb4041ef62a900e6c

On Tue, Mar 15, 2016 at 11:35 AM, おぎばやしひろのり <og...@gmail.com> wrote:

> Robert,
>
> Thank you for your response.
> I would like to try kafka-console-consumer, but I have no idea
> how to measure the consuming throughput.
> Is there a standard way?
> I will also try running the Kafka brokers on physical servers.
>
> Regarding the version, I have upgraded to Flink 1.0.0 and replaced
> FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see
> any difference in performance.
>
> Regards,
> Hironori



-- 
Milinda Pathirage

PhD Student | Research Assistant
School of Informatics and Computing | Data to Insight Center
Indiana University

twitter: milindalakmal
skype: milinda.pathirage
blog: http://milinda.pathirage.org

Re: Flink streaming throughput

Posted by おぎばやしひろのり <og...@gmail.com>.
Robert,

Thank you for your response.
I would like to try kafka-console-consumer, but I have no idea
how to measure the consuming throughput.
Is there a standard way?
I will also try running the Kafka brokers on physical servers.

Regarding the version, I have upgraded to Flink 1.0.0 and replaced
FlinkKafkaConsumer082 with FlinkKafkaConsumer09, but did not see
any difference in performance.

Regards,
Hironori



2016-03-11 23:25 GMT+09:00 Robert Metzger <rm...@apache.org>:
> Hi Hironori,
>
> Can you check with kafka-console-consumer how many messages you can read
> in one minute?
> Maybe the broker's disk I/O is limited because everything is running in
> virtual machines (potentially sharing one hard disk?).
> I'm also not sure whether running a Kafka 0.8 consumer against a 0.9 broker
> works as expected.
>
> Our Kafka 0.8 consumer has been tested in environments where it reads
> more than 100 MB/s from a broker.

Re: Flink streaming throughput

Posted by Robert Metzger <rm...@apache.org>.
Hi Hironori,

Can you check with kafka-console-consumer how many messages you can read
in one minute?
Maybe the broker's disk I/O is limited because everything is running in
virtual machines (potentially sharing one hard disk?).
I'm also not sure whether running a Kafka 0.8 consumer against a 0.9 broker
works as expected.

Our Kafka 0.8 consumer has been tested in environments where it reads
more than 100 MB/s from a broker.
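
One way to turn such a timed read into a rate is sketched below in Scala. `RateMeter` and its methods are hypothetical helper names, not part of Kafka or Flink, and the iterator is only a stand-in for whatever consumer actually delivers the messages.

```scala
object RateMeter {
  // Messages per second for a given message count and elapsed wall-clock time.
  def messagesPerSecond(count: Long, elapsedMillis: Long): Double = {
    require(elapsedMillis > 0, "elapsed time must be positive")
    count * 1000.0 / elapsedMillis
  }

  // Drain an iterator of messages, counting them and timing the whole read.
  // Returns (number of messages, messages per second).
  def measure[A](messages: Iterator[A]): (Long, Double) = {
    val start = System.nanoTime()
    var count = 0L
    while (messages.hasNext) {
      messages.next()
      count += 1
    }
    // Clamp to 1 ms so a very fast run never divides by zero.
    val elapsedMillis = math.max(1L, (System.nanoTime() - start) / 1000000L)
    (count, messagesPerSecond(count, elapsedMillis))
  }
}
```

For example, `RateMeter.messagesPerSecond(260000L, 60000L)` gives the per-second rate for 260,000 messages read in one minute.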


On Fri, Mar 11, 2016 at 9:33 AM, おぎばやしひろのり <og...@gmail.com> wrote:

> Aljoscha,
>
> Thank you for your response.
>
> I tried the case with no JSON parsing and no sink (DiscardingSink). The
> throughput was 8,228 msg/sec,
> slightly better than the JSON + Elasticsearch case.
> I also tried socketTextStream instead of FlinkKafkaConsumer. In
> that case, the result was
> 60,000 msg/sec with just 40% Flink TaskManager CPU usage (the socket
> server was the bottleneck).
> That was amazing, although Flink's fault tolerance is not
> available with socketTextStream.
>
> Regards,
> Hironori
> >> 2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <og...@gmail.com>:
> >>> Stephan,
> >>>
> >>> Thank you for your quick response.
> >>> I will try and post the result later.
> >>>
> >>> Regards,
> >>> Hironori
> >>>
> >>> 2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> >>>> Hi!
> >>>>
> >>>> I would try and dig bit by bit into what the bottleneck is:
> >>>>
> >>>> 1) Disable the checkpointing, see what difference that makes
> >>>> 2) Use a dummy sink (discarding) rather than elastic search, to see
> >>>> if that is limiting
> >>>> 3) Check the JSON parsing. Many JSON libraries are very CPU intensive
> >>>> and easily dominate the entire pipeline.
> >>>>
> >>>> Greetings,
> >>>> Stephan

Re: Flink streaming throughput

Posted by おぎばやしひろのり <og...@gmail.com>.
Aljoscha,

Thank you for your response.

I tried the case with no JSON parsing and no sink (DiscardingSink). The
throughput was 8,228 msg/sec, slightly better than the JSON +
Elasticsearch case.
I also tried socketTextStream instead of FlinkKafkaConsumer. In that
case, the result was 60,000 msg/sec with just 40% Flink TaskManager CPU
usage (the socket server was the bottleneck).
That was amazing, although Flink's fault tolerance feature is not
available with socketTextStream.

Regards,
Hironori

2016-03-08 21:36 GMT+09:00 Aljoscha Krettek <al...@apache.org>:
> Hi,
> Another interesting test would be a combination of 3) and 2), i.e. no JSON parsing and no sink. This would show what the raw throughput can be before being slowed down by writing to Elasticsearch.
>
> Also, .print() is not feasible for production since it just prints every element to the stdout log on the TaskManagers, which itself can cause quite a slowdown. You could try:
>
> datastream.addSink(new DiscardingSink())
>
> which is a dummy sink that does nothing.
>
> Cheers,
> Aljoscha

Re: Flink streaming throughput

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Another interesting test would be a combination of 3) and 2), i.e. no JSON parsing and no sink. This would show what the raw throughput can be before being slowed down by writing to Elasticsearch.

Also, .print() is not feasible for production since it just prints every element to the stdout log on the TaskManagers, which itself can cause quite a slowdown. You could try:

datastream.addSink(new DiscardingSink())

which is a dummy sink that does nothing.

Cheers,
Aljoscha
> On 08 Mar 2016, at 13:31, おぎばやしひろのり <og...@gmail.com> wrote:
> 
> Stephan,
> 
> Sorry for the delay in my response.
> I tried the 3 cases you suggested.
> 
> This time, I set parallelism to 1 for simplicity.
> 
> 0) base performance (same as the first e-mail): 1,480msg/sec
> 1) Disable checkpointing : almost same as 0)
> 2) No ES sink. just print() : 1,510msg/sec
> 3) JSON to TSV : 8,000msg/sec
> 
> So, as you can see, the bottleneck was JSON parsing. I also want to
> try eliminating Kafka to see if there is room to improve performance.
> (Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9;
> I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
> Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
> Flink's scalability and fault tolerance.
> Thank you for your advice.
> 
> Regards,
> Hironori Ogibayashi
> 


Re: Flink streaming throughput

Posted by おぎばやしひろのり <og...@gmail.com>.
Stephan,

Sorry for the delay in my response.
I tried the 3 cases you suggested.

This time, I set parallelism to 1 for simplicity.

0) base performance (same as the first e-mail): 1,480msg/sec
1) Disable checkpointing : almost same as 0)
2) No ES sink. just print() : 1,510msg/sec
3) JSON to TSV : 8,000msg/sec
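
As a rough reading of these numbers, the throughput figures can be turned into an average time per message. The sketch below only does that arithmetic on the values from cases 0) and 3); it assumes that everything except the input format stayed the same between the two runs, which the measurements alone do not prove. The object and method names are made up for illustration.

```scala
object PerMessageCost {
  // Throughput figures taken from the list above (msg/sec, parallelism 1).
  val baseline = 1480.0 // case 0: JSON parsing + Elasticsearch sink
  val tsvInput = 8000.0 // case 3: TSV input instead of JSON

  /** Average wall-clock time spent per message, in microseconds. */
  def microsPerMessage(msgPerSec: Double): Double = 1e6 / msgPerSec

  def main(args: Array[String]): Unit = {
    val before = microsPerMessage(baseline) // about 676 us per message
    val after  = microsPerMessage(tsvInput) // 125 us per message
    val saved  = before - after             // time no longer spent per message

    println(f"baseline: $before%.0f us/msg, TSV: $after%.0f us/msg")
    println(f"share of per-message time removed: ${saved / before * 100}%.1f%%")
    println(f"speedup: ${tsvInput / baseline}%.1fx")
  }
}
```

Under that assumption, switching from JSON to TSV removes roughly four fifths of the per-message time, which corresponds to a speedup of about 5.4x.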

So, as you can see, the bottleneck was JSON parsing. I also want to
try eliminating Kafka to see if there is room to improve performance.
(Currently, I am using FlinkKafkaConsumer082 with Kafka 0.9;
I think I should try Flink 1.0 and FlinkKafkaConsumer09.)
Anyway, I think 8,000 msg/sec with 1 CPU is not so bad considering
Flink's scalability and fault tolerance.
Thank you for your advice.

Regards,
Hironori Ogibayashi

2016-02-26 21:46 GMT+09:00 おぎばやしひろのり <og...@gmail.com>:
> Stephan,
>
> Thank you for your quick response.
> I will try and post the result later.
>
> Regards,
> Hironori
>

Re: Flink streaming throughput

Posted by おぎばやしひろのり <og...@gmail.com>.
Stephan,

Thank you for your quick response.
I will try and post the result later.

Regards,
Hironori

2016-02-26 19:45 GMT+09:00 Stephan Ewen <se...@apache.org>:
> Hi!
>
> I would try and dig bit by bit into what the bottleneck is:
>
>  1) Disable checkpointing and see what difference that makes
>  2) Use a dummy (discarding) sink rather than Elasticsearch, to see if that
> is limiting
>  3) Check the JSON parsing. Many JSON libraries are very CPU-intensive and
> easily dominate the entire pipeline.
>
> Greetings,
> Stephan
>
>

Re: Flink streaming throughput

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I would try and dig bit by bit into what the bottleneck is:

 1) Disable checkpointing and see what difference that makes
 2) Use a dummy (discarding) sink rather than Elasticsearch, to see if
that is limiting
 3) Check the JSON parsing. Many JSON libraries are very CPU-intensive and
easily dominate the entire pipeline.
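
One way to check point 3) in isolation is to time a single per-record function outside of Flink, on one thread. The following is only a sketch under assumptions that are not from this thread: the helper names (`StageBenchmark`, `throughput`, `uriFromTsv`) and the three-column TSV layout are hypothetical, and a simple loop like this gives a ballpark figure rather than a rigorous benchmark.

```scala
object StageBenchmark {
  /** Runs `stage` over every record and returns records processed per second. */
  def throughput[A, B](records: Seq[A])(stage: A => B): Double = {
    var last: Any = null // keep each result reachable so the work is not optimized away
    val start = System.nanoTime()
    records.foreach { r => last = stage(r) }
    val elapsedSec = math.max(System.nanoTime() - start, 1L) / 1e9
    records.size / elapsedSec
  }

  // Hypothetical TSV layout: timestamp <TAB> uri <TAB> status
  def uriFromTsv(line: String): (String, Int) = {
    val cols = line.split('\t')
    (if (cols.length > 1) cols(1) else "", 1)
  }

  def main(args: Array[String]): Unit = {
    val sample = Vector.tabulate(1000000) { i =>
      s"2016-02-26T11:23:10\t/page/${i % 100}\t200"
    }
    throughput(sample)(uriFromTsv) // warm-up run so the JIT has compiled the hot path
    println(f"TSV split: ${throughput(sample)(uriFromTsv)}%.0f records/sec")
  }
}
```

Passing the JSON parsing function from the job to the same `throughput` helper would give a comparable single-thread number for the parsing step alone.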

Greetings,
Stephan

