Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2017/05/11 13:06:07 UTC

Streaming use case: Row enrichment

Hi to all,
we have a particular use case: a tabular dataset on HDFS (e.g. a CSV) that
we want to enrich by filling some cells with the content returned by a
query to an inverted index (e.g. Solr/Elasticsearch).
Since we want this process to be resilient and scalable, we think Flink
streaming could be a good fit: we can control the "pressure" on the index
by adding/removing consumers dynamically, and there is automatic error
recovery.

Right now we have come up with two different solutions to the problem:

   1. Move the dataset from HDFS to a queue/topic (like Kafka or RabbitMQ)
      and then let the queue consumers do the real job (pull Rows from the
      queue, enrich them and then persist the enriched Rows). The questions
      here are:
      1.1. How do we properly manage writing to HDFS? If we read a set of
           rows, enrich them and then need to write the result back to
           HDFS, is it possible to automatically compact files in order to
           avoid the "too many small files" problem on HDFS? And how do we
           avoid file-name collisions (putting each batch of rows into a
           different file)?
      1.2. How do we control the number of consumers dynamically? Is it
           possible to change the parallelism once the job has started?
   2. In order to avoid useless data transfer from HDFS to a queue/topic
      (since we don't need all the Row fields to build the query; usually
      only 2-5 fields are needed), we can create a Flink job that puts the
      queries into a queue/topic and waits for the results. The problem
      with this approach is:
      2.1. How do we correlate queries with their responses? Creating a
           single response queue/topic implies that all consumers read all
           messages (and discard those that are not directed to them),
           while creating a queue/topic for each sub-task could be
           expensive (in terms of resources and management; we don't have
           any evidence/experience of this, it's just a possible problem).
           A rough sketch of the correlation pattern we have in mind
           follows below.
   3. Maybe we can exploit Flink async I/O somehow...? But how?
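
To make the correlation problem in 2.1 a bit more concrete, this is roughly
the kind of query envelope we have in mind (purely illustrative, all class
and field names are made up):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Each query sent to the queue/topic carries a correlation id; the response
// comes back tagged with the same id, so the producer only needs a map of
// pending ids instead of a dedicated queue/topic per sub-task.
public class QueryEnvelope {

    public final String correlationId = UUID.randomUUID().toString();
    public final String[] queryFields;   // the few Row fields actually needed

    public QueryEnvelope(String... queryFields) {
        this.queryFields = queryFields;
    }

    public static void main(String[] args) {
        Map<String, QueryEnvelope> pending = new ConcurrentHashMap<>();

        QueryEnvelope q = new QueryEnvelope("name", "city");
        pending.put(q.correlationId, q);
        // ... send q to the query topic; when a response with this id shows
        // up on the shared response topic, pending.remove(id) matches it
        // back to the original Row, and consumers that don't own the id
        // simply skip the message.
        System.out.println("sent query " + q.correlationId);
    }
}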


Any suggestions on, or drawbacks of, the two approaches?

Thanks in advance,
Flavio

Re: Streaming use case: Row enrichment

Posted by Flavio Pompermaier <po...@okkam.it>.
Understood. Thanks anyway, Aljoscha!

Re: Streaming use case: Row enrichment

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

The problem with that is that the file is being read by (possibly, very likely) multiple operators in parallel. The file source works like this: there is a ContinuousFileMonitoringFunction (this is an actual Flink source) that monitors a directory and, when a new file appears, sends several (non-overlapping) input splits downstream. After the source there is an operator (ContinuousFileReaderOperator) that receives the splits, reads the contents of the file at each split and sends them downstream. There is thus no central point where we would know that a file has been completely processed.

Best,
Aljoscha

Re: Streaming use case: Row enrichment

Posted by Flavio Pompermaier <po...@okkam.it>.
Is it really necessary to wait for the file to reach the end of the
pipeline? Isn't it sufficient to know that it has been read and the source
operator has been checkpointed? (I don't know if I'm using this word
correctly... I mean that all the file splits have been processed and Flink
won't reprocess them anymore.)

Best,
Flavio

Re: Streaming use case: Row enrichment

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I was referring to

StreamExecutionEnvironment.readFile(
  FileInputFormat<OUT> inputFormat,
  String filePath,
  FileProcessingMode watchType,
  long interval)

There you can specify whether the source should shut down once all files have been read (PROCESS_ONCE) or whether it should continue to monitor the input directory for new files (PROCESS_CONTINUOUSLY).
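
For example, a minimal sketch of both modes; TextInputFormat is used only
for illustration (any FileInputFormat works the same way) and the path is a
placeholder:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class ReadFileModes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        String dir = "hdfs:///data/input";   // placeholder directory

        // Read what is in the directory now, then let the source shut down
        // (the interval argument only matters for continuous monitoring).
        DataStream<String> once = env.readFile(
            new TextInputFormat(new Path(dir)), dir,
            FileProcessingMode.PROCESS_ONCE, 1000);

        // Keep monitoring the directory, re-scanning it every 10 seconds.
        DataStream<String> continuous = env.readFile(
            new TextInputFormat(new Path(dir)), dir,
            FileProcessingMode.PROCESS_CONTINUOUSLY, 10_000);

        once.print();
        continuous.print();
        env.execute("readFile modes sketch");
    }
}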

I think there is currently no built-in way of removing files from the input directory once they have been processed because it’s not possible to know when the contents of a given file have passed through the complete pipeline.

Best,
Aljoscha

Re: Streaming use case: Row enrichment

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Aljoscha,
thanks for the great suggestions. I wasn't aware of
AsyncDataStream.unorderedWait and BucketingSink.setBucketer().
Most probably that's exactly what I was looking for... (I just need to find
the time to test it.)
Just one last question: what are you referring to with "you could use a
different readFile() method where you can specify that you want to continue
monitoring the directory for new files"? Is there a way to delete the input
files, or move them to a backup dir, once they have been enriched?

Best Flavio



Re: Streaming use case: Row enrichment

Posted by Aljoscha Krettek <al...@apache.org>.
Ok, just trying to make sure I understand everything. You have this:

1. A bunch of data in HDFS that you want to enrich
2. An external service (Solr/ES) that you query for enriching the data rows stored in 1.
3. You need to store the enriched rows in HDFS again

I think you could just do this (roughly):

StreamExecutionEnvironment env = …;

DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs path>”);

DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
// or
DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …) // yes, the interface for this is a bit strange

BucketingSink sink = new BucketingSink(“<hdfs sink path>”);
// this is responsible for putting files into buckets, so that you don’t have too many small HDFS files
sink.setBucketer(new MyBucketer());
enriched.addSink(sink)

In this case, the file source will close once all files are read and the job will finish. If you don’t want this, you can also use a different readFile() method where you can specify that you want to continue monitoring the directory for new files.
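
For the AsyncDataStream variant, a rough, untested sketch of what such an
enricher could look like against the Flink 1.2/1.3 async I/O API (the
lookup client is a made-up placeholder; later Flink versions rename
AsyncCollector to ResultFuture):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.apache.flink.types.Row;

public class EsEnricher extends RichAsyncFunction<Row, Row> {

    /** Made-up stand-in for a real asynchronous Solr/ES client. */
    static class EsLookupClient {
        Row lookupAndFill(Row row) { return row; }   // no-op placeholder
    }

    private transient EsLookupClient client;

    @Override
    public void open(Configuration parameters) {
        client = new EsLookupClient();   // one client per parallel instance
    }

    @Override
    public void asyncInvoke(Row input, AsyncCollector<Row> collector) {
        // Run the lookup off the main thread and hand the enriched Row back
        // to Flink when the response arrives.
        CompletableFuture
            .supplyAsync(() -> client.lookupAndFill(input))
            .thenAccept(enriched ->
                collector.collect(Collections.singleton(enriched)));
    }

    // Wiring: the last argument caps the number of in-flight requests, which
    // is also a simple way to throttle the pressure on the index.
    public static DataStream<Row> enrich(DataStream<Row> input) {
        return AsyncDataStream.unorderedWait(
            input, new EsEnricher(), 30, TimeUnit.SECONDS, 100);
    }
}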

Best,
Aljoscha


Re: Streaming use case: Row enrichment

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Aljoscha,
thanks for getting back to me on this! I'll try to simplify the thread by
starting from what we want to achieve.

At the moment we execute some queries against a db and store the data into
Parquet directories (one for each query).
Let's say we then create a DataStream<Row> from each dir; what we would
like to achieve is some sort of throttling of the queries sent to this
external service (so that we don't overload it with too many queries... but
we also need to run as many queries as possible in order to finish this
process in a reasonable time).

The current batch process has the downside that you must know a priori the
right parallelism of the job, while the streaming process should be able to
rescale when needed [1], so it should be easier to tune the job parallelism
without losing all the already performed queries [2]. Moreover, if the job
crashes you lose all the work done up to that moment because there's no
checkpointing...
My initial idea was to read from HDFS and put the data into Kafka, to be
able to change the number of consumers at runtime (according to the maximum
parallelism we can achieve with the external service), but maybe this could
be done in an easier way (we've only been using streaming for a short time,
so we may see things as more complicated than they are).

Moreover, as the last step, we need to know when all the data has been
enriched so that we can stop this first streaming job and start the next
one (which cannot run while the acquisition job is still in progress
because that could break referential integrity). Is there any example of
such a use case?

[1] at the moment manually... maybe automatically in the future, right?
[2] with the batch job, if we want to change the parallelism we need to
stop it and relaunch it, losing all the already enriched data because
there's no checkpointing there


Re: Streaming use case: Row enrichment

Posted by Aljoscha Krettek <al...@apache.org>.
Hi Flavio,

I’ll try and answer your questions:

Regarding 1. Why do you need to first read the data from HDFS into Kafka (or another queue)? Using StreamExecutionEnvironment.readFile(FileInputFormat, String, FileProcessingMode, long) you can monitor a directory in HDFS and process the files that are there and any newly arriving files. For batching your output, you could look into the BucketingSink which will write to files in HDFS (or some other DFS) and start new files (buckets) based on some criteria, for example number of processed elements or time.
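
As a rough sketch of the BucketingSink part (the path and sizes are
placeholders, and it needs the flink-connector-filesystem dependency):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.types.Row;

public class EnrichedRowsSink {

    public static BucketingSink<Row> create() {
        // Placeholder output path.
        BucketingSink<Row> sink = new BucketingSink<>("hdfs:///data/enriched");

        // One bucket (sub-directory) per hour; inside a bucket the sink
        // names part files per subtask, so parallel writers don't collide.
        sink.setBucketer(new DateTimeBucketer<Row>("yyyy-MM-dd--HH"));

        // Roll to a new part file only once the current one reaches ~128 MB,
        // which keeps the number of small HDFS files down.
        sink.setBatchSize(128 * 1024 * 1024);

        return sink;
    }
}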

Regarding 2. I didn’t completely understand this part. Could you maybe elaborate a bit, please?

Regarding 3. Yes, I think you can. You would use this to fire off your queries to Solr/ES.

Best,
Aljoscha
