Posted to issues@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2022/01/19 12:16:00 UTC

[jira] [Created] (FLINK-25703) In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames

shizhengchao created FLINK-25703:
------------------------------------

             Summary: In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
                 Key: FLINK-25703
                 URL: https://issues.apache.org/jira/browse/FLINK-25703
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.14.3
            Reporter: shizhengchao


If HADOOP_USER_NAME is set to user_a on the client side, but the task running in the TaskManager runs as user_b (which is possible), the batch job fails with a permission error:

 
{code:java}
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=WRITE, inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x{code}
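This happens because the .staging__1642575264185 directory was created on the client side by user_a, and its mode (drwxr-xr-x) grants write access only to its owner.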
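The failure follows directly from the mode bits in the error message. The directory is owned by user_a:supergroup with drwxr-xr-x (octal 0755), so only the owner has the write bit, and user_b is denied even if it belongs to supergroup. The following is a simplified, illustrative model of the owner/group/other check HDFS performs; it deliberately ignores the HDFS superuser, ACLs, and sticky bits, and the class and method names are hypothetical.

```java
import java.util.Set;

/**
 * Simplified model of an HDFS-style permission check on a single inode.
 * Assumption: only the owner/group/other mode bits are considered
 * (no superuser bypass, no ACLs, no sticky bit).
 */
public class StagingDirPermissionCheck {

    /** Returns true if {@code user} (a member of {@code userGroups}) may write to the inode. */
    static boolean canWrite(String owner, String group, int mode,
                            String user, Set<String> userGroups) {
        int bits;
        if (user.equals(owner)) {
            bits = (mode >> 6) & 7;   // owner bits: rwx
        } else if (userGroups.contains(group)) {
            bits = (mode >> 3) & 7;   // group bits: r-x
        } else {
            bits = mode & 7;          // other bits: r-x
        }
        return (bits & 2) != 0;       // test the 'w' bit
    }

    public static void main(String[] args) {
        int drwxrXrX = 0755; // mode of the staging dir created by the client
        // The client-side user owns the directory and can write to it.
        System.out.println(canWrite("user_a", "supergroup", drwxrXrX, "user_a", Set.of()));
        // The TaskManager-side user cannot, even as a member of supergroup.
        System.out.println(canWrite("user_a", "supergroup", drwxrXrX, "user_b", Set.of("supergroup")));
    }
}
```

Running main prints true for user_a and false for user_b, matching the AccessControlException above.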

 

So the code that creates the .staging directory should be moved into the FileSystemOutputFormat#open method, so that the directory is created on the TaskManager by the user the task actually runs as:
{code:java}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        // Create the staging directory here, on the TaskManager, as the user the task runs as
        checkOrCreateTmpPath();
        PartitionTempFileManager fileManager =
                new PartitionTempFileManager(
                        fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
        PartitionWriter.Context<T> context =
                new PartitionWriter.Context<>(parameters, formatFactory);
        writer =
                PartitionWriterFactory.<T>get(
                                partitionColumns.length - staticPartitions.size() > 0,
                                dynamicGrouped,
                                staticPartitions)
                        .create(context, fileManager, computer);
    } catch (Exception e) {
        throw new TableException("Exception in open", e);
    }
}

private void checkOrCreateTmpPath() throws Exception {
    FileSystem fs = tmpPath.getFileSystem();
    // Skip mkdirs() if another subtask has already created the directory
    Preconditions.checkState(
            fs.exists(tmpPath) || fs.mkdirs(tmpPath),
            "Failed to create staging dir " + tmpPath);
}
 {code}
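Because open() runs once per parallel subtask, several subtasks may race to create the same staging directory, so the create step must be idempotent. The sketch below illustrates this with java.nio.file on the local filesystem as a stand-in for Flink's FileSystem. It assumes that mkdirs on the target file system, like Files.createDirectories here, treats an already existing directory as success ("mkdir -p" semantics). LazyStagingDirDemo, openAllTasks, and the ".staging_demo" name are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LazyStagingDirDemo {

    /** Local-filesystem analogue of checkOrCreateTmpPath() (assumption, not Flink code). */
    static void checkOrCreateTmpPath(Path tmpPath) throws IOException {
        // createDirectories behaves like "mkdir -p": an existing directory is
        // not an error, so concurrent subtasks can all call it safely.
        Files.createDirectories(tmpPath);
        if (!Files.isDirectory(tmpPath)) {
            throw new IllegalStateException("Failed to create staging dir " + tmpPath);
        }
    }

    /** Simulates numTasks parallel open() calls racing to create the same directory. */
    static boolean openAllTasks(Path tmpPath, int numTasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < numTasks; i++) {
                futures.add(pool.submit(() -> {
                    checkOrCreateTmpPath(tmpPath);
                    return null;
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // rethrows any failure from a subtask
            }
        } finally {
            pool.shutdown();
        }
        return Files.isDirectory(tmpPath);
    }

    public static void main(String[] args) throws Exception {
        Path base = Files.createTempDirectory("flink-25703-demo");
        Path staging = base.resolve(".staging_demo");
        System.out.println(openAllTasks(staging, 8)); // prints true
    }
}
```

On HDFS, the directory created this way would be owned by whichever user the TaskManager process runs as, which is the point of moving the creation into open().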
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)