Posted to user@gobblin.apache.org by Tamás Németh <tr...@apache.org> on 2020/06/17 18:35:14 UTC

Compaction optimization

Hi All,

I thought I'd shake up the mailing list a bit, and I would be interested
in your thoughts.

We use Gobblin for ingesting product analytics events from Kafka and for
compaction/Hive registration.
We have been using it for around 2.5 years now and it has worked well, but
as our ingestion pipeline got popular (thank god :)) we also started to
have issues because of the scale.
Nowadays we have around 1200 Kafka topics for one of our products' events
(one topic per event type) that we pull to S3 in Avro format, and we run
daily compaction over these events (ingestion runs every 15 minutes and
its output is the input of the daily compaction). The compaction has
become pretty slow.
Most of these topics are relatively small, but a few are super busy
(around 15-20 out of 1200).
So far we have had only one compaction job that went through all of the
event types, with a single reducer memory setting equal to the
max(reduce_memory_usage) of all the spawned compaction MapReduce jobs,
which I think is not optimal as that value is applied to every job.

What we are trying out right now is to create one compaction job for the
busy topics (putting all the busy topics into the compaction whitelist)
with a high reducer memory setting, and another job for the remaining
topics (blacklisting all the big topics) with a small reducer memory
setting.
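
Roughly what we are aiming for, as a minimal sketch rather than our real
configs: the compaction.whitelist / compaction.blacklist property names
below follow the legacy Gobblin MR compaction and may differ per version,
and the topic names and memory sizes are placeholders.

import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;

// Sketch only: generates the two compaction job property sets, one for
// the handful of busy topics with large reducers and one for everything
// else with small reducers.
public class CompactionJobSplitSketch {

  // Hypothetical list of the 15-20 busy topics.
  private static final String BUSY_TOPICS = "pageview,click,checkout";

  public static void main(String[] args) throws IOException {
    // Job 1: busy topics only, large reducers.
    Properties busy = new Properties();
    busy.setProperty("compaction.whitelist", BUSY_TOPICS);
    busy.setProperty("mapreduce.reduce.memory.mb", "8192");
    busy.setProperty("mapreduce.reduce.java.opts", "-Xmx6g");
    store(busy, "compaction-busy.properties");

    // Job 2: everything else, small reducers.
    Properties rest = new Properties();
    rest.setProperty("compaction.blacklist", BUSY_TOPICS);
    rest.setProperty("mapreduce.reduce.memory.mb", "2048");
    rest.setProperty("mapreduce.reduce.java.opts", "-Xmx1536m");
    store(rest, "compaction-everything-else.properties");
  }

  private static void store(Properties props, String file)
      throws IOException {
    try (FileOutputStream out = new FileOutputStream(file)) {
      props.store(out, "compaction job config sketch");
    }
  }
}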

Do you have any ideas on how the compaction could be sped up? I would
also be curious how you use compaction in your environment, if you use it.

Thanks,
Tamas

Re: Compaction optimization

Posted by Lei Sun <le...@linkedin.com>.
Hi Tamas,


Many factors play a role in the time spent preparing the MR jobs; usually it is caused by a complex pattern passed to the datasetFinder that makes listing the storage slow. Is your storage layer S3? I assume the list API requests made to S3 are costly. Do you have any profiling on that?
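
One quick check, as a rough sketch: time the same kind of glob the dataset
finder would do over your compaction input. The bucket and folder pattern
below are made up, so point them at your own layout.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Times a single glob over the compaction input layout to see how much
// of the runtime is pure S3 listing.
public class ListTimingCheck {
  public static void main(String[] args) throws Exception {
    String pattern = "s3a://my-events-bucket/events/*/hourly/2020/06/17/*";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(pattern), conf);

    long start = System.nanoTime();
    FileStatus[] matches = fs.globStatus(new Path(pattern));
    long elapsedMs = (System.nanoTime() - start) / 1_000_000L;

    int count = (matches == null) ? 0 : matches.length;
    System.out.println("Matched " + count + " paths in " + elapsedMs + " ms");
  }
}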


Regards,
Lei





Re: Compaction optimization

Posted by Tamás Németh <tr...@gmail.com>.
Hey Lei,

Thanks for the answer, and it is good to know you are in a similar
situation. :)
We generate a ULID unique id for every event, and this is our
compaction.dedup.key as well.
Now I have set up Dr. Elephant and tried to tune the mapper and reducer
memory to be more optimal, and I can see improvements.

Another thing I noticed is that the compaction job runtimes looked like
this today:
TimeBasedSubDirDatasetsFinder scan for data to compact -> 10 minutes
Creating compaction jobs for 534 datasets -> 17 minutes
The MapReduce jobs for 534 datasets -> 1 hour and 19 minutes
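That is 10 + 17 = 27 minutes of scanning and job creation against 79
minutes of actual MapReduce work, so roughly 27 / 106, about a quarter
of the run, goes by before any data is compacted.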

Is it normal that a quarter of the compaction run is spent checking
whether the datasets need to be compacted and submitting the MapReduce
jobs?

Thanks,
Tamas


Re: Compaction optimization

Posted by Lei Sun <le...@linkedin.com>.
Hi Tamas,

Thanks for raising this issue. We have a similar situation on our side, and I have some thoughts around this. I am working on shrinking the shuffleKey size in ORC compaction, and it achieved roughly a 3x throughput gain. I would like to do a similar thing for Avro as well. We could shrink the shuffleKey size because we have a common "ID" column in all Kafka events' schemas, plus we do a dedup on the reducer side as well, in case two events collide on the ID we use.
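
For Avro the shape would be something like the sketch below. This is not
the actual Gobblin compaction mapper/reducer, just an illustration of the
technique, and the "id" field name is an assumption; a short fixed-size ID
column is ideal because the shuffle key stays small instead of carrying
the whole record.

import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Shuffle on the short ID only; the reducer sees all records sharing an
// ID together and keeps one of them. The driver (not shown) would set the
// Avro schemas and the input/output formats.
public class DedupByIdSketch {

  public static class IdKeyMapper extends
      Mapper<AvroKey<GenericRecord>, NullWritable, Text, AvroValue<GenericRecord>> {
    private final Text outKey = new Text();

    @Override
    protected void map(AvroKey<GenericRecord> record, NullWritable ignored,
        Context context) throws IOException, InterruptedException {
      // The shuffle key is just the event's ID, not the full record.
      outKey.set(String.valueOf(record.datum().get("id")));
      context.write(outKey, new AvroValue<>(record.datum()));
    }
  }

  public static class FirstWinsReducer extends
      Reducer<Text, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
    @Override
    protected void reduce(Text id, Iterable<AvroValue<GenericRecord>> records,
        Context context) throws IOException, InterruptedException {
      // Records colliding on the same ID arrive together; emit the first
      // and drop the duplicates.
      for (AvroValue<GenericRecord> record : records) {
        context.write(new AvroKey<>(record.datum()), NullWritable.get());
        break;
      }
    }
  }
}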

A question from me: in your Kafka topics' schemas, is there such a thing as an "ID" column? If your Kafka schema has a primary key, that would be perfect.

The other thing, which is larger in scope, is to use Spark as the execution engine. I am happy to discuss and collaborate if you would like to pursue this route.
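
As a very rough sketch of what that could look like in Spark (placeholder
paths and "id" column; it needs the spark-avro module on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Reads a day's worth of small Avro files for one topic, dedups on the
// ID column, and rewrites them as fewer, larger files.
public class SparkCompactSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("avro-compaction-sketch")
        .getOrCreate();

    Dataset<Row> events = spark.read()
        .format("avro")
        .load("s3a://my-events-bucket/events/pageview/hourly/2020/06/17/*");

    events
        .dropDuplicates(new String[] {"id"}) // reducer-side dedup equivalent
        .coalesce(16)                        // few large output files
        .write()
        .mode(SaveMode.Overwrite)
        .format("avro")
        .save("s3a://my-events-bucket/events/pageview/daily/2020/06/17");

    spark.stop();
  }
}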


Regards,
Lei