Posted to user@flink.apache.org by Lorenzo Nicora <lo...@gmail.com> on 2020/06/29 13:51:57 UTC

Reading and updating rule-sets from a file

Hi

My streaming job uses a set of rules to process records from a stream.
The rule set is defined in simple flat files, one rule per line.
The rule set can change from time to time. A user will upload a new file
that must replace the old rule set completely.

My problem is with reading and updating the rule set when I have a new one.
I cannot update single rules. I need the whole rule set to validate it and
build the internal representation to broadcast.
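
A minimal sketch of the "validate the whole set, then build" step, in
plain Java with no Flink dependency. The thread does not describe the
rule format, so the `id,expression` line layout, the `Rule` and `RuleSet`
types, and the duplicate-id check are all assumptions made for
illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RuleSetBuilder {

    /** Hypothetical rule: the real fields are not described in the thread. */
    static final class Rule {
        final String id;
        final String expression;

        Rule(String id, String expression) {
            this.id = id;
            this.expression = expression;
        }
    }

    /** Immutable snapshot of one complete rule file. */
    static final class RuleSet {
        final List<Rule> rules;

        RuleSet(List<Rule> rules) {
            this.rules = Collections.unmodifiableList(new ArrayList<>(rules));
        }
    }

    /**
     * Builds a rule set from ALL lines of one file.
     * Throws if any line is malformed, so a partial set is never produced.
     */
    static RuleSet build(List<String> lines) {
        List<Rule> rules = new ArrayList<>();
        Set<String> seenIds = new HashSet<>();
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i).trim();
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            // Assumed layout: "<id>,<expression>"
            String[] parts = line.split(",", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Malformed rule on line " + (i + 1));
            }
            String id = parts[0].trim();
            if (!seenIds.add(id)) {
                throw new IllegalArgumentException("Duplicate rule id '" + id + "' on line " + (i + 1));
            }
            rules.add(new Rule(id, parts[1].trim()));
        }
        if (rules.isEmpty()) {
            throw new IllegalArgumentException("Rule file contains no rules");
        }
        return new RuleSet(rules);
    }
}
```

The point of the sketch is only that validation needs the complete list
of lines as input, which is why the end of the file has to be known
before this step can run.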

I am reading the file with a *ContinuousFileReaderOperator* and an
*InputFormat* (via env.readFile(...)), then creating the internal
representation of the rule set, which I then broadcast. I pick up new
files with processingMode = PROCESS_CONTINUOUSLY.

How do I know when I have read ALL the records from a physical file, to
trigger validating and building the new Rule Set?

I've been thinking about a processing-time trigger, waiting a reasonable
time after I read the first rule of a new file, but it does not look safe
if the user, for example, uploads two new files by mistake.

Cheers
Lorenzo

Re: Reading and updating rule-sets from a file

Posted by Lorenzo Nicora <lo...@gmail.com>.
Thanks Till,

I understand making my FileInputFormat "unsplittable" guarantees a file is
always read by a single task. But how can I produce a single record for the
entire file?

As my file is a CSV with some idiosyncrasies, I am extending CsvInputFormat
to avoid reinventing the CSV parsing and type conversions. This generates
one record per line, and I cannot see any hook for the end of the file.

I've been thinking of using a GlobalWindow to process all the rules at once
when I reach the end of file, but what can I use as a trigger?
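
One possible shape for "one record per file" on top of a per-line parser
is a wrapper that drains the line reader and emits a single list. The
sketch below is plain Java, not Flink code: `RecordReader` is a
hypothetical stand-in whose two methods only borrow the names
`reachedEnd()` and `nextRecord()` from Flink's InputFormat, and
`fromIterator` is a helper invented for the example. It also assumes the
wrapped reader sees the whole file, that is, the input is not split.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WholeFileRecord {

    /** Hypothetical per-record reader, loosely modelled on an input format. */
    interface RecordReader<T> {
        boolean reachedEnd();
        T nextRecord();
    }

    /** Wraps a per-line reader and exposes the whole file as ONE record. */
    static final class WholeFileReader<T> implements RecordReader<List<T>> {
        private final RecordReader<T> delegate;
        private boolean emitted = false;

        WholeFileReader(RecordReader<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean reachedEnd() {
            // The wrapper is exhausted as soon as its single record is out.
            return emitted;
        }

        @Override
        public List<T> nextRecord() {
            List<T> all = new ArrayList<>();
            // Drain the per-line reader; the end of this loop IS the end of file.
            while (!delegate.reachedEnd()) {
                T record = delegate.nextRecord();
                if (record != null) {
                    all.add(record);
                }
            }
            emitted = true;
            return all;
        }
    }

    /** Illustration helper: adapts an Iterator to the hypothetical interface. */
    static <T> RecordReader<T> fromIterator(final Iterator<T> it) {
        return new RecordReader<T>() {
            @Override
            public boolean reachedEnd() {
                return !it.hasNext();
            }

            @Override
            public T nextRecord() {
                return it.next();
            }
        };
    }
}
```

With this shape no window or trigger is needed downstream, because the
end-of-file condition is handled inside the reader itself.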

Regards
Lorenzo


On Wed, 1 Jul 2020 at 08:21, Till Rohrmann <tr...@apache.org> wrote:

> Hi Lorenzo,
>
> what you could try to do is to derive your own InputFormat (extending
> FileInputFormat) where you set the field `unsplittable` to true. That way,
> an InputSplit is the whole file and you can handle the set of new rules as
> a single record.
>
> Cheers,
> Till

Re: Reading and updating rule-sets from a file

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lorenzo,

what you could try to do is to derive your own InputFormat (extending
FileInputFormat) where you set the field `unsplittable` to true. That way,
an InputSplit is the whole file and you can handle the set of new rules as
a single record.
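
Once a file arrives as a single record, replacing the rule set can be one
all-or-nothing step. The following is a minimal plain-Java sketch of that
idea, not Flink code: `RuleSetHolder` and its methods are hypothetical
names, rules are kept as raw strings, the validator is supplied by the
caller, and an AtomicReference stands in for whatever state the job
actually uses (for example broadcast state). Keeping the old set when the
new file fails validation is an assumption, not something stated in the
thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class RuleSetHolder {

    // Current rule set; starts empty until the first valid file arrives.
    private final AtomicReference<List<String>> current =
            new AtomicReference<>(Collections.<String>emptyList());

    // Caller-supplied check that is applied to a complete file.
    private final Predicate<List<String>> validator;

    public RuleSetHolder(Predicate<List<String>> validator) {
        this.validator = validator;
    }

    /**
     * Handles one whole-file record.
     * Returns true if the rule set was replaced, false if the file was
     * rejected and the previous rule set was kept.
     */
    public boolean onNewFile(List<String> allRulesFromOneFile) {
        if (!validator.test(allRulesFromOneFile)) {
            return false; // assumption: an invalid file leaves the old rules active
        }
        // Defensive copy, then a single atomic swap: never a half-updated set.
        current.set(Collections.unmodifiableList(new ArrayList<>(allRulesFromOneFile)));
        return true;
    }

    public List<String> rules() {
        return current.get();
    }
}
```

In this sketch, if two files are uploaded by mistake, each one is still a
complete record of its own, so the rule set is always one whole file and
whichever record is handled last wins.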

Cheers,
Till
