Posted to user@flink.apache.org by Steve Whelan <sw...@jwplayer.com> on 2020/08/11 02:48:13 UTC

S3 file source parallelism reverting to 1

Hi,

I have an S3 file source being consumed
as FileProcessingMode.PROCESS_CONTINUOUSLY with a parallelism of 3. I can
confirm the parallelism is set by printing it out. However, in the UI, the
file source has a parallelism of 1. I'm not changing it after it is
initially set.


DataStream s = env.readFile(
            new JsonInputFormat(...),
            filePath,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            5,
            myTypeInformation)
            .setParallelism(3);
System.out.println(s.getParallelism());  // prints 3


The DataStreamSource is a parallel operator; otherwise `setParallelism(3)`
would throw an `IllegalArgumentException`[1]. The only other thing I do
with the DataStream is register it with the TableEnvironment.


tableEnvironment.registerDataStream("my_table", s);


Is Flink resetting it to 1 for some reason? I'm running v1.9.0.

Thanks,

Steve


[1]
https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java#L53

Re: S3 file source parallelism reverting to 1

Posted by Dmytro Dragan <dd...@softserveinc.com>.
Hi Steve,

When you call env.readFile(…), internally env creates:

ContinuousFileMonitoringFunction<OUT> monitoringFunction =
   new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

ContinuousFileReaderOperator<OUT> reader =
   new ContinuousFileReaderOperator<>(inputFormat);

SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
      .transform("Split Reader: " + sourceName, typeInfo, reader);

return new DataStreamSource<>(source);

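ContinuousFileMonitoringFunction is a RichSourceFunction (not a parallel source function), so it always runs with parallelism 1.

On each monitoring pass (controlled by the interval parameter), it recursively scans the path, builds the list of all new file splits, and passes them to the ContinuousFileReaderOperator.
The ContinuousFileReaderOperator then processes the file splits in parallel. The DataStreamSource returned by readFile wraps this reader, so your setParallelism(3) applies to the "Split Reader" operator, while the monitoring source shown in the UI stays at 1.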
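
To make the split between the two operators concrete, here is a minimal plain-Java sketch of the same pattern: one non-parallel monitor that lists splits, and N readers that process them. It is only a model and not Flink code. The class name, method names, file names, and the round-robin assignment are all assumptions made for illustration; Flink's actual split distribution may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "one monitor, N readers". Not Flink code:
// every name in this class is hypothetical.
class MonitorReaderSketch {

    // The "monitor": a single, non-parallel task that lists the splits found in one scan.
    static List<String> discoverSplits(int fileCount) {
        List<String> splits = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            splits.add("file-" + i + ".json");
        }
        return splits;
    }

    // The "readers": readerParallelism threads, each processing only its own splits.
    // Returns reader index -> splits that reader processed.
    static Map<Integer, List<String>> readInParallel(List<String> splits, int readerParallelism) {
        if (readerParallelism < 1) {
            throw new IllegalArgumentException("readerParallelism must be >= 1");
        }
        // Assign splits round-robin (an assumption made for this sketch).
        Map<Integer, List<String>> assigned = new TreeMap<>();
        Map<Integer, List<String>> processed = new TreeMap<>();
        for (int r = 0; r < readerParallelism; r++) {
            assigned.put(r, new ArrayList<>());
            processed.put(r, new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            assigned.get(i % readerParallelism).add(splits.get(i));
        }
        // Start one thread per reader; each writes only to its own output list.
        List<Thread> readers = new ArrayList<>();
        for (int r = 0; r < readerParallelism; r++) {
            final List<String> mine = assigned.get(r);
            final List<String> out = processed.get(r);
            Thread t = new Thread(() -> out.addAll(mine), "reader-" + r);
            readers.add(t);
            t.start();
        }
        // Wait for all readers to finish before returning the results.
        for (Thread t : readers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for readers", e);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> splits = discoverSplits(6);        // monitor: parallelism 1
        System.out.println(readInParallel(splits, 3));  // readers: parallelism 3
    }
}
```

With six splits and three readers, each reader in this model ends up with two splits, while the discovery step still runs exactly once per scan. This mirrors what the UI shows: a source at parallelism 1 feeding a reader at parallelism 3.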


