You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Zack Loebel <zl...@gmail.com> on 2021/12/03 18:04:20 UTC

Re: Parquet schema per bucket in Streaming File Sink

Unfortunately this does not solve my use case. Because I want to be able to
create and change the various outputs at runtime (the partition keys would
be dynamic) and as such the sql/extraction would have to change during
execution.
Which I did not believe to be supported. I'm also operating at the
datastream level (although of course I could move the datastream into
sql-land).

Best,
Zack


On Tue, Nov 30, 2021 at 2:41 AM Francesco Guardiani <fr...@ververica.com>
wrote:

> Hi Zack,
>
> > I want to customize this job to "explode" the map as column names and
> values
>
> You can do this in a select statement extracting manually the map values
> using the map access built-in
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#collection-functions>,
> e.g.:
>
> SELECT mymap['a'] AS a, mymap['b'] AS b
>
> > specifically the BucketAssigner and the CheckpointRollingPolicy both
> appear to be required to have a bucketId of a String.
>
> I wonder if what you're looking for is the PARTITIONED BY feature:
>
> CREATE TABLE MySinkTable (
>   ...) PARTITIONED BY (partitionKey1, partitionKey2)
>
> Does this solves your use case?
>
> FG
>
>
> On Tue, Nov 30, 2021 at 7:13 AM Zack Loebel <zl...@gmail.com> wrote:
>
>> Hey all,
>>
>> I have a job which writes data that is a similar shape to a location in
>> s3. Currently it writes a map of data with each row. I want to customize
>> this job to "explode" the map as column names and values, these are
>> consistent for a single bucket. Is there any way to do this? Provide a
>> custom parquet schema per bucket within a single dynamic sink?
>>
>> I've started looking at the changes within the main codebase to make this
>> feasible. It seems straightforward to provide the bucketId to the
>> writerFactory, and the bucketId could be a type containing the relevant
>> schema information.
>> Although it appears that the BulkFormatBuilder has several spots where
>> BucketId appears to be required to be a String: specifically
>> the BucketAssigner and the CheckpointRollingPolicy both appear to be
>> required to have a bucketId of a String.
>>
>> I'm curious if this is a change the community would be open to, and or if
>> there is another way to accomplish what I'm looking for that I've missed.
>>
>> Thanks,
>> Zack
>>
>>

-- 
Have a great day!
Zack Loebel-Begelman