You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Gabriel Pelielo <ga...@hotmail.com> on 2018/05/02 21:20:12 UTC

Flink consuming more memory than expected

We use Flink to process transactional events. A job was created to aggregate information about the clients, day of week and hour of day and thus creating a profile as shown in the attached code.


val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    val transactionDate = openTransaction.get("transactionDate")
    val date = if (transactionDate.isTextual)
      LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
    else
      transactionDate.asLong
    (openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)

In the code above, the stream has three fields: "transactionDate", "clientId" and "amount". We make a keyed stream by the clientId and a sliding window summing the amount. There are around 100.000 unique active clientIds in our database.

After some time running, the total RAM used by the job is stabilized at 36 GB, but the stored checkpoint in HDFS uses only 3 GB. Is there a way to reduce the RAM usage of the job, maybe by configuring Flink's replication factor or by using RocksDB?


Best regards






Re: Flink consuming more memory than expected

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

There are a few JIRA tickets that address this problem [1] [2].

Summary: The best execution strategy depends on the amount of data / window
configuration.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7001
[2] https://issues.apache.org/jira/browse/FLINK-5387

2018-05-04 7:22 GMT+02:00 Rong Rong <wa...@gmail.com>:

> Agree with Bowen on this note: you should probably use some more efficient
> way of handling the data in sliding window, since data will be "assigned"
> to each sliding window through a window assigner and thus costs extra
> memory usage.
>
> BTW: since we are on this topic, I was wondering if there's any way of
> improving the memory efficiency in dealing elements that belongs to
> overlapping windows.
>
> --
> Rong
>
> On Thu, May 3, 2018 at 9:40 PM, Bowen Li <bo...@gmail.com> wrote:
>
> > Hi Gabriel,
> >
> > Yes, using RocksDB state backend can relieve your RAM usage. I see a few
> > issues with your job: 1) it's keeping track of 672 windows (28x24),
> that's
> > lots of data, so try to reduce number of windows 2) use reduce functions
> to
> > incrementally aggregate state, rather than buffering data internally
> >
> > BTW, this kind of questions should be posted to *user@flink alias*
> rather
> > than dev@flink.
> >
> > Bowen
> >
> >
> >
> > On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <
> > gabrielpelielo@hotmail.com>
> > wrote:
> >
> > > We use Flink to process transactional events. A job was created to
> > > aggregate information about the clients, day of week and hour of day
> and
> > > thus creating a profile as shown in the attached code.
> > >
> > >
> > > val stream = env.addSource(consumer)
> > > val result = stream
> > >   .map(openTransaction => {
> > >     val transactionDate = openTransaction.get("transactionDate")
> > >     val date = if (transactionDate.isTextual)
> > >       LocalDateTime.parse(transactionDate.asText,
> > > DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.
> UTC).toEpochMilli
> > >     else
> > >       transactionDate.asLong
> > >     (openTransaction.get("clientId").asLong,
> > > openTransaction.get("amount").asDouble, new Timestamp(date))
> > >   })
> > >   .keyBy(0)
> > >   .window(SlidingEventWeekTimeWindows.of(Time.days(28),
> Time.hours(1)))
> > >   .sum(1)
> > >
> > > In the code above, the stream has three fields: "transactionDate",
> > > "clientId" and "amount". We make a keyed stream by the clientId and a
> > > sliding window summing the amount. There are around 100.000 unique
> active
> > > clientIds in our database.
> > >
> > > After some time running, the total RAM used by the job is stabilized at
> > 36
> > > GB, but the stored checkpoint in HDFS uses only 3 GB. Is there a way to
> > > reduce the RAM usage of the job, maybe by configuring Flink's
> replication
> > > factor or by using RocksDB?
> > >
> > >
> > > Best regards
> > >
> > >
> > >
> > >
> > >
> > >
> >
>

Re: Flink consuming more memory than expected

Posted by Rong Rong <wa...@gmail.com>.
Agree with Bowen on this note: you should probably use some more efficient
way of handling the data in sliding window, since data will be "assigned"
to each sliding window through a window assigner and thus costs extra
memory usage.

BTW: since we are on this topic, I was wondering if there's any way of
improving the memory efficiency in dealing elements that belongs to
overlapping windows.

--
Rong

On Thu, May 3, 2018 at 9:40 PM, Bowen Li <bo...@gmail.com> wrote:

> Hi Gabriel,
>
> Yes, using RocksDB state backend can relieve your RAM usage. I see a few
> issues with your job: 1) it's keeping track of 672 windows (28x24), that's
> lots of data, so try to reduce number of windows 2) use reduce functions to
> incrementally aggregate state, rather than buffering data internally
>
> BTW, this kind of questions should be posted to *user@flink alias* rather
> than dev@flink.
>
> Bowen
>
>
>
> On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <
> gabrielpelielo@hotmail.com>
> wrote:
>
> > We use Flink to process transactional events. A job was created to
> > aggregate information about the clients, day of week and hour of day and
> > thus creating a profile as shown in the attached code.
> >
> >
> > val stream = env.addSource(consumer)
> > val result = stream
> >   .map(openTransaction => {
> >     val transactionDate = openTransaction.get("transactionDate")
> >     val date = if (transactionDate.isTextual)
> >       LocalDateTime.parse(transactionDate.asText,
> > DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
> >     else
> >       transactionDate.asLong
> >     (openTransaction.get("clientId").asLong,
> > openTransaction.get("amount").asDouble, new Timestamp(date))
> >   })
> >   .keyBy(0)
> >   .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
> >   .sum(1)
> >
> > In the code above, the stream has three fields: "transactionDate",
> > "clientId" and "amount". We make a keyed stream by the clientId and a
> > sliding window summing the amount. There are around 100.000 unique active
> > clientIds in our database.
> >
> > After some time running, the total RAM used by the job is stabilized at
> 36
> > GB, but the stored checkpoint in HDFS uses only 3 GB. Is there a way to
> > reduce the RAM usage of the job, maybe by configuring Flink's replication
> > factor or by using RocksDB?
> >
> >
> > Best regards
> >
> >
> >
> >
> >
> >
>

Re: Flink consuming more memory than expected

Posted by Bowen Li <bo...@gmail.com>.
Hi Gabriel,

Yes, using RocksDB state backend can relieve your RAM usage. I see a few
issues with your job: 1) it's keeping track of 672 windows (28x24), that's
lots of data, so try to reduce number of windows 2) use reduce functions to
incrementally aggregate state, rather than buffering data internally

BTW, this kind of questions should be posted to *user@flink alias* rather
than dev@flink.

Bowen



On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <ga...@hotmail.com>
wrote:

> We use Flink to process transactional events. A job was created to
> aggregate information about the clients, day of week and hour of day and
> thus creating a profile as shown in the attached code.
>
>
> val stream = env.addSource(consumer)
> val result = stream
>   .map(openTransaction => {
>     val transactionDate = openTransaction.get("transactionDate")
>     val date = if (transactionDate.isTextual)
>       LocalDateTime.parse(transactionDate.asText,
> DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
>     else
>       transactionDate.asLong
>     (openTransaction.get("clientId").asLong,
> openTransaction.get("amount").asDouble, new Timestamp(date))
>   })
>   .keyBy(0)
>   .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
>   .sum(1)
>
> In the code above, the stream has three fields: "transactionDate",
> "clientId" and "amount". We make a keyed stream by the clientId and a
> sliding window summing the amount. There are around 100.000 unique active
> clientIds in our database.
>
> After some time running, the total RAM used by the job is stabilized at 36
> GB, but the stored checkpoint in HDFS uses only 3 GB. Is there a way to
> reduce the RAM usage of the job, maybe by configuring Flink's replication
> factor or by using RocksDB?
>
>
> Best regards
>
>
>
>
>
>