Posted to user@flink.apache.org by Arthur Li <li...@126.com> on 2022/05/01 01:14:04 UTC
Flink 1.14.4 HybridSource consumes lots of CPU resources
Hi all,
The Hybrid Source <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source> is one of the new features of Flink 1.14.x, but one problem is that it takes over 700% CPU, almost 5 times as much as the two underlying sources (file and Kafka) use when run on their own.
My Environment:
JDK: 11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
Scala: Scala code runner version 2.12.14
OS: macOS Monterey
Hybrid Source Code:
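import java.io.File
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._ // Scala DataStream API plus implicit TypeInformation

// Assumed record type: SensorReading is not shown in the post, so these field names are a guess.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object HelloHybrid {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Unbounded Kafka source, started from the earliest offset once the file is exhausted
    val kafka =
      KafkaSource.builder[String]()
        .setBootstrapServers("localhost:9092")
        .setTopics("lab-flink-sensor-iot")
        .setGroupId("sensor-iot-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build()

    // Bounded file source, read first, line by line
    val sensorDataFile = "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
    val fileData = FileSource.forRecordStreamFormat(
        new TextLineFormat(),
        Path.fromLocalFile(new File(sensorDataFile)))
      .build()

    // Read the file to completion, then switch over to Kafka
    val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()

    env.fromSource(hybridSrc,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
        "kafka & file hybrid source")
      .map(data => {
        // Assumed CSV layout: id, timestamp, temperature
        val arr = data.split(",").map(_.trim)
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .print("hybrid")

    env.execute("Hello kafka & file hybrid source")
  }
}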
Re: Flink 1.14.4 HybridSource consumes lots of CPU resources
Posted by Thomas Weise <th...@apache.org>.
Thank you for reporting the issue. Mason has already identified the root
cause and the JIRA is now assigned to him:
https://issues.apache.org/jira/browse/FLINK-27479
Thomas
On Tue, May 3, 2022 at 4:02 AM Martijn Visser <ma...@apache.org>
wrote:
> I'm looping in @Thomas Weise <th...@apache.org> since he has expertise on
> the HybridSource.
Re: Flink 1.14.4 HybridSource consumes lots of CPU resources
Posted by Martijn Visser <ma...@apache.org>.
I'm looping in @Thomas Weise <th...@apache.org> since he has expertise on the
HybridSource.
On Tue, 3 May 2022 at 12:04, Arthur Li <li...@126.com> wrote:
> Hi Mason,
>
> I uploaded the code and resource files to AwesomeArthurLi/quickstart:
> quickstart (github.com) <https://github.com/AwesomeArthurLi/quickstart>;
> I hope it helps you reproduce the issue.
>
> BR.
> Arthur Li
Re: Flink 1.14.4 HybridSource consumes lots of CPU resources
Posted by Arthur Li <li...@126.com>.
Hi Mason,
I uploaded the code and resource files to AwesomeArthurLi/quickstart: quickstart (github.com) <https://github.com/AwesomeArthurLi/quickstart>; I hope it helps you reproduce the issue.
BR.
Arthur Li
> On May 3, 2022, at 15:48, Mason Chen <ma...@gmail.com> wrote:
>
> Hi Arthur,
>
> Coincidentally, I also ran into a similar issue recently. In my case, I noticed that the source implementation always marks itself as having data available, which causes the Flink runtime to loop repeatedly in quick succession and results in high CPU utilization. More details here: https://issues.apache.org/jira/browse/FLINK-27479
>
> Can you provide a minimal working example that reproduces this issue? I presume you see the high CPU utilization both before the switch from FileSource and after the switch to KafkaSource?
>
> Best,
> Mason
Re: Flink 1.14.4 HybridSource consumes lots of CPU resources
Posted by Mason Chen <ma...@gmail.com>.
Hi Arthur,
Coincidentally, I also ran into a similar issue recently. In my case, I
noticed that the source implementation always marks itself as having data
available, which causes the Flink runtime to loop repeatedly in quick
succession and results in high CPU utilization. More details here:
https://issues.apache.org/jira/browse/FLINK-27479
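The behavior described above amounts to a busy loop: if a reader's availability signal is always already complete, the thread driving it never parks and keeps polling, burning CPU even while no data has arrived. The Scala sketch below is a toy model of that effect only, under stated assumptions: ToyReader, DelayedRecord, drive and the two reader classes are hypothetical names loosely modeled on the pollNext/isAvailable pair of Flink's SourceReader, and this is neither the HybridSource code nor the fix tracked in FLINK-27479.

```scala
import java.util.concurrent.{CompletableFuture, Executors, TimeUnit}

object AvailabilitySketch {

  // Hypothetical toy reader, loosely modeled on the pollNext/isAvailable pair of
  // Flink's SourceReader; it is NOT the real Flink interface.
  trait ToyReader {
    def pollNext(): Option[String]             // a record if one is ready, else None
    def isAvailable(): CompletableFuture[Void] // completes when data may be ready
  }

  // Hypothetical helper: a single record that becomes ready after delayMs.
  final class DelayedRecord(delayMs: Long) {
    val signal = new CompletableFuture[Void]()
    private val timer = Executors.newSingleThreadScheduledExecutor()
    timer.schedule(new Runnable { def run(): Unit = signal.complete(null) },
      delayMs, TimeUnit.MILLISECONDS)
    timer.shutdown() // the already-scheduled completion still runs after shutdown
    def ready: Boolean = signal.isDone
  }

  // Buggy variant: always reports "available", so the driver never blocks and spins.
  final class AlwaysAvailableReader(rec: DelayedRecord) extends ToyReader {
    def pollNext(): Option[String] = if (rec.ready) Some("record") else None
    def isAvailable(): CompletableFuture[Void] = CompletableFuture.completedFuture[Void](null)
  }

  // Well-behaved variant: the future completes only once data actually arrives.
  final class SignalingReader(rec: DelayedRecord) extends ToyReader {
    def pollNext(): Option[String] = if (rec.ready) Some("record") else None
    def isAvailable(): CompletableFuture[Void] = rec.signal
  }

  // Simplified driver loop: poll; when nothing is returned, wait on the availability
  // future. Returns the number of pollNext() calls made before a record was obtained.
  def drive(reader: ToyReader): Int = {
    var polls = 0
    var result: Option[String] = None
    while (result.isEmpty) {
      polls += 1
      result = reader.pollNext()
      if (result.isEmpty) reader.isAvailable().join() // returns at once if already complete
    }
    polls
  }

  def main(args: Array[String]): Unit = {
    val busy = drive(new AlwaysAvailableReader(new DelayedRecord(200)))
    val parked = drive(new SignalingReader(new DelayedRecord(200)))
    // The always-available reader typically reports a far larger count, because it
    // spins until the record is ready instead of waiting on the future.
    println(s"always-available reader: $busy polls")
    println(s"signaling reader: $parked polls")
  }
}
```

The only difference between the two readers is whether isAvailable() hands back an already-completed future or one that completes when data actually arrives; with the former, every empty poll is immediately followed by another one.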
Can you provide a minimal working example that reproduces this issue? I
presume you see the high CPU utilization both before the switch from
FileSource and after the switch to KafkaSource?
Best,
Mason
On Sun, May 1, 2022 at 6:24 AM Arthur Li <li...@126.com> wrote:
> The following snapshot is the Java process's flame graph.
Re: Flink 1.14.4 HybridSource consumes lots of CPU resources
Posted by Arthur Li <li...@126.com>.
The following snapshot is the Java process's flame graph.