Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/01/19 13:03:00 UTC
[jira] [Updated] (FLINK-25703) In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
[ https://issues.apache.org/jira/browse/FLINK-25703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser updated FLINK-25703:
-----------------------------------
Component/s: Connectors / FileSystem, FileSystems
> In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
> --------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-25703
> URL: https://issues.apache.org/jira/browse/FLINK-25703
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem, FileSystems, Table SQL / Runtime
> Affects Versions: 1.14.3
> Reporter: shizhengchao
> Priority: Major
>
> If HADOOP_USER_NAME is set to user_a on the client side, but the task runs in the TaskManager as user_b (which is possible), the batch job fails with a permission error:
>
> {code:java}
> Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=WRITE, inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x{code}
> This happens because .staging__1642575264185 was created by user_a, and its mode (drwxr-xr-x) gives no write permission to anyone but its owner.
>
> So the code that creates the .staging directory should be moved into the FileSystemOutputFormat#open method:
> {code:java}
> @Override
> public void open(int taskNumber, int numTasks) throws IOException {
>     try {
>         // Create the staging directory here, on the TaskManager, so that it is
>         // owned by the user the task actually runs as, not the client user.
>         checkOrCreateTmpPath();
>         PartitionTempFileManager fileManager =
>                 new PartitionTempFileManager(
>                         fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
>         PartitionWriter.Context<T> context =
>                 new PartitionWriter.Context<>(parameters, formatFactory);
>         writer =
>                 PartitionWriterFactory.<T>get(
>                                 partitionColumns.length - staticPartitions.size() > 0,
>                                 dynamicGrouped,
>                                 staticPartitions)
>                         .create(context, fileManager, computer);
>     } catch (Exception e) {
>         throw new TableException("Exception in open", e);
>     }
> }
>
> private void checkOrCreateTmpPath() throws IOException {
>     // open() runs once per parallel subtask, so this must tolerate the
>     // directory already existing (e.g. created by another subtask).
>     FileSystem fs = tmpPath.getFileSystem();
>     Preconditions.checkState(
>             fs.exists(tmpPath) || fs.mkdirs(tmpPath),
>             "Failed to create staging dir " + tmpPath);
> }
> {code}
>
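The AccessControlException follows from the owner/group/other check that HDFS applies to an inode's mode bits: the staging directory is owned by user_a with mode drwxr-xr-x, so user_b gets only r-x, whether or not it is a member of supergroup. The toy Java model below sketches that check. It is a simplification (it ignores the HDFS superuser, ACLs and the sticky bit), and HdfsPermissionSketch and canWrite are hypothetical names, not Hadoop APIs.

```java
import java.util.Set;

public class HdfsPermissionSketch {

    /**
     * Toy model of the owner/group/other permission check for a mode string
     * such as "drwxr-xr-x". Hypothetical helper; superuser, ACLs and sticky
     * bit handling are deliberately left out.
     */
    static boolean canWrite(String user, Set<String> userGroups,
                            String owner, String group, String mode) {
        // Index 0 is the file type ('d'); then come the owner, group and
        // other "rwx" triples starting at indices 1, 4 and 7.
        int offset;
        if (user.equals(owner)) {
            offset = 1;            // the owner bits apply
        } else if (userGroups.contains(group)) {
            offset = 4;            // the group bits apply
        } else {
            offset = 7;            // the "other" bits apply
        }
        // 'w' is the middle character of each triple.
        return mode.charAt(offset + 1) == 'w';
    }

    public static void main(String[] args) {
        String owner = "user_a";
        String group = "supergroup";
        String mode = "drwxr-xr-x";
        System.out.println(canWrite("user_a", Set.of(), owner, group, mode));             // true
        System.out.println(canWrite("user_b", Set.of(), owner, group, mode));             // false
        System.out.println(canWrite("user_b", Set.of("supergroup"), owner, group, mode)); // false
    }
}
```

Creating the directory from the task itself sidesteps this check entirely, because the directory is then owned by the user that later writes into it.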
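Because FileSystemOutputFormat#open runs once per parallel subtask, the proposed checkOrCreateTmpPath() would be called by several subtasks, possibly at the same moment, so it has to succeed whether the directory is missing or already exists. The sketch below illustrates that requirement on the local filesystem, using java.nio.file as a stand-in for Flink's FileSystem. It is not Flink code: StagingDirSketch, checkOrCreateStagingDir and the staging directory name are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class StagingDirSketch {

    /**
     * Hypothetical local-filesystem analogue of checkOrCreateTmpPath():
     * succeeds if the directory is missing, already present, or being
     * created concurrently by another caller.
     */
    static void checkOrCreateStagingDir(Path stagingDir) throws IOException {
        // createDirectories does not throw if the directory already exists,
        // so repeating the call from every subtask is safe.
        Files.createDirectories(stagingDir);
        if (!Files.isDirectory(stagingDir)) {
            throw new IllegalStateException("Failed to create staging dir " + stagingDir);
        }
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("sink");
        // Hypothetical name mirroring the ".staging__<timestamp>" pattern in the report.
        Path staging = base.resolve(".staging__" + System.currentTimeMillis());

        int numTasks = 4;
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        List<Future<?>> results = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            // Simulates each subtask's open() racing to create the same directory.
            results.add(pool.submit(() -> {
                checkOrCreateStagingDir(staging);
                return null;
            }));
        }
        for (Future<?> f : results) {
            f.get(); // rethrows any failure from a simulated subtask
        }
        pool.shutdown();
        System.out.println(Files.isDirectory(staging));
    }
}
```

Here the idempotence comes from Files.createDirectories, which plays the same role as the `fs.exists(tmpPath) || fs.mkdirs(tmpPath)` check in the proposed patch.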