Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2023/03/02 02:07:00 UTC

[jira] [Comment Edited] (FLINK-29729) Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader

    [ https://issues.apache.org/jira/browse/FLINK-29729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17695091#comment-17695091 ] 

Jark Wu edited comment on FLINK-29729 at 3/2/23 2:06 AM:
---------------------------------------------------------

Fixed in:
 - master: 49b8726850cb0da5f3a888c836f31bfa69be739d
 - release-1.17: 3e02b3002e07b704073bffdae7d41c4d7b462a2e
 - release-1.16: 5bb44a1565b87189338226f223bf0a8bc5ce356e
 - release-1.15: 25603b00678841e98f8e02c3d7aac35aafbdb34d


was (Author: jark):
Fixed in:
 - master: 49b8726850cb0da5f3a888c836f31bfa69be739d
 - release-1.17: TODO
 - release-1.16: TODO
 - release-1.15: TODO

> Fix credential info configured in flink-conf.yaml is lost during creating ParquetReader
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-29729
>                 URL: https://issues.apache.org/jira/browse/FLINK-29729
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Rascal Wu
>            Assignee: dalongliu
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-10-22-17-41-38-084.png
>
>
> Hi, I'm wondering whether we can include the properties configured in flink-conf.yaml, in addition to the Hadoop configuration, when creating the ParquetReader in `ParquetVectorizedInputFormat`.
>  
> My use case is querying a Parquet-format table in an S3 bucket via the filesystem connector, with the AWS credentials configured in `flink-conf.yaml`, e.g. fs.s3a.access.key, fs.s3a.secret.key, etc.
>  
> The JobManager (SourceCoordinator) successfully calls "getFileStatus" on the S3 objects and generates splits, but the TaskManager (SourceOperator -> ParquetVectorizedInputFormat -> ParquetReader) fails because the AWS credentials are missing.
>  
> After digging into the source code that creates the ParquetReader to read the footer, I found that the AWS credentials are not passed when the S3AFileSystem is created and initialized; the details are shown in the screenshot below.  !image-2022-10-22-17-41-38-084.png!
>  
> The `hadoopConfig` only contains the properties from the table format options and the default Hadoop properties from core-site.xml, hdfs-site.xml, etc., because the `hadoopConfig` is injected via `ParquetFileFormatFactory#createRuntimeDecoder` -> `ParquetColumnarRowInputFormat.createPartitionedFormat` -> `ParquetFileFormatFactory.getParquetConfiguration`:
>  
> {code:java}
> @Override
> public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
>         DynamicTableSource.Context sourceContext,
>         DataType producedDataType,
>         int[][] projections) {
>     return ParquetColumnarRowInputFormat.createPartitionedFormat(
>             getParquetConfiguration(formatOptions),
>             (RowType) Projection.of(projections).project(producedDataType).getLogicalType(),
>             sourceContext.createTypeInformation(producedDataType),
>             Collections.emptyList(),
>             null,
>             VectorizedColumnBatch.DEFAULT_SIZE,
>             formatOptions.get(UTC_TIMEZONE),
>             true);
> }
>  
> private static Configuration getParquetConfiguration(ReadableConfig options) {
>     Configuration conf = new Configuration();
>     Properties properties = new Properties();
>     ((org.apache.flink.configuration.Configuration) options).addAllToProperties(properties);
>     properties.forEach((k, v) -> conf.set(IDENTIFIER + "." + k, v.toString()));
>     return conf;
> }
> {code}
>  
> I know that I can add the AWS credentials to core-site.xml or hdfs-site.xml so that the `ParquetReader` can pick them up, but I don't think that is good practice, especially since different Flink jobs may use different AWS credentials. So I'm wondering whether we can combine the default Hadoop configuration (static) with the properties from `flink-conf.yaml` (dynamic) when creating the `ParquetReader`.
> For example, as this PR does: https://github.com/apache/flink/pull/21130
>  
> BTW, I'm using Flink 1.15.1 in a standalone cluster to validate the whole process, but I don't think the problem is limited to version 1.15.1 or to objects/files in an AWS S3 bucket; any other cloud object storage might be affected as well.
>  
> Besides changing the code, is there any other solution that can help me handle this problem? Thanks.
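
The combination proposed in the description (static Hadoop defaults overlaid with dynamic `flink-conf.yaml` entries) might be sketched roughly as follows. This is only an illustration under stated assumptions, not the change made in the linked PR: plain `java.util.Map` instances stand in for the Hadoop and Flink `Configuration` classes, and the class name `HadoopConfMergeSketch`, the method `merge`, and the choice to forward only keys starting with `fs.s3a.` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; plain maps stand in for the Hadoop and Flink Configuration classes. */
final class HadoopConfMergeSketch {

    /** Key prefix forwarded from flink-conf.yaml. Assumption for illustration only. */
    static final String FORWARDED_PREFIX = "fs.s3a.";

    private HadoopConfMergeSketch() {}

    /**
     * Returns a copy of the static Hadoop defaults with the forwarded
     * flink-conf.yaml entries laid on top, so per-job values win.
     */
    static Map<String, String> merge(
            Map<String, String> hadoopDefaults, Map<String, String> flinkConf) {
        Map<String, String> merged = new HashMap<>(hadoopDefaults);
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            // Only filesystem credentials/settings are forwarded; other
            // Flink options (slots, memory, ...) are left out.
            if (entry.getKey().startsWith(FORWARDED_PREFIX)) {
                merged.put(entry.getKey(), entry.getValue());
            }
        }
        return merged;
    }
}
```

With such a merge, an entry like `fs.s3a.access.key` from `flink-conf.yaml` would be visible to whatever creates the file system on the TaskManager, while unrelated Flink options would stay out of the Hadoop configuration.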


