Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/04/12 03:53:00 UTC

[jira] [Updated] (FLINK-27189) Support to always add quotes for string data in CSV format

     [ https://issues.apache.org/jira/browse/FLINK-27189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu updated FLINK-27189:
----------------------------
    Summary: Support to always add quotes for string data in CSV format  (was: Ability for table api to always add quotes to generated csv)

> Support to always add quotes for string data in CSV format
> ----------------------------------------------------------
>
>                 Key: FLINK-27189
>                 URL: https://issues.apache.org/jira/browse/FLINK-27189
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: shameet
>            Priority: Major
>
> I am using the Table API in PyFlink to generate a CSV file. I noticed that it adds quotes around the data only conditionally. What I want is quotes around all of the data.
> The CSV file is being created in S3.
>  
>  
> For example, in the output below the data in the last column was not quoted:
> "transaction_idgeorge.bluth@reqres.in",card_hash,transaction_id
> "NO3500957594177george.bluth@reqres.in","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177
> "NO3500957594178george.bluth@reqres.in","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178
>  
>  
> I posted this question to the user mailing list, and Dian Fu suggested that I create a Jira issue because this is not supported right now:
> https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn
>  
>  
> Sample code to create a CSV file in S3:
>  
> def create_source_table(table_name, input_path):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100)
>     ) with (
>         'connector' = 'filesystem',
>         'format' = 'csv',
>         'path' = '{1}'
>     ) """.format(
>         table_name, input_path)
>
> def create_sink_table(table_name, bucket_name):
>     return """ CREATE TABLE {0} (
>         transaction_id VARCHAR(100),
>         card_hash VARCHAR(100),
>         brand_id VARCHAR(100)
>     )
>     with (
>         'connector'='filesystem',
>         'path'='{1}',
>         'format'='csv'
>     ) """.format(
>         table_name, bucket_name)
>  
> # 2. Creates a source table from a Kinesis Data Stream
> table_env.execute_sql(
>     create_source_table(
>         input_table_name, input_file
>     )
> )
>
> table_env.execute_sql(
>     create_sink_table(
>         out_table_name, output_bucket_name
>     )
> )
>
> table_env.register_function("addme1", addme1)
>
> source_table = table_env.from_path(input_table_name)
> source_table.select(
>     addme1(source_table.transaction_id),
>     source_table.card_hash,
>     source_table.transaction_id.alias('brand_id')
> ).execute_insert(out_table_name).wait()
>  
>  
> apache-flink version - 1.13
> python 3.8
>  
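
To make the requested behaviour concrete, here is a minimal sketch of "always quote" output using only Python's standard csv module. It is not a Flink feature and not part of the reporter's job: it assumes the generated file can be post-processed after the sink has written it, and the helper name quote_all_fields is hypothetical. The sample row is taken from the output in the description, with the Jira mailto markup removed.

```python
import csv
import io


def quote_all_fields(csv_text: str) -> str:
    """Re-emit CSV text with every field wrapped in double quotes.

    Hypothetical post-processing helper; it only illustrates the
    desired output and does not involve Flink at all.
    """
    reader = csv.reader(io.StringIO(csv_text))
    out = io.StringIO()
    # QUOTE_ALL quotes every field, not only those containing
    # special characters (the default is QUOTE_MINIMAL).
    writer = csv.writer(out, quoting=csv.QUOTE_ALL, lineterminator="\n")
    for row in reader:
        writer.writerow(row)
    return out.getvalue()


# One row as produced by the job in the description: the last column is unquoted.
sample = (
    '"NO3500957594177george.bluth@reqres.in",'
    '"bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",'
    'NO3500957594177\n'
)

print(quote_all_fields(sample), end="")
```

After this pass the last column is quoted like the others. Reading with csv.reader first, rather than wrapping fields by hand, keeps fields that already contain commas or quotes intact.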



--
This message was sent by Atlassian Jira
(v8.20.1#820001)