You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Lijie Wang (Jira)" <ji...@apache.org> on 2022/03/10 08:11:00 UTC
[jira] [Created] (FLINK-26576) The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction is wrong
Lijie Wang created FLINK-26576:
----------------------------------
Summary: The value of 'readerParallelism' passed to ContinuousFileMonitoringFunction is wrong
Key: FLINK-26576
URL: https://issues.apache.org/jira/browse/FLINK-26576
Project: Flink
Issue Type: Bug
Components: API / DataStream
Reporter: Lijie Wang
In [StreamExecutionEnvironment#createFileInput |https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#:~:text=inputFormat%2C%20monitoringMode%2C-,getParallelism(),-%2C%20interval)%3B], the {{env.getParallelism()}} was passed to {{ContinuousFileMonitoringFunction}} as the parallelism of downstream readers. This value is incorrect when the parallelism of the downstream readers is manually configured by the user.
For example, in the test below, *1* will be passed as {{readerParallelism}}, but the actual parallelism of readers is *5*.
{code:java}
// Some comments here
@Test
public void testContinuousFileMonitoringFunction() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";
final File file = createTempFile(fileContent);
env.readTextFile(file.getPath()).name("TextSource").setParallelism(5)
.forward()
.addSink(new PrintSinkFunction<>()).setParallelism(5);
env.execute();
}
private File createTempFile(String content) throws IOException {
File tempFile = File.createTempFile("test_contents", "tmp");
tempFile.deleteOnExit();
OutputStreamWriter wrt =
new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8);
wrt.write(content);
wrt.close();
return tempFile;
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)