Posted to user@flink.apache.org by Srikanth <sr...@gmail.com> on 2016/02/12 20:53:09 UTC

writeAsCSV with partitionBy

Hello,



Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?

I'm looking to save output as CSV files partitioned by two columns (date and
hour).

The partitionBy DataSet API seems more about partitioning the data on a
column for further processing.



I don't think there is a direct API to do this. What would be the best
way of achieving it?



Srikanth

Re: writeAsCSV with partitionBy

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
the RollingSink can only be used with streaming. Adding support for dynamic
paths based on element contents is certainly interesting. I imagine it can
be tricky, though, to figure out when to close/flush the buckets.

Cheers,
Aljoscha

On Wed, 25 May 2016 at 08:36 KirstiLaurila <ki...@rovio.com> wrote:

> Maybe, I don't know, but that's with streaming. How about batch?
>
>
> Srikanth wrote
> > Isn't this related to https://issues.apache.org/jira/browse/FLINK-2672?
> >
> > This can probably be achieved with a RollingSink [1] and a custom Bucketer.
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html
>
>
>
>
>
>

Re: writeAsCSV with partitionBy

Posted by KirstiLaurila <ki...@rovio.com>.
Maybe, I don't know, but that's with streaming. How about batch?


Srikanth wrote
> Isn't this related to https://issues.apache.org/jira/browse/FLINK-2672?
>
> This can probably be achieved with a RollingSink [1] and a custom Bucketer.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html






Re: writeAsCSV with partitionBy

Posted by Juho Autio <ju...@rovio.com>.
RollingSink is part of Flink Streaming API. Can it be used in Flink Batch
jobs, too?

As implied in FLINK-2672, RollingSink doesn't support dynamic bucket paths
based on tuple fields. The path must be given when creating the
RollingSink instance, i.e. before deploying the job. Yes, a custom Bucketer
can be provided, but with the current method signature the tuple is not
passed to the Bucketer.
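
For illustration, here is a minimal Scala sketch of that limitation, assuming
the two-method Bucketer interface (shouldStartNewBucket/getNextBucketPath)
described in the RollingSink javadocs linked below:

    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.connectors.fs.Bucketer

    // The sink only hands the Bucketer the base path, never the element,
    // so the bucket path cannot be derived from tuple fields.
    class StaticBucketer extends Bucketer {
      override def shouldStartNewBucket(basePath: Path, currentBucketPath: Path): Boolean =
        false // never roll to a new bucket

      override def getNextBucketPath(basePath: Path): Path =
        new Path(basePath, "bucket-0") // fixed, element-independent path
    }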

On Tue, May 24, 2016 at 4:45 PM, Srikanth <sr...@gmail.com> wrote:

> Isn't this related to https://issues.apache.org/jira/browse/FLINK-2672?
>
> This can probably be achieved with a RollingSink [1] and a custom Bucketer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html
>
> Srikanth
>
> On Tue, May 24, 2016 at 1:07 AM, KirstiLaurila <ki...@rovio.com>
> wrote:
>
>> Yeah, created this one: https://issues.apache.org/jira/browse/FLINK-3961
>>
>>
>>
>>
>>
>

Re: writeAsCSV with partitionBy

Posted by Srikanth <sr...@gmail.com>.
Isn't this related to https://issues.apache.org/jira/browse/FLINK-2672?

This can probably be achieved with a RollingSink [1] and a custom Bucketer.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html

Srikanth

On Tue, May 24, 2016 at 1:07 AM, KirstiLaurila <ki...@rovio.com>
wrote:

> Yeah, created this one: https://issues.apache.org/jira/browse/FLINK-3961
>
>
>
>
>

Re: writeAsCSV with partitionBy

Posted by KirstiLaurila <ki...@rovio.com>.
Yeah, created this one: https://issues.apache.org/jira/browse/FLINK-3961





Re: writeAsCSV with partitionBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Kirsti,

I'm not aware of anybody working on this issue.
Would you like to create a JIRA issue for it?

Best, Fabian

2016-05-23 16:56 GMT+02:00 KirstiLaurila <ki...@rovio.com>:

> Are there any plans to implement this kind of feature (the possibility to
> write data to specified partitions) in the near future?
>
>
>
>

Re: writeAsCSV with partitionBy

Posted by KirstiLaurila <ki...@rovio.com>.
Are there any plans to implement this kind of feature (the possibility to
write data to specified partitions) in the near future?




Re: writeAsCSV with partitionBy

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, you're right. I did not understand your question correctly.

Right now, Flink does not feature an output format that writes records to
output files depending on a key attribute.
You would need to implement such an output format yourself and append it as
follows:

val data = ...
data.partitionByHash(0) // send all records with the same key to the same machine
  .output(new YourOutputFormat())

In case of many distinct keys, you would need to limit the number of open
file handles. The output format will be easier to implement if you do a
sortPartition(0, Order.ASCENDING) before the output format to sort the data
by key.
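
To make that concrete, here is a minimal sketch of such an output format. It
assumes (Int, String) records, a local file system, and Hive-style
field0=<key> directories; a real implementation would write through Flink's
FileSystem abstraction and bound the number of open writers:

import java.io.{File, PrintWriter}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import scala.collection.mutable

class KeyPartitionedOutputFormat(basePath: String)
    extends OutputFormat[(Int, String)] {

  private var subtaskIndex: Int = 0
  // one writer per distinct key seen by this parallel task
  private val writers = mutable.Map.empty[Int, PrintWriter]

  override def configure(parameters: Configuration): Unit = ()

  override def open(taskNumber: Int, numTasks: Int): Unit =
    subtaskIndex = taskNumber

  override def writeRecord(record: (Int, String)): Unit = {
    val writer = writers.getOrElseUpdate(record._1, {
      val dir = new File(s"$basePath/field0=${record._1}")
      dir.mkdirs() // create the partition directory on first use
      new PrintWriter(new File(dir, s"part-$subtaskIndex"))
    })
    writer.println(record._2)
  }

  override def close(): Unit = writers.values.foreach(_.close())
}

// usage, after partitioning so that each key lands on a single task:
// data.partitionByHash(0).output(new KeyPartitionedOutputFormat("output"))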

Cheers, Fabian




2016-02-16 19:52 GMT+01:00 Srikanth <sr...@gmail.com>:

> Fabian,
>
> Not sure if we are on the same page. If I do something like the code below,
> it will partition by field 0 and each task will write a separate part file
> in parallel.
>
>     val sink = data1.join(data2)
>       .where(1).equalTo(0) { (l, r) => (l._3, r._3) }
>       .partitionByHash(0)
>       .writeAsCsv(pathBase + "output/test", rowDelimiter = "\n",
>         fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)
>
> This will create the folder ./output/test/ containing part files 1, 2, 3, 4, ...
>
> But what I was looking for is a Hive-style partitionBy that will output
> with a folder structure like:
>
>    ./output/field0=1/file
>    ./output/field0=2/file
>    ./output/field0=3/file
>    ./output/field0=4/file
>
> Assuming field0 is an Int with distinct values 1, 2, 3, and 4.
>
> Srikanth
>
>
> On Mon, Feb 15, 2016 at 6:20 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Srikanth,
>>
>> DataSet.partitionBy() will partition the data on the declared partition
>> fields.
>> If you append a DataSink with the same parallelism as the partition
>> operator, the data will be written out with the defined partitioning.
>> It should be possible to achieve the behavior you described using
>> DataSet.partitionByHash() or partitionByRange().
>>
>> Best, Fabian
>>
>>
>> 2016-02-12 20:53 GMT+01:00 Srikanth <sr...@gmail.com>:
>>
>>> Hello,
>>>
>>>
>>>
>>> Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?
>>>
>>> I'm looking to save output as CSV files partitioned by two columns (date
>>> and hour).
>>>
>>> The partitionBy DataSet API seems more about partitioning the data on a
>>> column for further processing.
>>>
>>>
>>>
>>> I don't think there is a direct API to do this. What would be the
>>> best way of achieving it?
>>>
>>>
>>>
>>> Srikanth
>>>
>>>
>>>
>>
>>
>

Re: writeAsCSV with partitionBy

Posted by Srikanth <sr...@gmail.com>.
Fabian,

Not sure if we are on the same page. If I do something like the code below,
it will partition by field 0 and each task will write a separate part file in
parallel.

    val sink = data1.join(data2)
      .where(1).equalTo(0) { (l, r) => (l._3, r._3) }
      .partitionByHash(0)
      .writeAsCsv(pathBase + "output/test", rowDelimiter = "\n",
        fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)

This will create the folder ./output/test/ containing part files 1, 2, 3, 4, ...

But what I was looking for is a Hive-style partitionBy that will output with
a folder structure like:

   ./output/field0=1/file
   ./output/field0=2/file
   ./output/field0=3/file
   ./output/field0=4/file

Assuming field0 is an Int with distinct values 1, 2, 3, and 4.
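
One stopgap, sketched here under the assumption that the joined DataSet is
kept under a (hypothetical) name like joined and that the partition values
are known up front, is to filter and write per key, at the cost of
evaluating the join once per sink:

    // hypothetical: joined is the DataSet produced by the join above
    val joined = data1.join(data2)
      .where(1).equalTo(0) { (l, r) => (l._3, r._3) }

    for (k <- Seq(1, 2, 3, 4)) { // known partition values
      joined.filter(_._1 == k)
        .writeAsCsv(pathBase + s"output/field0=$k", rowDelimiter = "\n",
          fieldDelimiter = "\t", writeMode = WriteMode.OVERWRITE)
    }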

Srikanth


On Mon, Feb 15, 2016 at 6:20 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Srikanth,
>
> DataSet.partitionBy() will partition the data on the declared partition
> fields.
> If you append a DataSink with the same parallelism as the partition
> operator, the data will be written out with the defined partitioning.
> It should be possible to achieve the behavior you described using
> DataSet.partitionByHash() or partitionByRange().
>
> Best, Fabian
>
>
> 2016-02-12 20:53 GMT+01:00 Srikanth <sr...@gmail.com>:
>
>> Hello,
>>
>>
>>
>> Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?
>>
>> I'm looking to save output as CSV files partitioned by two columns (date
>> and hour).
>>
>> The partitionBy DataSet API seems more about partitioning the data on a
>> column for further processing.
>>
>>
>>
>> I don't think there is a direct API to do this. What would be the best
>> way of achieving it?
>>
>>
>>
>> Srikanth
>>
>>
>>
>
>

Re: writeAsCSV with partitionBy

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Srikanth,

DataSet.partitionBy() will partition the data on the declared partition
fields.
If you append a DataSink with the same parallelism as the partition
operator, the data will be written out with the defined partitioning.
It should be possible to achieve the behavior you described using
DataSet.partitionByHash() or partitionByRange().
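
A minimal sketch (assuming a DataSet[(Int, String)] called data and the
Scala API; note that this buys you the partitioning, not a key-based
directory layout):

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

// all records with the same field-0 value go to the same parallel task,
// so each distinct value lands entirely in one part file of the sink
// directory (a part file may still mix several values)
data.partitionByHash(0)
  .writeAsCsv("output/test", rowDelimiter = "\n", fieldDelimiter = "\t",
    writeMode = WriteMode.OVERWRITE)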

Best, Fabian


2016-02-12 20:53 GMT+01:00 Srikanth <sr...@gmail.com>:

> Hello,
>
>
>
> Is there a Hive (or Spark DataFrame) partitionBy equivalent in Flink?
>
> I'm looking to save output as CSV files partitioned by two columns (date
> and hour).
>
> The partitionBy DataSet API seems more about partitioning the data on a
> column for further processing.
>
>
>
> I don't think there is a direct API to do this. What would be the best
> way of achieving it?
>
>
>
> Srikanth
>
>
>