You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "lx (Jira)" <ji...@apache.org> on 2020/11/10 07:15:00 UTC

[jira] [Updated] (AMQCPP-654) memory leak

     [ https://issues.apache.org/jira/browse/AMQCPP-654?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lx updated AMQCPP-654:
----------------------
    Description: 
I use CMS to receive and send messages to Artemis, and the consumer has a memory leak. My code looks like this:
{code:c++}
void MyConsumer::WorkThread() {
    try {
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if (inited_) {
                    LOG<<"inted\n";
                    return;
                }
                if (SetMqClientId() != 0) {
                    throw std::runtime_error("init mq client id failed");
                }
                std::shared_ptr<cms::ConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(conf_.broker_uri));
                if (!connectionFactory) {
                    throw std::runtime_error("connectionFactory is nullptr");
                }
                connection = connectionFactory->createConnection(conf_.user_name, conf_.password, mq_client_id_);
                if (!connection) {
                    throw std::runtime_error("connection is nullptr");
                }

                if (conf_.auto_ack) {
                    session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                } else {
                    session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
                }
                if (!session) {
                    throw std::runtime_error("session is nullptr");
                }

                if (conf_.use_topic) {
                    destination = session->createTopic(conf_.dest_uri);
                } else {
                    throw std::invalid_argument("error");
                }
                if (!destination) {
                    throw std::runtime_error("destination is nullptr");
                }

                consumer = session->createConsumer(destination);
                if (!consumer) {
                    throw std::runtime_error("consumer is nullptr");
                }
                producer_ = new MyProducer(response_uri_, connection, conf_);
                producer_->Init();

                connection->start();
                inited_ = true;
            }
            
            while (thread_running_) {
                Message* mq_message = consumer->receive(check_message_interval_);
                if (mq_message && thread_running_) {
                    onMessage(mq_message);
                }
                if (mq_message) {
                    delete mq_message;
                    mq_message = nullptr;
                }
            }
    }
    catch (const decaf::lang::Exception& e) {
        LOG<<"error:"<<e.what()<<"\n";
    }
    catch (const cms::CMSException& e) {
        LOG<<"error:"<<e.what()<<"\n";
    }
    catch (const std::exception& e) {
        LOG<<"error:"<<e.what()<<"\n";
    }
    LOG<<"stopped\n";
    if (thread_running_) {
        sleep(restart_interval_);
        std::thread tmp(&MyConsumer::Start, this);
        tmp.detach();
    }
}
{code}
The memory leak occurs even if I remove the consumer->receive call (so the while loop does nothing).

  was:
I use CMS to receive and send messages to Artemis, and the consumer has a memory leak. My code looks like this:
{code:c++}
void MyConsumer::WorkThread() {
    try {
            {
                std::lock_guard<std::mutex> lock(mtx_);
                if (inited_) {
                    LOG<<"inted\n";
                    return;
                }
                if (SetMqClientId() != 0) {
                    throw std::runtime_error("init mq client id failed");
                }
                std::shared_ptr<cms::ConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(conf_.broker_uri));
                if (!connectionFactory) {
                    throw std::runtime_error("connectionFactory is nullptr");
                }
                connection = connectionFactory->createConnection(conf_.user_name, conf_.password, mq_client_id_);
                if (!connection) {
                    throw std::runtime_error("connection is nullptr");
                }

                if (conf_.auto_ack) {
                    session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                } else {
                    session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);
                }
                if (!session) {
                    throw std::runtime_error("session is nullptr");
                }

                if (conf_.use_topic) {
                    destination = session->createTopic(conf_.dest_uri);
                } else {
                    throw std::invalid_argument("error");
                }
                if (!destination) {
                    throw std::runtime_error("destination is nullptr");
                }

                consumer = session->createConsumer(destination);
                if (!consumer) {
                    throw std::runtime_error("consumer is nullptr");
                }
                producer_ = new MyProducer(response_uri_, connection, conf_);
                producer_->Init();

                connection->start();
                inited_ = true;
            }
            
            while (thread_running_) {
                Message* mq_message = consumer->receive(check_message_interval_);
                if (mq_message && thread_running_) {
                    onMessage(mq_message);
                }
                if (mq_message) {
                    delete mq_message;
                    mq_message = nullptr;
                }
            }
    }
    catch (const decaf::lang::Exception& e) {
        LOG<<"error:"<<e.what()<<"\n";
    }
    catch (const cms::CMSException& e) {
        LOG<<"error:"<<e.what()<<"\n";
    }
    catch (const std::exception& e) {
        LOG<<"error:"<<e.what()<<"\n";
    }
    LOG<<"stopped\n";
    if (thread_running_) {
        sleep(restart_interval_);
        std::thread tmp(&MyConsumer::Start, this);
        tmp.detach();
    }
}
{code}

The memory leak occurs even if I remove the onMessage call (so the while loop does nothing).


> memory leak
> -----------
>
>                 Key: AMQCPP-654
>                 URL: https://issues.apache.org/jira/browse/AMQCPP-654
>             Project: ActiveMQ C++ Client
>          Issue Type: Bug
>          Components: CMS Impl
>    Affects Versions: 3.9.5
>         Environment: centos 6.9 based pod in k8s, cms version is 3.9.5, use std=c++11 in g++
>            Reporter: lx
>            Assignee: Timothy A. Bish
>            Priority: Blocker
>              Labels: artemis, cms, memory-leak
>
> I use CMS to receive and send messages to Artemis, and the consumer has a memory leak. My code looks like this:
> {code:c++}
> /**
>  * Worker-thread body: performs the one-time CMS setup under mtx_, then polls
>  * the consumer until thread_running_ becomes false or an exception escapes.
>  *
>  * Setup creates the connection, session, topic, consumer and MyProducer in
>  * function-local std::unique_ptr owners and hands them to the member pointers
>  * only after connection->start() has succeeded. If any step throws, the
>  * objects created so far are destroyed during unwinding instead of being left
>  * in member pointers that the next setup attempt would overwrite.
>  *
>  * Exceptions are logged and swallowed. If thread_running_ is still set
>  * afterwards, the function sleeps for restart_interval_ and launches
>  * MyConsumer::Start on a detached thread.
>  */
> void MyConsumer::WorkThread() {
>     try {
>         {
>             std::lock_guard<std::mutex> lock(mtx_);
>             if (inited_) {
>                 LOG<<"inted\n";
>                 return;
>             }
>             if (SetMqClientId() != 0) {
>                 throw std::runtime_error("init mq client id failed");
>             }
>             // `new` throws on allocation failure, so no null check is needed here.
>             std::shared_ptr<cms::ConnectionFactory> connectionFactory(new ActiveMQConnectionFactory(conf_.broker_uri));
>             // The owners below are declared in creation order, so a throw
>             // destroys them in reverse: producer, consumer, topic, session,
>             // connection.
>             std::unique_ptr<cms::Connection> new_connection(
>                 connectionFactory->createConnection(conf_.user_name, conf_.password, mq_client_id_));
>             if (!new_connection) {
>                 throw std::runtime_error("connection is nullptr");
>             }
>             std::unique_ptr<cms::Session> new_session(
>                 new_connection->createSession(conf_.auto_ack ? Session::AUTO_ACKNOWLEDGE
>                                                              : Session::CLIENT_ACKNOWLEDGE));
>             if (!new_session) {
>                 throw std::runtime_error("session is nullptr");
>             }
>             // Only topic destinations are supported by this consumer.
>             if (!conf_.use_topic) {
>                 throw std::invalid_argument("error");
>             }
>             std::unique_ptr<cms::Topic> new_destination(new_session->createTopic(conf_.dest_uri));
>             if (!new_destination) {
>                 throw std::runtime_error("destination is nullptr");
>             }
>             std::unique_ptr<cms::MessageConsumer> new_consumer(new_session->createConsumer(new_destination.get()));
>             if (!new_consumer) {
>                 throw std::runtime_error("consumer is nullptr");
>             }
>             // NOTE(review): assumes ~MyProducer() does not delete the connection
>             // it was constructed with -- confirm against MyProducer.
>             std::unique_ptr<MyProducer> new_producer(new MyProducer(response_uri_, new_connection.get(), conf_));
>             new_producer->Init();
>             new_connection->start();
>             // Every step succeeded: transfer ownership to the members.
>             connection = new_connection.release();
>             session = new_session.release();
>             destination = new_destination.release();
>             consumer = new_consumer.release();
>             producer_ = new_producer.release();
>             inited_ = true;
>         }
>
>         while (thread_running_) {
>             // Held in a unique_ptr so the message is freed even when onMessage throws.
>             std::unique_ptr<Message> mq_message(consumer->receive(check_message_interval_));
>             if (mq_message && thread_running_) {
>                 onMessage(mq_message.get());
>             }
>         }
>     }
>     catch (const decaf::lang::Exception& e) {
>         LOG<<"error:"<<e.what()<<"\n";
>     }
>     catch (const cms::CMSException& e) {
>         LOG<<"error:"<<e.what()<<"\n";
>     }
>     catch (const std::exception& e) {
>         LOG<<"error:"<<e.what()<<"\n";
>     }
>     LOG<<"stopped\n";
>     // NOTE(review): when the polling loop throws, inited_ stays true and the
>     // member pointers stay set. Confirm that Start() releases them and clears
>     // inited_ before setup runs again; otherwise a restart hits the early
>     // return at the top of this function.
>     if (thread_running_) {
>         sleep(restart_interval_);
>         std::thread tmp(&MyConsumer::Start, this);
>         tmp.detach();
>     }
> }
> {code}
> The memory leak occurs even if I remove the consumer->receive call (so the while loop does nothing).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)