Posted to user@flink.apache.org by PedroMrChaves <pe...@gmail.com> on 2017/07/19 18:23:36 UTC

FlinkKafkaConsumer010 - Memory Issue

Hello,

Whenever I submit a job to Flink that reads data from Kafka, the memory
consumption increases continuously. I've raised the max heap from 2 GB to
4 GB and even 6 GB, but the memory consumption keeps reaching the limit.

A simple example job that shows this behavior is depicted below.

    import java.util.Properties;
    import java.util.UUID;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    /*
     * Execution environment setup
     */
    final StreamExecutionEnvironment environment =
            getGlobalJobConfiguration(configDir, configurations);

    /*
     * Collect event data from Kafka
     */
    DataStreamSource<String> s = environment.addSource(new FlinkKafkaConsumer010<String>(
            configurations.get(ConfigKeys.KAFKA_INPUT_TOPIC),
            new SimpleStringSchema(),
            getKafkaConfiguration(configurations)));

    // Filter that drops every record; the memory growth shows up even with no output.
    s.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return false;
        }
    }).print();

    private static Properties getKafkaConfiguration(ParameterTool configurations) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", configurations.get(ConfigKeys.KAFKA_HOSTS));
        // A fresh, random consumer group on every submission
        properties.put("group.id", "flink-consumer-" + UUID.randomUUID().toString());
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // SSL setup for the Kafka connection
        properties.put("security.protocol", configurations.get(ConfigKeys.KAFKA_SECURITY_PROTOCOL));
        properties.put("ssl.truststore.location",
                configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_LOCATION));
        properties.put("ssl.truststore.password",
                configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_PASSWORD));
        properties.put("ssl.keystore.location",
                configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_LOCATION));
        properties.put("ssl.keystore.password",
                configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_PASSWORD));
        return properties;
    }


Moreover, when I stop the job, the task manager does not terminate the Kafka
connection and the memory stays allocated. To release it, I have to kill
the task manager process.

My Flink version: 1.2.1
Kafka consumer: 010
Kafka version: 2_11_0.10.1.0-2

I've activated the taskmanager.debug.memory.startLogThread property to log
memory usage every 5 seconds and attached the log with the results.
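
For reference, a minimal sketch of the corresponding flink-conf.yaml entries
(key names as given in the Flink 1.2 configuration docs; the interval is in
milliseconds):

    taskmanager.debug.memory.startLogThread: true
    taskmanager.debug.memory.logIntervalMs: 5000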

The output of free -m before submitting the job:
              total        used        free      shared  buff/cache   available
Mem:          15817         245       14755          24         816       15121
Swap:             0           0           0

After the job has been running for about 5 minutes:

              total        used        free      shared  buff/cache   available
Mem:          15817        9819        5150          24         847        5547
Swap:             0           0           0

taskmanager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14342/taskmanager.log>  





-----
Best Regards,
Pedro Chaves

Re: FlinkKafkaConsumer010 - Memory Issue

Posted by Aljoscha Krettek <al...@apache.org>.
Thanks for letting us know!

> On 18. Sep 2017, at 11:36, PedroMrChaves <pe...@gmail.com> wrote:
> 
> The buffer memory of the Kafka consumer was piling up. Once I updated to
> version 1.3.2, the problem no longer occurred.


Re: FlinkKafkaConsumer010 - Memory Issue

Posted by PedroMrChaves <pe...@gmail.com>.
Hello,

Sorry for the delay.

The buffer memory of the Kafka consumer was piling up. Once I updated to
version 1.3.2, the problem no longer occurred.

Pedro.



-----
Best Regards,
Pedro Chaves

Re: FlinkKafkaConsumer010 - Memory Issue

Posted by Stephan Ewen <se...@apache.org>.
Hi Pedro!

Can you make a heap dump after a few re-submissions and share a screenshot
or so showing which memory piles up? Is it the buffer memory from the Kafka
Producer or Consumer?

Stephan
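
For reference, a heap dump can be taken with
jmap -dump:live,format=b,file=tm.hprof <taskmanager-pid>, or programmatically
through the HotSpot diagnostic MBean. A minimal sketch (the output path is
illustrative):

    import java.lang.management.ManagementFactory;

    import com.sun.management.HotSpotDiagnosticMXBean;

    public class HeapDump {
        public static void main(String[] args) throws Exception {
            HotSpotDiagnosticMXBean bean = ManagementFactory.newPlatformMXBeanProxy(
                    ManagementFactory.getPlatformMBeanServer(),
                    "com.sun.management:type=HotSpotDiagnostic",
                    HotSpotDiagnosticMXBean.class);
            // live=true dumps only reachable objects, which is what matters
            // when looking for memory that survives job cancellation.
            bean.dumpHeap("/tmp/taskmanager-heap.hprof", true);
        }
    }

The resulting .hprof file can be opened in a tool such as Eclipse MAT to see
which objects dominate the heap.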


Re: FlinkKafkaConsumer010 - Memory Issue

Posted by Kien Truong <du...@gmail.com>.
Hi Pedro,

As long as there's no OutOfMemoryError or long garbage collection pauses,
there's nothing to worry about with memory staying allocated. The memory
should be garbage-collected by the JVM when necessary.

Regards,

Kien
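
For reference, GC counts and pause times can be read from inside the JVM via
the standard management beans. A minimal sketch:

    import java.lang.management.GarbageCollectorMXBean;
    import java.lang.management.ManagementFactory;

    public class GcStats {
        public static void main(String[] args) {
            // Cumulative collection counts and times per collector; steadily
            // climbing times (or very long individual pauses) indicate real
            // heap pressure rather than lazily reclaimed memory.
            for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
                System.out.printf("%s: count=%d, time=%dms%n",
                        gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
            }
        }
    }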


On 7/25/2017 10:53 PM, PedroMrChaves wrote:
> The problem is not that the task manager uses a lot of memory; the problem
> is that every time I cancel and re-submit the job the task manager does not
> release the previously allocated memory.


Re: FlinkKafkaConsumer010 - Memory Issue

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I couldn’t seem to reproduce this.

Taking another look at your description, one thing I spotted is that your Kafka broker version is 0.10.1.0, while the consumer uses Kafka clients version 0.10.0.1 (the default, as shown in your logs).
I’m wondering whether or not that could be the root cause here.

Cheers,
Gordon
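
For reference, the kafka-clients version actually on the classpath can be
checked at runtime. A minimal sketch (AppInfoParser is an internal Kafka
utility, so this is only a debugging aid):

    import org.apache.kafka.common.utils.AppInfoParser;

    public class KafkaClientVersion {
        public static void main(String[] args) {
            // Prints the version of the kafka-clients jar on the classpath;
            // this is the value to compare against the broker version.
            System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        }
    }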

On 26 July 2017 at 12:14:02 AM, PedroMrChaves (pedro.mr.chaves@gmail.com) wrote:

My Flink version: 1.2.1 
Kafka consumer: 010 
Kafka version: 2_11_0.10.1.0-2

Re: FlinkKafkaConsumer010 - Memory Issue

Posted by PedroMrChaves <pe...@gmail.com>.
Hello,

Thank you for the reply. 

The problem is not that the task manager uses a lot of memory; the problem
is that every time I cancel and re-submit the job the task manager does not
release the previously allocated memory.

Regards,
Pedro Chaves.



-----
Best Regards,
Pedro Chaves

Re: FlinkKafkaConsumer010 - Memory Issue

Posted by Kien Truong <du...@gmail.com>.
Hi,

From the log, it doesn't seem that the task manager uses a lot of memory.

Can you post the output of top?

Regards,

Kien


On 7/20/2017 1:23 AM, PedroMrChaves wrote:
> [...]


Re: FlinkKafkaConsumer010 - Memory Issue

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Pedro,

Seems like a memory leak. The only issue I’m currently aware of that may be related is [1]. Could you tell whether that JIRA matches what you are running into?
The JIRA mentions the Kafka 09 consumer, but a fix will only be available for the Kafka 010 consumer once we bump our Kafka 010 dependency to the latest version.

If that doesn’t relate to you, please let me know and I'll investigate a bit more.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-6301


On 20 July 2017 at 3:14:57 AM, Fabian Hueske (fhueske@gmail.com) wrote:

Hi,

Gordon (in CC) knows the details of Flink's Kafka consumer.
He might know how to solve this issue.

Best, Fabian

2017-07-19 20:23 GMT+02:00 PedroMrChaves <pe...@gmail.com>:
[...]


Re: FlinkKafkaConsumer010 - Memory Issue

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

Gordon (in CC) knows the details of Flink's Kafka consumer.
He might know how to solve this issue.

Best, Fabian

2017-07-19 20:23 GMT+02:00 PedroMrChaves <pe...@gmail.com>:

> [...]