Posted to user@flink.apache.org by 淘宝龙安 <re...@gmail.com> on 2019/11/17 05:02:55 UTC

Flink OutOfMemoryError: GC overhead limit exceeded

Hi all,

I have a batch job that reads data from PostgreSQL over JDBC. When the
record count is greater than 100,000, the Flink job fails with
OutOfMemoryError: GC overhead limit exceeded.

The TaskManager memory is 16 GB:

-yjm 5120 -ytm 16384

[image: image.png]

My config is below. Can anybody help me?

JDBCInputFormat.buildJDBCInputFormat()
        .setDBUrl(configDO.getJDBCUrl())
        .setUsername(configDO.getDbUser())
        .setPassword(configDO.getDbPassword())
        .setFetchSize(JobConfig.jdbcFetchSize)
        .setDrivername(configDO.getJdbcDriver())
        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
        .setQuery(sql)
        .finish()

Re: Flink OutOfMemoryError: GC overhead limit exceeded

Posted by vino yang <ya...@gmail.com>.
Hi 龙安,

Sorry, I did not know you were using the Blink planner.

> Before using NumericBetweenParametersProvider, the job read data from the
> database with just one TaskManager, even though I have more than one TM.

As for only one TM being used, that appears to be a known issue [1].

Best,
Vino

[1]: https://issues.apache.org/jira/browse/FLINK-12122
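
On the question of why NumericBetweenParametersProvider helps: as far as I
understand it, a JDBCInputFormat without a parameters provider produces a
single input split, so one subtask on one TM reads the whole result set no
matter what the job parallelism is. With a provider, every pair of parameter
values becomes its own split, and the query needs two "?" placeholders
(for example "... BETWEEN ? AND ?") to receive them. The sketch below is a
simplified stand-in for that range splitting, not Flink's implementation;
the class RangeSplitSketch, the method splitRange, and the column
numeric_key are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a numeric-range parameters provider (not Flink code). */
public class RangeSplitSketch {

    /** Splits [min, max] into consecutive inclusive ranges of at most batchSize keys. */
    public static List<long[]> splitRange(long batchSize, long min, long max) {
        if (batchSize <= 0 || min > max) {
            throw new IllegalArgumentException("need batchSize > 0 and min <= max");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = min; start <= max; start += batchSize) {
            // The last range is clipped so it never runs past max.
            long end = Math.min(start + batchSize - 1, max);
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Hypothetical query: numeric_key is an assumed numeric column.
        String query = "select id, name, age from user_info where numeric_key between ? and ?";
        // Each range would fill the two placeholders of one independent split.
        for (long[] range : splitRange(10_000, 1, 35_000)) {
            System.out.println(query + "  <- [" + range[0] + ", " + range[1] + "]");
        }
    }
}
```

Because each range is an independent split, the splits can be handed to
different subtasks, and each subtask only holds one range's rows at a time.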

淘宝龙安 <re...@gmail.com> wrote on Tue, Nov 19, 2019 at 9:26 PM:

> 1) In Flink 1.9.0, I found that BatchTableSource is deprecated. Then I
> found InputFormatTableSource, whose isBounded method decides whether the
> job is a batch job.
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/common.html#main-differences-between-the-two-planners
>
> The document above says that Blink treats batch jobs as a special case
> of streaming.
>
> So I guessed that the stream env behaves the same as
> BatchTableEnvironment, and it works fine. I don't know whether that is
> correct.
>
> 2) Yes, I made a mistake. After I corrected it, the job still did not
> work, same as before. My new startup command is:
>
>   /app/flink-1.9.1/bin/flink run -m yarn-cluster -p 8 -ys 4 -yjm 5120 -ytm 16384 my-flink-job.jar
>
> In the end I solved the problem with NumericBetweenParametersProvider:
>
>     jdbcBuilder.setParametersProvider(new NumericBetweenParametersProvider(
>             JobConfig.jdbcFetchSize,
>             configDO.getBatchStart(),
>             configDO.getBatchEnd()));
>
> But I don't know why this helps.
>
> Before using NumericBetweenParametersProvider, the job read data from the
> database with just one TaskManager, even though I have more than one TM.

Re: Flink OutOfMemoryError: GC overhead limit exceeded

Posted by vino yang <ya...@gmail.com>.
Hi 龙安,

First, I have two questions.

1) You said this is a batch job, yet you used the stream env and the stream
table env.

2) In the startup command I saw the "-yn" option, which has not been
supported since Flink 1.8. I guess you started only one TM container
(-p/-s = 1). Please correct me if I am wrong.

Can we clear up these two points first?

Best,
Vino
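
To make the guess about the TM count concrete, here is a minimal sketch,
assuming that without "-yn" the containers are started on demand to cover
the requested slots, so the count is roughly ceil(parallelism / slots per
TM). The class and method names are hypothetical and this is not how Flink
computes it internally.

```java
/** Back-of-the-envelope estimate of TaskManager containers (not Flink code). */
public class TaskManagerCountSketch {

    /** Returns ceil(parallelism / slotsPerTaskManager) using integer arithmetic. */
    public static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // -p 4 -ys 4: all four slots fit into a single container.
        System.out.println("-p 4 -ys 4 -> " + taskManagersNeeded(4, 4) + " TM");
        // -p 8 -ys 4: two containers are needed.
        System.out.println("-p 8 -ys 4 -> " + taskManagersNeeded(8, 4) + " TM");
    }
}
```

Under that assumption, "-p 4 -ys 4" gives one TM, so all the work and all
the memory pressure land in a single 16 GB container.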



Re: Flink OutOfMemoryError: GC overhead limit exceeded

Posted by 淘宝龙安 <re...@gmail.com>.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data4/HDATA/yarn/local/usercache/flink/appcache/application_1573355697642_32093/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data1/app/hadoop-2.7.3-snappy-32core12disk/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Exception in thread "I/O dispatcher 3" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.HashMap$KeySet.iterator(HashMap.java:917)
    at java.util.HashSet.iterator(HashSet.java:173)
    at java.util.Collections$UnmodifiableCollection$1.<init>(Collections.java:1039)
    at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "I/O dispatcher 28" Exception in thread "I/O dispatcher 32" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 24" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 26" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "I/O dispatcher 12" Exception in thread "I/O dispatcher 17" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)
Exception in thread "I/O dispatcher 18" java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Collections$UnmodifiableCollection.iterator(Collections.java:1038)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:212)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
    at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: GC overhead limit exceeded


Re: Flink OutOfMemoryError: GC overhead limit exceeded

Posted by 淘宝龙安 <re...@gmail.com>.
Hi yanghua,

Thanks for your response. My scenario is very simple.

I have two tables in the database.

Table A (user_info)
-----------------------
id       | varchar
name     | varchar
age      | numeric
-----------------------

Table B (order_info)
-----------------------
id       | varchar
user_id  | varchar
price    | numeric
-----------------------

I read these two tables into Flink over JDBC and then join them. My code
looks like this:

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment fsEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv =
                StreamTableEnvironment.create(fsEnv, fsSettings);

        // register table user_info with JDBCInputFormat
        fsTableEnv.registerTableSource("user_info", new JdbcTableSource(
                fieldNames,
                types,
                JDBCInputFormat.buildJDBCInputFormat()
                        .setDBUrl(configDO.getJDBCUrl())
                        .setUsername(configDO.getDbUser())
                        .setPassword(configDO.getDbPassword())
                        .setFetchSize(10000)
                        .setDrivername(configDO.getJdbcDriver())
                        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
                        .setQuery("select id, name, age from user_info")
                        .finish()
        ));

        // register table order_info with JDBCInputFormat
        // (fieldNames and types must describe order_info's columns here)
        fsTableEnv.registerTableSource("order_info", new JdbcTableSource(
                fieldNames,
                types,
                JDBCInputFormat.buildJDBCInputFormat()
                        .setDBUrl(configDO.getJDBCUrl())
                        .setUsername(configDO.getDbUser())
                        .setPassword(configDO.getDbPassword())
                        .setFetchSize(10000)
                        .setDrivername(configDO.getJdbcDriver())
                        .setRowTypeInfo(new RowTypeInfo(types, fieldNames))
                        .setQuery("select id, user_id, price from order_info")
                        .finish()
        ));

        // register an Elasticsearch sink
        fsTableEnv.registerTableSink("output_table",
                new ElasticSearchTableSink.ElasticSearchTableSinkBuilder()
                        .hosts(configDO.getElasticSearchServer())
                        .index(ElasticsearchConfigUtils.getIndex(configDO, dateTime))
                        .docType("_doc")
                        .fieldNames(DatabaseTypesUtils.getFieldNames(configDO.getOutput()))
                        .fieldTypes(DatabaseTypesUtils.getTableTypes(configDO.getOutput()))
                        .build());

        // then join the two tables and write the result to the sink
        fsTableEnv.sqlUpdate("insert into output_table select"
                + " user_info.id as user_id, order_info.id as order_id,"
                + " user_info.name, order_info.price"
                + " from user_info join order_info"
                + " on order_info.user_id = user_info.id");
        fsEnv.execute(taskName);



Then I run it on a YARN cluster:

  /app/flink-1.9.1/bin/flink run -m yarn-cluster -p 4 -ys 4 -yn 4 -yjm 5120 -ytm 16384 my-flink-job.jar


Re: Flink OutOfMemoryError: GC overhead limit exceeded

Posted by vino yang <ya...@gmail.com>.
Hi longan,

At first glance, 100,000+ records alone should not cause an OOM. Can you
give more details about your job, e.g. the job graph or business logic (how
many operators, and of what kinds?), the number of TM containers, log files,
and so on?

In addition to monitoring memory metrics, you can view memory and GC
information with two config options provided by Flink itself [1].

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#memory-and-performance-debugging
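
If I remember the section in [1] correctly, the two options are
taskmanager.debug.memory.startLogThread and
taskmanager.debug.memory.logIntervalMs; please verify the exact names
against the docs for your Flink version. The sketch below is not Flink code.
It only shows, with the standard JMX beans, the kind of heap and GC numbers
such a periodic logger reports. The class and method names are hypothetical.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Prints heap usage and GC counters of the current JVM (illustration only). */
public class GcStatsSketch {

    /** One-line summary of current heap usage in megabytes. */
    public static String heapSummary() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        long max = heap.getMax(); // -1 when the maximum is undefined
        return "heap used=" + (heap.getUsed() >> 20) + "MB"
                + " committed=" + (heap.getCommitted() >> 20) + "MB"
                + " max=" + (max < 0 ? "n/a" : (max >> 20) + "MB");
    }

    /** One line per collector: cumulative collection count and time so far. */
    public static String gcSummary() {
        StringBuilder sb = new StringBuilder();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append(gc.getName())
                    .append(": count=").append(gc.getCollectionCount())
                    .append(" timeMs=").append(gc.getCollectionTime())
                    .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // Sample a few times; GC time growing much faster than wall-clock
        // progress is the pattern behind "GC overhead limit exceeded".
        for (int i = 0; i < 3; i++) {
            System.out.println(heapSummary());
            System.out.print(gcSummary());
            Thread.sleep(1000);
        }
    }
}
```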
