Posted to issues@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2022/01/19 12:16:00 UTC
[jira] [Created] (FLINK-25703) In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
shizhengchao created FLINK-25703:
------------------------------------
Summary: In Batch, I think .stagingPath should be created when the task is running to prevent permission issues caused by inconsistency of hadoop usernames
Key: FLINK-25703
URL: https://issues.apache.org/jira/browse/FLINK-25703
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.14.3
Reporter: shizhengchao
If HADOOP_USER_NAME is set to user_a on the client side, but the task runs in the TaskManager as user_b (which can happen), the Batch job fails with a permission error like the following:
{code:java}
Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=user_b, access=WRITE, inode="/where/to/whatever/.staging__1642575264185":user_a:supergroup:drwxr-xr-x{code}
This happens because .staging__1642575264185 was created on the client side by user_a, so user_b has no write access to it (mode drwxr-xr-x).
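To make the denial concrete, here is a minimal sketch of a POSIX-style permission check, the model HDFS permissions follow. It is not Hadoop code: the class name StagingPermissionSketch and the method canWrite are hypothetical, and superuser bypass and ACLs are deliberately ignored.

```java
import java.util.Set;

// Hypothetical illustration only; not part of Hadoop or Flink.
final class StagingPermissionSketch {

    /**
     * Simplified POSIX-style write check. The mode is the 9-character rwx
     * string without the leading 'd', e.g. "rwxr-xr-x".
     * Assumption: superuser bypass and ACLs are not modeled.
     */
    static boolean canWrite(String user, Set<String> userGroups,
                            String owner, String group, String mode) {
        int offset;
        if (user.equals(owner)) {
            offset = 0;            // owner bits apply
        } else if (userGroups.contains(group)) {
            offset = 3;            // group bits apply
        } else {
            offset = 6;            // "other" bits apply
        }
        // The write bit is the second character of each rwx triple.
        return mode.charAt(offset + 1) == 'w';
    }

    public static void main(String[] args) {
        String mode = "rwxr-xr-x"; // from the inode in the stack trace
        // user_a owns the directory and may write.
        System.out.println(canWrite("user_a", Set.of("supergroup"), "user_a", "supergroup", mode));
        // user_b is denied whether or not it belongs to supergroup.
        System.out.println(canWrite("user_b", Set.of("supergroup"), "user_a", "supergroup", mode));
        System.out.println(canWrite("user_b", Set.<String>of(), "user_a", "supergroup", mode));
    }
}
```

With mode drwxr-xr-x only the owner has the write bit, so user_b cannot write regardless of its group membership.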
So we should move the code that creates the .staging directory into the FileSystemOutputFormat#open method:
{code:java}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        // check temppath and create it
        checkOrCreateTmpPath();
        PartitionTempFileManager fileManager =
                new PartitionTempFileManager(
                        fsFactory, tmpPath, taskNumber, CHECKPOINT_ID, outputFileConfig);
        PartitionWriter.Context<T> context =
                new PartitionWriter.Context<>(parameters, formatFactory);
        writer =
                PartitionWriterFactory.<T>get(
                                partitionColumns.length - staticPartitions.size() > 0,
                                dynamicGrouped,
                                staticPartitions)
                        .create(context, fileManager, computer);
    } catch (Exception e) {
        throw new TableException("Exception in open", e);
    }
}

private void checkOrCreateTmpPath() throws Exception {
    FileSystem fs = tmpPath.getFileSystem();
    Preconditions.checkState(fs.exists(tmpPath) || fs.mkdirs(tmpPath),
            "Failed to create staging dir " + tmpPath);
}
{code}
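Because open is called once per parallel subtask, the creation step has to tolerate the directory already existing. As a rough local-filesystem analogue of that "check or create" step, the sketch below uses java.nio.file instead of Flink's FileSystem API. The class LazyStagingDirSketch, the method ensureStagingDir, and the directory name .staging_demo are hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical local-filesystem analogue; Flink's FileSystem is not used here.
final class LazyStagingDirSketch {

    /**
     * Creates the staging directory if it is missing and returns it.
     * Files.createDirectories does not fail when the directory already
     * exists, so repeated calls from several subtasks are harmless.
     */
    static Path ensureStagingDir(Path stagingDir) throws IOException {
        return Files.createDirectories(stagingDir);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("sink-base");
        Path staging = base.resolve(".staging_demo");

        ensureStagingDir(staging); // first "subtask" creates the directory
        ensureStagingDir(staging); // a later "subtask" finds it already there

        System.out.println(Files.isDirectory(staging));
    }
}
```

Whichever process runs the creation step becomes the owner of the directory, which is the point of moving it from the client into the running task.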
--
This message was sent by Atlassian Jira
(v8.20.1#820001)