Posted to user@flink.apache.org by Peter Groesbeck <pe...@gmail.com> on 2019/05/02 21:10:52 UTC

DateTimeBucketAssigner using Element Timestamp

Hi all,

I have an application that reads from various Kafka topics and writes
parquet files to corresponding buckets on S3 using StreamingFileSink with
DateTimeBucketAssigner. The upstream application that writes to Kafka also
writes records as gzipped json files to date bucketed locations on S3 as
backup.

One requirement we have is to back fill missing data in the event that the
application or Kafka experiences an outage. This can be accomplished by
reading the backup files that were written to S3 by our upstream
application instead of reading from Kafka. My current approach is to read
the hourly backup buckets, transform the files into a DataStream and assign
them a timestamp based on a datetime field on the json records using
BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the
DataStream to the same StreamingFileSink which ideally would write past
records in the same manner as if they had been streamed by Kafka.
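
Concretely, the timestamp assignment step looks something like the sketch
below (BackupRecord and its eventTime field stand in for our actual
JSON-decoded type, backupStream for the DataStream built from the backup
files, and the one-minute out-of-orderness bound is arbitrary):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Placeholder for the JSON-decoded backup record; eventTime is epoch millis
// taken from the datetime field mentioned above.
case class BackupRecord(eventTime: Long /* , ... other fields ... */)

val withTimestamps = backupStream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[BackupRecord](Time.minutes(1)) {
    // Use the record's own datetime field as the event timestamp.
    override def extractTimestamp(element: BackupRecord): Long = element.eventTime
  })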

Unfortunately for me, the bucket assigner works on system time:

A BucketAssigner
<https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html>
that assigns to buckets based on current system time.

@Override
public String getBucketId(IN element, BucketAssigner.Context context) {
    if (dateTimeFormatter == null) {
        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }
    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}


No problem, I can extend DateTimeBucketAssigner and override the
method to use the element timestamp instead of currentProcessingTime(),
but I'm wondering if this is the right approach? And if so, would this
behavior be useful outside of the context of my application?

Thanks in advance for your help and for this awesome framework!

Peter

Re: DateTimeBucketAssigner using Element Timestamp

Posted by Rafi Aroch <ra...@gmail.com>.
Hi Peter,

I also encountered this issue. As far as I know, it is not currently
possible to stream from files (or any bounded stream) into a
*StreamingFileSink*.
This is because files are rolled over only on checkpoints and NOT when the
stream closes. This is due to the fact that at the function level, Flink
does not differentiate between failures and normal termination.
There's an issue (though not active for a while) to fix it [1].

Are you sure checkpoints actually happened? Can you verify this in the logs?
Make sure the checkpoint interval is shorter than your job's execution time.
If you do see checkpoints happening, that's another issue; it could be a bug.
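
As a quick sanity check, the interval is whatever you pass to
enableCheckpointing, and it needs to be well below the expected runtime of
the backfill job. A rough sketch (the 60-second interval is just an example):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint often enough that the StreamingFileSink gets at least one
// checkpoint (and therefore one roll-over) before the job finishes.
env.enableCheckpointing(60 * 1000) // every 60 seconds, in milliseconds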

A workaround I got back then from Kostas Kloudas was:

*"Although not the most elegant, but one solution could be to write your
program using the file *
*source in PROCESS_CONTINUOUSLY mode, as described here *
*https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html>
*
*and when you are sure that the processing of your file is done, then you
cancel the job."*

Honestly, I did not try that.
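
If you do try it, a minimal sketch of that workaround could look like the
following (the path and the monitoring interval are placeholders; the job
then has to be cancelled manually once the backfill is done):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backupPath = "s3://my-backup-bucket/2019/05/02/" // placeholder

// PROCESS_CONTINUOUSLY keeps the source (and the job) alive, re-scanning the
// directory periodically, so checkpoints keep firing and the sink keeps rolling.
val lines: DataStream[String] = env.readFile(
  new TextInputFormat(new Path(backupPath)),
  backupPath,
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  60 * 1000) // re-scan every 60 seconds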

There is also an ongoing effort to support bounded streams in the DataStream
API [2], which might provide the backbone for the functionality that you
need.

Another alternative for you could be to use the 'older' BucketingSink
instead of StreamingFileSink. It would work, but depending on your sink
file system (like S3), it may not guarantee exactly-once.
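
For illustration only, the BucketingSink wiring looks roughly like the sketch
below (plain string records for simplicity; writing Parquet through it would
need a custom Writer, and stringStream and the path are placeholders):

import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}

// BucketingSink rolls part files on size or inactivity rather than only on
// checkpoints, but as noted above its exactly-once behaviour depends on the
// target file system (S3 is the problematic case).
val sink = new BucketingSink[String]("s3://my-output-bucket/events") // placeholder
sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
sink.setBatchSize(128L * 1024 * 1024) // roll at ~128 MB

stringStream.addSink(sink)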

Hope this helps.

Rafi

[1] https://issues.apache.org/jira/browse/FLINK-2646
[2] https://issues.apache.org/jira/browse/FLINK-11875


On Fri, May 3, 2019 at 11:21 PM Peter Groesbeck <pe...@gmail.com>
wrote:

> Thanks for the quick response Piotr,
>
> I feel like I have everything working but no files are getting written to
> disk. I've implemented my own BucketAssigner like so:
>
> class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] {
>   override def getBucketId(element: IN, context: BucketAssigner.Context): String = {
>     DateTimeFormatter.ofPattern(formatString).withZone(ZoneId.systemDefault).format(Instant.ofEpochMilli(context.timestamp()))
>   }
>
>   override def getSerializer: SimpleVersionedSerializer[String] = SimpleVersionedStringSerializer.INSTANCE
>
> }
>
> And plugged it into my sink:
>
>   val parquet = StreamingFileSink
>     .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(ReflectData.get().getSchema(clazz)))
>     .withBucketAssigner(new BackFillBucketAssigner[GenericRecord])
>     .build
>
> stream.addSink(parquet)
>
> When I run locally I can see the temporary part files but nothing ever gets rolled. I saw this once before when I didn't have checkpointing enabled for my original streaming job and this note tipped me off:
>
> IMPORTANT: Bulk-encoding formats can only be combined with the `OnCheckpointRollingPolicy`, which rolls the in-progress part file on every checkpoint.
>
> Is it possible that something similar is happening? I have enabled checkpointing in the job; however, since it is reading from flat files and assigning a timestamp, is it possible checkpointing is not working as I expect? Nothing in my logs seems to suggest an error and the job runs to completion (about 30 minutes).
>
> Thanks again for your help!
> Peter
>
>
> On Fri, May 3, 2019 at 4:46 AM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi Peter,
>>
>> It sounds like this should work, however my question would be do you want
>> exactly-once processing? If yes, then you would have to somehow know which
>> exact events need re-processing or deduplicate them somehow. Keep in mind
>> that in case of an outage in the original job, you probably will have some
>> files already committed by the StreamingFileSink.
>>
>> Another approach might be to somehow overwrite the previous files (but
>> then you would have to check whether the bucket assignment and file naming
>> is completely deterministic) or before reprocessing from backup remove the
>> dirty files from the crashed job.
>>
>> Piotrek
>>
>> On 2 May 2019, at 23:10, Peter Groesbeck <pe...@gmail.com>
>> wrote:
>>
>> Hi all,
>>
>> I have an application that reads from various Kafka topics and writes
>> parquet files to corresponding buckets on S3 using StreamingFileSink with
>>  DateTimeBucketAssigner. The upstream application that writes to Kafka
>> also writes records as gzipped json files to date bucketed locations on S3
>> as backup.
>>
>> One requirement we have is to back fill missing data in the event that
>> the application or Kafka experiences an outage. This can be accomplished by
>> reading the backup files that were written to S3 by our upstream
>> application instead of reading from Kafka. My current approach is to read
>> the hourly backup buckets, transform the files into a DataStream and
>> assign them a timestamp based on a datetime field on the json records using
>> BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect
>> the DataStream to the same StreamingFileSink which ideally would write
>> past records in the same manner as if they had been streamed by Kafka.
>>
>> Unfortunately for me, the bucket assigner works on system time:
>>
>> A BucketAssigner
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html> that
>> assigns to buckets based on current system time.
>>
>> @Override
>> public String getBucketId(IN element, BucketAssigner.Context context) {
>>    if (dateTimeFormatter == null) {
>>       dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>>    }
>>    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
>> }
>>
>>
>> No problem, I can extend DateTimeBucketAssigner and override the method to use the element timestamp instead of currentProcessingTime(), but I'm wondering if this is the right approach? And if so, would this behavior be useful outside of the context of my application?
>>
>> Thanks in advance for your help and for this awesome framework!
>>
>> Peter
>>
>>
>>

Re: DateTimeBucketAssigner using Element Timestamp

Posted by Peter Groesbeck <pe...@gmail.com>.
Thanks for the quick response Piotr,

I feel like I have everything working but no files are getting written to
disk. I've implemented my own BucketAssigner like so:

class BackFillBucketAssigner[IN] extends BucketAssigner[IN, String] {
  // formatString comes from the enclosing scope; context.timestamp() is the
  // element timestamp assigned upstream (null if no timestamp was assigned).
  override def getBucketId(element: IN, context: BucketAssigner.Context): String = {
    DateTimeFormatter.ofPattern(formatString)
      .withZone(ZoneId.systemDefault)
      .format(Instant.ofEpochMilli(context.timestamp()))
  }

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

And plugged it into my sink:

val parquet = StreamingFileSink
  .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(ReflectData.get().getSchema(clazz)))
  .withBucketAssigner(new BackFillBucketAssigner[GenericRecord])
  .build

stream.addSink(parquet)

When I run locally I can see the temporary part files but nothing ever
gets rolled. I saw this once before when I didn't have checkpointing
enabled for my original streaming job and this note tipped me off:

IMPORTANT: Bulk-encoding formats can only be combined with the
`OnCheckpointRollingPolicy`, which rolls the in-progress part file on
every checkpoint.

Is it possible that something similar is happening? I have enabled
checkpointing in the job; however, since it is reading from flat files
and assigning a timestamp, is it possible checkpointing is not working
as I expect? Nothing in my logs seems to suggest an error and the job
runs to completion (about 30 minutes).

Thanks again for your help!
Peter


On Fri, May 3, 2019 at 4:46 AM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Peter,
>
> It sounds like this should work, however my question would be do you want
> exactly-once processing? If yes, then you would have to somehow know which
> exact events need re-processing or deduplicate them somehow. Keep in mind
> that in case of an outage in the original job, you probably will have some
> files already committed by the StreamingFileSink.
>
> Another approach might be to somehow overwrite the previous files (but
> then you would have to check whether the bucket assignment and file naming
> is completely deterministic) or before reprocessing from backup remove the
> dirty files from the crashed job.
>
> Piotrek
>
> On 2 May 2019, at 23:10, Peter Groesbeck <pe...@gmail.com>
> wrote:
>
> Hi all,
>
> I have an application that reads from various Kafka topics and writes
> parquet files to corresponding buckets on S3 using StreamingFileSink with
> DateTimeBucketAssigner. The upstream application that writes to Kafka
> also writes records as gzipped json files to date bucketed locations on S3
> as backup.
>
> One requirement we have is to back fill missing data in the event that the
> application or Kafka experiences an outage. This can be accomplished by
> reading the backup files that were written to S3 by our upstream
> application instead of reading from Kafka. My current approach is to read
> the hourly backup buckets, transform the files into a DataStream and
> assign them a timestamp based on a datetime field on the json records using
> BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the
> DataStream to the same StreamingFileSink which ideally would write past
> records in the same manner as if they had been streamed by Kafka.
>
> Unfortunately for me, the bucket assigner works on system time:
>
> A BucketAssigner
> <https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html> that
> assigns to buckets based on current system time.
>
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>    if (dateTimeFormatter == null) {
>       dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>    }
>    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
>
>
> No problem, I can extend DateTimeBucketAssigner and override the method to use the element timestamp instead of currentProcessingTime(), but I'm wondering if this is the right approach? And if so, would this behavior be useful outside of the context of my application?
>
> Thanks in advance for your help and for this awesome framework!
>
> Peter
>
>
>

Re: DateTimeBucketAssigner using Element Timestamp

Posted by Piotr Nowojski <pi...@ververica.com>.
Hi Peter,

It sounds like this should work; however, my question would be: do you want exactly-once processing? If yes, then you would have to somehow know which exact events need re-processing, or deduplicate them somehow. Keep in mind that in case of an outage in the original job, you will probably have some files already committed by the StreamingFileSink.

Another approach might be to somehow overwrite the previous files (but then you would have to check whether the bucket assignment and file naming are completely deterministic), or to remove the dirty files from the crashed job before reprocessing from backup.

Piotrek

> On 2 May 2019, at 23:10, Peter Groesbeck <pe...@gmail.com> wrote:
> 
> Hi all,
> 
> I have an application that reads from various Kafka topics and writes parquet files to corresponding buckets on S3 using StreamingFileSink with DateTimeBucketAssigner. The upstream application that writes to Kafka also writes records as gzipped json files to date bucketed locations on S3 as backup.
> 
> One requirement we have is to back fill missing data in the event that the application or Kafka experiences an outage. This can be accomplished by reading the backup files that were written to S3 by our upstream application instead of reading from Kafka. My current approach is to read the hourly backup buckets, transform the files into a DataStream and assign them a timestamp based on a datetime field on the json records using BoundedOutOfOrdernessTimestampExtractor. I was then hoping to connect the DataStream to the same StreamingFileSink which ideally would write past records in the same manner as if they had been streamed by Kafka. 
> 
> Unfortunately for me, the bucket assigner works on system time: 
> 
> A BucketAssigner <https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssigner.html> that assigns to buckets based on current system time.
> @Override
> public String getBucketId(IN element, BucketAssigner.Context context) {
>    if (dateTimeFormatter == null) {
>       dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
>    }
>    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
> }
> 
> No problem, I can extend DateTimeBucketAssigner and override the method to use the element timestamp instead of currentProcessingTime(), but I'm wondering if this is the right approach? And if so, would this behavior be useful outside of the context of my application?
> 
> Thanks in advance for your help and for this awesome framework!
> 
> Peter