Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/01/19 13:03:00 UTC

[jira] [Updated] (FLINK-25703) In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames

     [ https://issues.apache.org/jira/browse/FLINK-25703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser updated FLINK-25703:
-----------------------------------
    Component/s: Connectors / FileSystem
                 FileSystems

> In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
> --------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25703
>                 URL: https://issues.apache.org/jira/browse/FLINK-25703
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem, FileSystems, Table SQL / Runtime
>    Affects Versions: 1.14.3
>            Reporter: shizhengchao
>            Priority: Major
>
> If HADOOP_USER_NAME is set to user_a on the client side, but the task runs as user_b in the TaskManager (which is possible), the batch job fails with a permission error like the following:
>  
> {code:java}
> Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=WRITE, inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x{code}
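> This happens because the .staging__1642575264185 directory was created on the client by user_a, and its mode (drwxr-xr-x) does not grant write access to user_b.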
>  
> So the code that creates the .staging directory should be moved into the FileSystemOutputFormat#open method, so that the directory is created on the TaskManager by the user the task actually runs as:
> {code:java}
> @Override
> public void open(int taskNumber, int numTasks) throws IOException {
>     try {
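>         // Check that the staging dir (tmpPath) exists, creating it if needed.
>         // This runs in the TaskManager, so the dir is owned by the task's Hadoop user.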
>         checkOrCreateTmpPath();
>         PartitionTempFileManager fileManager =
>                 new PartitionTempFileManager(
>                         fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
>         PartitionWriter.Context<T> context =
>                 new PartitionWriter.Context<>(parameters, formatFactory);
>         writer =
>                 PartitionWriterFactory.<T>get(
>                                 partitionColumns.length - staticPartitions.size() > 0,
>                                 dynamicGrouped,
>                                 staticPartitions)
>                         .create(context, fileManager, computer);
>     } catch (Exception e) {
>         throw new TableException("Exception in open", e);
>     }
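> }
>
> private void checkOrCreateTmpPath() throws Exception {
>     FileSystem fs = tmpPath.getFileSystem();
>     // mkdirs() follows "mkdir -p" semantics, where an existing directory is not an
>     // error, so parallel subtasks racing to create the same dir is harmless.
>     Preconditions.checkState(fs.exists(tmpPath) || fs.mkdirs(tmpPath),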
>         "Failed to create staging dir " + tmpPath);
> }
>  {code}
>  
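To make the failure concrete, here is a simplified, hypothetical model of the owner/group/other check that HDFS applies to the inode in the error (owner user_a, group supergroup, mode drwxr-xr-x). The Inode record and canWrite method are illustrative only, not Hadoop APIs, and the model ignores ACLs, sticky bits and the HDFS superuser:

```java
import java.util.Set;

public class StagingPermissionDemo {

    /**
     * Simplified, illustrative model of an HDFS/POSIX-style permission check.
     * Ignores ACLs, sticky bits and the HDFS superuser.
     */
    record Inode(String owner, String group, String mode) {

        /** Returns true if {@code user}, a member of {@code groups}, may write to this inode. */
        boolean canWrite(String user, Set<String> groups) {
            // mode looks like "drwxr-xr-x": index 0 is the file type,
            // followed by the owner, group and other "rwx" triplets.
            int triplet;
            if (user.equals(owner)) {
                triplet = 1; // the owner's bits apply
            } else if (groups.contains(group)) {
                triplet = 4; // the group's bits apply
            } else {
                triplet = 7; // everyone else's bits apply
            }
            // 'w' is the middle character of each triplet.
            return mode.charAt(triplet + 1) == 'w';
        }
    }

    public static void main(String[] args) {
        // The staging dir as described in the error: created on the client by user_a.
        Inode clientCreated = new Inode("user_a", "supergroup", "drwxr-xr-x");
        System.out.println("user_a can write: " + clientCreated.canWrite("user_a", Set.of()));
        // user_b is denied even if it is in supergroup, since the group bits are r-x.
        System.out.println("user_b can write: "
                + clientCreated.canWrite("user_b", Set.of("supergroup")));

        // The same dir created at task run time by user_b.
        Inode taskCreated = new Inode("user_b", "supergroup", "drwxr-xr-x");
        System.out.println("user_b can write: " + taskCreated.canWrite("user_b", Set.of()));
    }
}
```

Running main prints true, false, true: once user_a owns the directory with drwxr-xr-x, user_b gets no write bit from either the group or the other triplet, whereas a directory created by user_b at task run time is writable by it.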
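The proposed open() change boils down to "create the staging dir lazily, at task open time, and tolerate it already existing". Below is a minimal sketch of that pattern, assuming java.nio on the local file system as a stand-in for Flink's FileSystem (so no Hadoop user switching actually happens here); LazyStagingWriter and the task-N-of-M file names are hypothetical. Files.createDirectories plays the role of fs.exists(tmpPath) || fs.mkdirs(tmpPath):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LazyStagingDemo {

    /** Hypothetical stand-in for an output format; not Flink's FileSystemOutputFormat. */
    static final class LazyStagingWriter {
        private final Path stagingDir;

        LazyStagingWriter(Path stagingDir) {
            // "Client side": only remember the path, do not create anything yet.
            this.stagingDir = stagingDir;
        }

        /** "Task side": create the staging dir if needed, then a per-task file inside it. */
        Path open(int taskNumber, int numTasks) throws IOException {
            // createDirectories does not throw if the directory already exists,
            // so several subtasks racing to create it is harmless.
            Files.createDirectories(stagingDir);
            return Files.createFile(stagingDir.resolve("task-" + taskNumber + "-of-" + numTasks));
        }
    }

    public static void main(String[] args) throws Exception {
        Path staging = Files.createTempDirectory("flink-demo").resolve(".staging_demo");
        LazyStagingWriter writer = new LazyStagingWriter(staging);
        System.out.println("exists before open: " + Files.exists(staging));

        // Open several "subtasks" in parallel; all of them share one staging dir.
        int numTasks = 4;
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        try {
            List<Future<Path>> results = new ArrayList<>();
            for (int i = 0; i < numTasks; i++) {
                final int task = i;
                results.add(pool.submit(() -> writer.open(task, numTasks)));
            }
            for (Future<Path> result : results) {
                System.out.println("created " + result.get().getFileName());
            }
        } finally {
            pool.shutdown();
        }
        System.out.println("exists after open: " + Files.isDirectory(staging));
    }
}
```

Nothing touches the file system until open() runs, which is the point of the proposal: on a real cluster, whoever runs open() (the TaskManager's Hadoop user) becomes the owner of the staging dir.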


