Posted to issues@flink.apache.org by "shameet (Jira)" <ji...@apache.org> on 2022/04/11 18:25:00 UTC

[jira] [Created] (FLINK-27189) Ability for table api to always add quotes to generated csv

shameet created FLINK-27189:
-------------------------------

             Summary: Ability for table api to always add quotes to generated csv
                 Key: FLINK-27189
                 URL: https://issues.apache.org/jira/browse/FLINK-27189
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
            Reporter: shameet


I am using the Table API in PyFlink to generate a CSV file, which is written to S3. I noticed that quotes are added around the data only conditionally. What I want is quotes around every field.
 
 
For example, in the output below the data in the last column is not quoted:
"transaction_idgeorge.bluth@reqres.in",card_hash,transaction_id
"NO3500957594177george.bluth@reqres.in","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",NO3500957594177
"NO3500957594178george.bluth@reqres.in","bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad6",NO3500957594178
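To make the desired output concrete, here is a minimal sketch that re-writes CSV text with quotes around every field. It is not a Flink option and is not part of my job; it uses only Python's standard csv module (csv.QUOTE_ALL), and the helper name quote_all is made up for illustration. It could serve as a post-processing step until the format supports this directly.

```python
import csv
import io


def quote_all(csv_text):
    """Re-emit CSV text with every field wrapped in double quotes.

    Hypothetical helper for illustration only; not a Flink API.
    """
    rows = csv.reader(io.StringIO(csv_text))
    out = io.StringIO()
    # QUOTE_ALL quotes every field, not just those that need it.
    writer = csv.writer(out, quoting=csv.QUOTE_ALL, lineterminator="\n")
    writer.writerows(rows)
    return out.getvalue()


# One row from the output above: the last field is unquoted.
sample = (
    '"NO3500957594177george.bluth@reqres.in",'
    '"bd9e27fdb53b114288948c18fdbf50e9931b935c30bd79ca288c50d3969e1ad5",'
    'NO3500957594177\n'
)
print(quote_all(sample), end="")
```

With this, the last field of each row comes out quoted as well, which is the behavior I would like the csv format to offer.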
 
 
I posted this question to the user mailing list, and Dian Fu suggested that I create a Jira issue, as this is not supported right now:
[https://lists.apache.org/thread/y2g7kjf6ylmqtm2w9b28kfcdborvcgtn]
 
 
Sample code to create a CSV in S3:
 
def create_source_table(table_name, input_path):
    return """ CREATE TABLE {0} (
        transaction_id VARCHAR(100),
        card_hash VARCHAR(100)
    ) WITH (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{1}'
    ) """.format(table_name, input_path)

def create_sink_table(table_name, bucket_name):
    return """ CREATE TABLE {0} (
        transaction_id VARCHAR(100),
        card_hash VARCHAR(100),
        brand_id VARCHAR(100)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{1}',
        'format' = 'csv'
    ) """.format(table_name, bucket_name)
 
# Create the source and sink tables (both use the filesystem connector)
table_env.execute_sql(
    create_source_table(input_table_name, input_file)
)

table_env.execute_sql(
    create_sink_table(out_table_name, output_bucket_name)
)

# addme1 is a user-defined function (definition not shown here)
table_env.register_function("addme1", addme1)

source_table = table_env.from_path(input_table_name)
source_table.select(
    addme1(source_table.transaction_id),
    source_table.card_hash,
    source_table.transaction_id.alias('brand_id'),
).execute_insert(out_table_name).wait()
 
 
apache-flink version - 1.13
python 3.8
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)