Posted to user@flink.apache.org by eSKa <es...@gmail.com> on 2017/09/07 06:42:14 UTC

Additional data read inside dataset transformations

Hello,
I will describe my use case briefly, in steps, for easier understanding:
1) Currently my job loads data from Parquet files using HadoopInputFormat
together with AvroParquetInputFormat, like this:
        AvroParquetInputFormat<GenericRecord> inputFormat =
                new AvroParquetInputFormat<GenericRecord>();
        AvroParquetInputFormat.setAvroReadSchema(job, schema);
        AvroParquetInputFormat.setUnboundRecordFilter(job, recordFilterClass);
        HadoopInputFormat<Void, GenericRecord> hadoopInputFormat =
                HadoopInputs.createHadoopInput(inputFormat, Void.class, GenericRecord.class, job);
        return environment.createInput(hadoopInputFormat);
2) The data is loaded into a DataSource and, after various transformations,
is grouped by my "user_id" key.
3) In a GroupReduceFunction I process the values for a given user.
4) For each group in the reduce function I extract the key (the one used for
the earlier grouping) and would like to read additional data (Parquet files
from HDFS for that specific key), which is required for further processing
of the grouped data.
5) After processing inside the reduce function, I would like to store the
results in Parquet files using the AvroParquetWriter class.


My question is how to load the additional data inside the reduce function
(or any other transformation) in step 4).
Ideally I would like to use HadoopInputFormat (just as for loading the
initial data in step 1), but I am missing the environment context there
(probably?). Is there any way to achieve this, or is this scenario
completely wrong and therefore badly designed?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Additional data read inside dataset transformations

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Traditionally, you would do a join, but that would mean reading all Parquet
files that might contain relevant data, which might be too much.
If you want to read data from within a user function (like GroupReduce),
you are pretty much on your own.
You could create a HadoopInputFormat wrapping the Parquet format (or use the
Parquet format directly), call its open method with a manually created
InputSplit, and read the data via the InputFormat interface.
For this approach you would need to take care of the InputFormat
lifecycle yourself (calling all required methods in the right order).
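
The call order described above can be sketched in plain Java. This is only
an illustration under stated assumptions, not Flink or Parquet code:
SimpleInputFormat is a hypothetical, simplified stand-in whose method names
follow Flink's InputFormat (configure, createInputSplits, open, reachedEnd,
nextRecord, close), and InMemoryFormat is a toy format in which each "split"
is just a list of strings standing in for one file. The real methods take
Flink types and several of them can throw IOException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class LifecycleSketch {

    // Hypothetical stand-in for an input format. Method names follow Flink's
    // InputFormat, but the signatures are simplified for this sketch.
    interface SimpleInputFormat<T, S> {
        void configure();
        List<S> createInputSplits();
        void open(S split);
        boolean reachedEnd();
        T nextRecord();
        void close();
    }

    // Toy format: every split is an in-memory list standing in for one file.
    static class InMemoryFormat implements SimpleInputFormat<String, List<String>> {
        private final List<List<String>> files;
        private boolean configured;
        private Iterator<String> current;

        InMemoryFormat(List<List<String>> files) {
            this.files = files;
        }

        public void configure() {
            configured = true;
        }

        public List<List<String>> createInputSplits() {
            return files;
        }

        public void open(List<String> split) {
            if (!configured) {
                throw new IllegalStateException("configure() must be called before open()");
            }
            current = split.iterator();
        }

        public boolean reachedEnd() {
            return !requireOpen().hasNext();
        }

        public String nextRecord() {
            return requireOpen().next();
        }

        public void close() {
            current = null;
        }

        private Iterator<String> requireOpen() {
            if (current == null) {
                throw new IllegalStateException("open() must be called first");
            }
            return current;
        }
    }

    // Drives the lifecycle by hand: configure once, then for every split
    // open -> (reachedEnd / nextRecord)* -> close, closing even on failure.
    static <T, S> List<T> readAll(SimpleInputFormat<T, S> format) {
        List<T> records = new ArrayList<>();
        format.configure();
        for (S split : format.createInputSplits()) {
            format.open(split);
            try {
                while (!format.reachedEnd()) {
                    records.add(format.nextRecord());
                }
            } finally {
                format.close();
            }
        }
        return records;
    }

    public static void main(String[] args) {
        InMemoryFormat format = new InMemoryFormat(
                Arrays.asList(Arrays.asList("r1", "r2"), Arrays.asList("r3")));
        System.out.println(readAll(format));
    }
}
```

Inside a GroupReduce function, a helper like readAll would be called once
per group with a format pointed at the files for that key. The try/finally
around the read loop is there so that close is always called, since nothing
else manages the format's resources in this setup.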

Alternatively, it might be possible to persist the intermediate data to
disk, fetch the required file names, and do a join where one input reads
only the required files. However, this would mean splitting the job into
two jobs.

Best, Fabian


