Posted to user@flink.apache.org by Harrison Xu <hx...@quora.com> on 2019/11/26 00:09:01 UTC

Flink 1.9.1 KafkaConnector missing data (1M+ records)

Hello,

We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka
0.10.1.1) arbitrarily skipping data.

*Context*
KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter
(S3) as sink with no intermediate operators. Recently, we noticed that
millions of Kafka records were missing for *one* topic partition (this job
is running for 100+ topic partitions, and such behavior was only observed
for one). This job is run on YARN, and hosts were healthy with no hardware
faults observed. No exceptions in jobmanager or taskmanager logs at this
time.
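
For context, the job is wired roughly like the sketch below (class names such
as KafkaRecord, KafkaRecordSchema, OffsetMetadataWriterFactory and
KafkaRecordBucketAssigner are placeholders standing in for our own classes,
not the exact code):

import java.util.Properties;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaToS3JobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<brokers>");   // placeholder
        props.setProperty("group.id", "<consumer-group>");     // placeholder

        // Kafka 0.10 source; KafkaRecordSchema is a stand-in DeserializationSchema.
        FlinkKafkaConsumer010<KafkaRecord> source =
                new FlinkKafkaConsumer010<>("digest_features", new KafkaRecordSchema(), props);

        // Bulk-format S3 sink; OffsetMetadataWriterFactory and KafkaRecordBucketAssigner
        // stand in for our BulkWriter.Factory and BucketAssigner implementations.
        StreamingFileSink<KafkaRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://<bucket>/kafka_V2/meta"),
                        new OffsetMetadataWriterFactory())
                .withBucketAssigner(new KafkaRecordBucketAssigner())
                .build();

        env.addSource(source).addSink(sink);
        env.execute("kafka-to-s3-archiver");
    }
}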

*How was this detected?*
As a sanity check, we dual-write Kafka metadata (offsets) to a separate
location in S3, and have monitoring to ensure that written offsets are
contiguous with no duplicates.
Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
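
The monitoring itself is straightforward; below is a minimal sketch of the
kind of contiguity/duplicate check we run over the offsets recovered from S3
for a single topic partition (illustrative only, not our exact code):

import java.util.List;

public class OffsetContiguityCheck {
    // Reports gaps and duplicates in the offsets recovered from S3 for one
    // topic partition; the input is assumed to be sorted ascending.
    public static void check(List<Long> sortedOffsets) {
        for (int i = 1; i < sortedOffsets.size(); i++) {
            long prev = sortedOffsets.get(i - 1);
            long curr = sortedOffsets.get(i);
            if (curr == prev) {
                System.out.println("Duplicate offset: " + curr);
            } else if (curr > prev + 1) {
                System.out.println("Gap: missing offsets " + (prev + 1) + " to " + (curr - 1));
            }
        }
    }
}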

*(Condensed) Taskmanager logs*
2019-11-24 02:36:50,140 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252
with MPU ID 3XG...
2019-11-24 02:41:27,966 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253
with MPU ID 9MW...
2019-11-24 02:46:29,153 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254
with MPU ID 7AP...
2019-11-24 02:51:32,602 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255
with MPU ID xQU...

*2019-11-24 02:56:35,183 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256
with MPU ID pDL...*

*2019-11-24 03:01:26,059 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257
with MPU ID Itf...*
*2019-11-24 03:01:26,510 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263
with MPU ID e3l...*
2019-11-24 03:06:26,230 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264
with MPU ID 5z4...
2019-11-24 03:11:22,711 INFO
 org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265
with MPU ID NfP...

Two observations stand out from the above logs:
- Datetimes *2019-11-24T01* and *2019-11-24T02* are entirely skipped,
resulting in millions of missing offsets. They are never written in future
commits (and the data in S3 confirms this).
- Two commits for the *same* topic partition ("digest_features", partition
4) happened nearly simultaneously at 2019-11-24 03:01, despite our commit
interval being set at 5 minutes. Why was the same TopicPartition read from
and committed twice in such a short interval?

Would greatly appreciate it if anyone can shed light on this issue.
Happy to provide full logs if needed.
Thanks

Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Harrison,

Really sorry for the late reply.
Do you have any insight on whether the missing records were read by
the consumer and just the StreamingFileSink failed to write their
offsets, or the Kafka consumer did not even read them or dropped them
for some reason? I am asking this in order to narrow down the problem. In
addition, did you see anything out of the ordinary in the logs?

I am also cc'ing Becket who may know a bit more on the kafka consumer
side of things.

Cheers,
Kostas

On Mon, Dec 2, 2019 at 10:00 PM Harrison Xu <hx...@quora.com> wrote:
>
> Thank you for your reply,
>
> Some clarification:
>
> We have configured the BucketAssigner to use the Kafka record timestamp. Exact bucketing behavior as follows:
> private static final DateTimeFormatter formatter = DateTimeFormatter
> .ofPattern("yyyy-MM-dd'T'HH");
>
> @Override
> public String getBucketId(KafkaRecord record, BucketAssigner.Context context) {
> return String.format(
> "%s/dt=%s/partition_%s",
> record.getTopic(),
> Instant.ofEpochMilli(record.getTimestamp()).atZone(ZoneOffset.UTC).format(formatter),
> record.getPartition());
> }
>
> For each record, we write only its offset to the S3 object as a sanity check. It is easy to detect missing or duplicate offsets. To answer your questions:
>
> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
> No, because even if the producer were idle during these datetimes, we would expect no missing offsets. We observed both millions of missing records, in addition to missing partitions (2019-11-24T01 and 2019-11-24T02). Further, the producer was very active during this time.
> I want to emphasize that we noticed that the consumer for this exact TopicPartition was falling behind (>1 hour lag); this degree of lag was only observed for this partition. (The consumer eventually caught up). It's normal for the consumer to fall behind the producer for short bursts, but we definitely do not expect missing records as a result. There were millions of records whose timestamps fall into (dt 2019-11-24T01 and 2019-11-24T02) - they were entirely skipped by the writer.
>
>
> what does TT stand for?
> It's simply convention for datetime serialization as string.
>
>
> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration?
> We are using the BulkWriter. I am under the impression that this writer should only produce one file per checkpoint interval, which we have configured to be 5 minutes. You see that the preceding commits follow this pattern of one commit per checkpoint interval, which is what we expect. It's very strange that two files for the same TopicPartition (same TaskManager) are committed.
>
>
> I am eager to hear your reply and understand what we're seeing.
>
> Thanks,
> Harrison
>
> On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas <kk...@gmail.com> wrote:
>>
>> Hi Harrison,
>>
>> One thing to keep in mind is that Flink will only write files if there
>> is data to write. If, for example, your partition is not active for a
>> period of time, then no files will be written.
>> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
>> are entirely skipped?
>>
>> In addition, for the "duplicates", it would help if you could share a
>> bit more information about your BucketAssigner.
>> How are these names assigned to the files and what does TT stand for?
>> Can it be that there are a lot of events for partition 4 that fill up
>> 2 part files for that duration? I am
>> asking because the counter of the 2 part files differ.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <hx...@quora.com> wrote:
>> >
>> > Hello,
>> >
>> > We're seeing some strange behavior with flink's KafkaConnector010 (Kafka 0.10.1.1) arbitrarily skipping data.
>> >
>> > Context
>> > KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter (S3) as sink with no intermediate operators. Recently, we noticed that millions of Kafka records were missing for one topic partition (this job is running for 100+ topic partitions, and such behavior was only observed for one). This job is run on YARN, and hosts were healthy with no hardware faults observed. No exceptions in jobmanager or taskmanager logs at this time.
>> >
>> > How was this detected?
>> > As a sanity check, we dual-write Kafka metadata (offsets) to a separate location in S3, and have monitoring to ensure that written offsets are contiguous with no duplicates.
>> > Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
>> >
>> > (Condensed) Taskmanager logs
>> > 2019-11-24 02:36:50,140 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
>> > 2019-11-24 02:41:27,966 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
>> > 2019-11-24 02:46:29,153 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
>> > 2019-11-24 02:51:32,602 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...
>> > 2019-11-24 02:56:35,183 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...
>> > 2019-11-24 03:01:26,059 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...
>> > 2019-11-24 03:01:26,510 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...
>> > 2019-11-24 03:06:26,230 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
>> > 2019-11-24 03:11:22,711 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...
>> >
>> > Two observations stand out from the above logs:
>> > - Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped, resulting in millions of missing offsets. They are never written in future commits (and data in S3 shows this).
>> > - Two commits for the same topic partition ("digest_features", partition 4) happened nearly simultaneously at 2019-11-24 03:01, despite our commit interval being set at 5 minutes. Why was the same TopicPartition read from and committed twice in such a short interval?
>> >
>> > Would greatly appreciate if anyone is able to shed light on this issue. Happy to provide full logs if needed.
>> > Thanks

Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

Posted by Harrison Xu <hx...@quora.com>.
Thank you for your reply,

Some clarification:

We have configured the BucketAssigner to use the *Kafka record timestamp*.
Exact bucketing behavior as follows:
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

private static final DateTimeFormatter formatter =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH");

@Override
public String getBucketId(KafkaRecord record, BucketAssigner.Context context) {
    // Bucket by topic, hourly UTC datetime taken from the Kafka record timestamp,
    // and Kafka partition, e.g. "digest_features/dt=2019-11-24T00/partition_4".
    return String.format(
            "%s/dt=%s/partition_%s",
            record.getTopic(),
            Instant.ofEpochMilli(record.getTimestamp())
                    .atZone(ZoneOffset.UTC)
                    .format(formatter),
            record.getPartition());
}
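
To make the bucketing concrete, here is a small self-contained example
(illustrative values only) of the bucket id this logic produces for a record
whose timestamp falls inside one of the missing hours:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class BucketIdExample {
    public static void main(String[] args) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH");
        // A record timestamp inside one of the "missing" hours.
        long ts = Instant.parse("2019-11-24T01:30:00Z").toEpochMilli();
        String bucketId = String.format(
                "%s/dt=%s/partition_%s",
                "digest_features",
                Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC).format(formatter),
                4);
        // Prints: digest_features/dt=2019-11-24T01/partition_4
        System.out.println(bucketId);
    }
}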

For each record, we write only its offset to the S3 object as a sanity
check. It is easy to detect missing or duplicate offsets. To answer your
questions:


*Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02 are
entirely skipped?*
No, because even if the producer had been idle during these datetimes, we
would still expect no missing offsets. We observed *millions of missing
records* in addition to the missing hourly partitions (2019-11-24T01 and
2019-11-24T02). Further, the producer was very active during this time.
I want to emphasize that the consumer for this exact TopicPartition was
falling behind (>1 hour of lag); this degree of lag was only observed for
this partition. (The consumer eventually caught up.) It's normal for the
consumer to fall behind the producer in short bursts, but we definitely do
not expect missing records as a result. There were millions of records whose
timestamps fall into dt 2019-11-24T01 and 2019-11-24T02 - they were entirely
skipped by the writer.


*what does TT stand for?*
It's simply the literal 'T' separator from the ISO-8601-style pattern above
("yyyy-MM-dd'T'HH"), i.e. a convention for serializing the datetime as a string.



*Can it be that there are a lot of events for partition 4 that fill up
2 part files for that duration?*
We are using the BulkWriter. I am under the impression that this writer
should only produce one file per bucket per checkpoint interval, which we
have configured to be 5 minutes. You can see that the preceding commits
follow this pattern of one commit per checkpoint interval, which is what we
expect. It's very strange that two files for the same TopicPartition (same
TaskManager) are committed within seconds of each other.
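
For reference, the commit cadence comes from the checkpoint interval: with a
bulk format, the StreamingFileSink rolls part files on every checkpoint
(OnCheckpointRollingPolicy), so one part file per active bucket per
checkpoint is the expected steady state. A minimal sketch of that
configuration (illustrative, not our exact settings):

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void configure(StreamExecutionEnvironment env) {
        // Checkpoint (and therefore roll/commit part files) every 5 minutes.
        env.enableCheckpointing(TimeUnit.MINUTES.toMillis(5), CheckpointingMode.EXACTLY_ONCE);
        // Optional: avoid back-to-back checkpoints if one runs long.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(30));
    }
}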


I am eager to hear your reply and understand what we're seeing.

Thanks,
Harrison

On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas <kk...@gmail.com> wrote:

> Hi Harrison,
>
> One thing to keep in mind is that Flink will only write files if there
> is data to write. If, for example, your partition is not active for a
> period of time, then no files will be written.
> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
>
> In addition, for the "duplicates", it would help if you could share a
> bit more information about your BucketAssigner.
> How are these names assigned to the files and what does TT stand for?
> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration? I am
> asking because the counter of the 2 part files differ.
>
> Cheers,
> Kostas
>
> On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <hx...@quora.com> wrote:
> >
> > Hello,
> >
> > We're seeing some strange behavior with flink's KafkaConnector010 (Kafka
> 0.10.1.1) arbitrarily skipping data.
> >
> > Context
> > KafkaConnector010 is used as source, and
> StreamingFileSink/BulkPartWriter (S3) as sink with no intermediate
> operators. Recently, we noticed that millions of Kafka records were missing
> for one topic partition (this job is running for 100+ topic partitions, and
> such behavior was only observed for one). This job is run on YARN, and
> hosts were healthy with no hardware faults observed. No exceptions in
> jobmanager or taskmanager logs at this time.
> >
> > How was this detected?
> > As a sanity check, we dual-write Kafka metadata (offsets) to a separate
> location in S3, and have monitoring to ensure that written offsets are
> contiguous with no duplicates.
> > Each Kafka record is bucketed into hourly datetime partitions (UTC) in
> S3.
> >
> > (Condensed) Taskmanager logs
> > 2019-11-24 02:36:50,140 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252
> with MPU ID 3XG...
> > 2019-11-24 02:41:27,966 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253
> with MPU ID 9MW...
> > 2019-11-24 02:46:29,153 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254
> with MPU ID 7AP...
> > 2019-11-24 02:51:32,602 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255
> with MPU ID xQU...
> > 2019-11-24 02:56:35,183 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256
> with MPU ID pDL...
> > 2019-11-24 03:01:26,059 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257
> with MPU ID Itf...
> > 2019-11-24 03:01:26,510 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263
> with MPU ID e3l...
> > 2019-11-24 03:06:26,230 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264
> with MPU ID 5z4...
> > 2019-11-24 03:11:22,711 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer              - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265
> with MPU ID NfP...
> >
> > Two observations stand out from the above logs:
> > - Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped,
> resulting in millions of missing offsets. They are never written in future
> commits (and data in S3 shows this).
> > - Two commits for the same topic partition ("digest_features", partition
> 4) happened nearly simultaneously at 2019-11-24 03:01, despite our commit
> interval being set at 5 minutes. Why was the same TopicPartition read from
> and committed twice in such a short interval?
> >
> > Would greatly appreciate if anyone is able to shed light on this issue.
> Happy to provide full logs if needed.
> > Thanks

Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

Posted by Kostas Kloudas <kk...@gmail.com>.
Hi Harrison,

One thing to keep in mind is that Flink will only write files if there
is data to write. If, for example, your partition is not active for a
period of time, then no files will be written.
Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
are entirely skipped?

In addition, for the "duplicates", it would help if you could share a
bit more information about your BucketAssigner.
How are these names assigned to the files and what does TT stand for?
Can it be that there are a lot of events for partition 4 that fill up
2 part files for that duration? I am
asking because the counters of the two part files differ.

Cheers,
Kostas

On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu <hx...@quora.com> wrote:
>
> Hello,
>
> We're seeing some strange behavior with flink's KafkaConnector010 (Kafka 0.10.1.1) arbitrarily skipping data.
>
> Context
> KafkaConnector010 is used as source, and StreamingFileSink/BulkPartWriter (S3) as sink with no intermediate operators. Recently, we noticed that millions of Kafka records were missing for one topic partition (this job is running for 100+ topic partitions, and such behavior was only observed for one). This job is run on YARN, and hosts were healthy with no hardware faults observed. No exceptions in jobmanager or taskmanager logs at this time.
>
> How was this detected?
> As a sanity check, we dual-write Kafka metadata (offsets) to a separate location in S3, and have monitoring to ensure that written offsets are contiguous with no duplicates.
> Each Kafka record is bucketed into hourly datetime partitions (UTC) in S3.
>
> (Condensed) Taskmanager logs
> 2019-11-24 02:36:50,140 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252 with MPU ID 3XG...
> 2019-11-24 02:41:27,966 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253 with MPU ID 9MW...
> 2019-11-24 02:46:29,153 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254 with MPU ID 7AP...
> 2019-11-24 02:51:32,602 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255 with MPU ID xQU...
> 2019-11-24 02:56:35,183 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256 with MPU ID pDL...
> 2019-11-24 03:01:26,059 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5257 with MPU ID Itf...
> 2019-11-24 03:01:26,510 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5263 with MPU ID e3l...
> 2019-11-24 03:06:26,230 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5264 with MPU ID 5z4...
> 2019-11-24 03:11:22,711 INFO  org.apache.flink.fs.s3.common.writer.S3Committer              - Committing kafka_V2/meta/digest_features/dt=2019-11-24T03/partition_4/part-12-5265 with MPU ID NfP...
>
> Two observations stand out from the above logs:
> - Datetime 2019-11-24T01 and 2019-11-24T02 are entirely skipped, resulting in millions of missing offsets. They are never written in future commits (and data in S3 shows this).
> - Two commits for the same topic partition ("digest_features", partition 4) happened nearly simultaneously at 2019-11-24 03:01, despite our commit interval being set at 5 minutes. Why was the same TopicPartition read from and committed twice in such a short interval?
>
> Would greatly appreciate if anyone is able to shed light on this issue. Happy to provide full logs if needed.
> Thanks