Posted to user@flink.apache.org by Vinay Patil <vi...@gmail.com> on 2017/05/03 14:27:34 UTC

Re: Queries regarding Historical Reprocessing

Hi Guys,

Can someone please help me understand this?

Regards,
Vinay Patil

On Thu, Apr 27, 2017 at 12:36 PM, Vinay Patil <vi...@gmail.com>
wrote:

> Hi Guys,
>
> For historical reprocessing, I am reading the Avro data from S3 and
> passing these records through the same pipeline for processing.
>
> I have the following queries:
>
> 1. I am running this pipeline as a stream application with checkpointing
> enabled. The records are successfully written to S3, however they remain in
> the pending state because checkpointing is not triggered when I am doing
> re-processing. Why does this happen? (I kept the checkpointing interval at 1
> minute; the pipeline ran for 10 minutes.)
> This is the code I am using for reading Avro data from S3:
>
> AvroInputFormat<SomeAvroClass> avroInputFormat = new AvroInputFormat<>(
>         new org.apache.flink.core.fs.Path(s3Path), SomeAvroClass.class);
> sourceStream = env.createInput(avroInputFormat).map(...);
>
> 2. For the source stream, Flink sets the parallelism to 1, while for the
> rest of the operators the user-specified parallelism is used. How does Flink
> read the data? Does it bring the entire file from S3 one at a time and
> then split it according to the parallelism?
>
> 3. I am reading from two different S3 folders and treating them as
> separate source streams. How does Flink read the data in this case? Does it
> pick one file from each S3 folder, split the data, and pass it downstream?
> Does Flink read the data sequentially? I am confused here, as only one
> Task Manager is reading the data from S3 and then all TMs are getting the
> data.
>
> 4. Although I am running this as a stream application, the operators go
> into the FINISHED state after processing. Is this because Flink treats the S3
> source as finite data? What will happen if data is continuously written
> to S3 from one pipeline while I am doing historical re-processing from a
> second pipeline?
>
> Regards,
> Vinay Patil
>

Re: Queries regarding Historical Reprocessing

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Sorry for the longer wait; it's a longer answer and I had to sort my thoughts. I'll try to answer each question separately, though the solution for some of the issues is the same.

1. I think the problem here is that Flink will not perform any checkpoints if some operators have finished. The streaming file sources are implemented as a combination of two operators: a file monitor/split generator and a file reader operator. The first one is responsible for enumerating available files and generating input splits. The second one is responsible for reading the actual contents. In a Flink job it will thus look like this: File Monitor -> File Reader; you should see this in the JobManager dashboard.

Now, by default env.createInput(InputFormat) creates a file monitor that only scans the directory once and then finishes; this is why we don't see any checkpoints being performed. You can work around this by using

    env.readFile(FileInputFormat<T> format, String filePath, FileProcessingMode watchType, long interval)

with a watch type of PROCESS_CONTINUOUSLY. With this, the file monitor will stay active and continuously send input splits downstream for newly created files.
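
To make that concrete, here is a minimal sketch of the continuous variant, reusing the placeholders from the question above (SomeAvroClass, s3Path). The exact import locations vary between Flink versions, so treat them as assumptions:

    import org.apache.flink.api.java.io.AvroInputFormat; // package differs in newer Flink versions
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000L); // the 1-minute interval from the original setup

    AvroInputFormat<SomeAvroClass> avroInputFormat =
            new AvroInputFormat<>(new Path(s3Path), SomeAvroClass.class);

    // PROCESS_CONTINUOUSLY keeps the file monitor alive: it re-scans s3Path
    // every 60s and emits splits for newly appeared files, so the job never
    // finishes and checkpointing keeps running.
    DataStream<SomeAvroClass> sourceStream = env.readFile(
            avroInputFormat, s3Path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

One caveat with PROCESS_CONTINUOUSLY: when a monitored file is modified, its contents are re-processed in their entirety, so files should appear atomically in the folder (e.g. be written elsewhere and then moved in).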

2. The File Monitor should always have parallelism=1, while the reader operator will have the parallelism configured by the user.
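
For example (the numbers here are hypothetical), with

    env.setParallelism(4);
    DataStream<SomeAvroClass> sourceStream = env.readFile(
            avroInputFormat, s3Path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

the monitor runs as a single task that only enumerates files and assigns splits, while four parallel reader subtasks each open their assigned files/splits from S3 themselves. So the actual reading is parallel; only the directory scan is single-threaded.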

3. With separate sources there will be a separate file monitor/file reader combination for each of them. How they are spread across the TaskManagers depends on the parallelism and on how the cluster, especially the TaskManager slots, is configured.
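
Assuming both folders contain the same Avro type, one way to wire this up is two independent sources merged with union() (folderA/folderB are hypothetical paths; everything else is as in the sketch above):

    DataStream<SomeAvroClass> streamA = env.readFile(
            new AvroInputFormat<>(new Path(folderA), SomeAvroClass.class),
            folderA, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);
    DataStream<SomeAvroClass> streamB = env.readFile(
            new AvroInputFormat<>(new Path(folderB), SomeAvroClass.class),
            folderB, FileProcessingMode.PROCESS_CONTINUOUSLY, 60_000L);

    // Each folder gets its own monitor (parallelism 1) and its own readers;
    // union() merely merges the streams, it does not coordinate ordering
    // between the two folders.
    DataStream<SomeAvroClass> merged = streamA.union(streamB);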

4. See 1. and 2. If you set PROCESS_CONTINUOUSLY, it will pick up new files that are added to the folder.

Best,
Aljoscha

