You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Jeremy (JIRA)" <ji...@apache.org> on 2018/08/29 09:32:00 UTC

[jira] [Created] (PROTON-1923) Closing the connection yields random behavior for a receiver with reconnect options, due to the specified idle timeout value

Jeremy created PROTON-1923:
------------------------------

             Summary: Closing the connection yields random behavior for a receiver with reconnect options, due to the specified idle timeout value
                 Key: PROTON-1923
                 URL: https://issues.apache.org/jira/browse/PROTON-1923
             Project: Qpid Proton
          Issue Type: Bug
          Components: cpp-binding
    Affects Versions: proton-c-0.22.0
            Reporter: Jeremy


I have a test case in which I launch a mocked broker that sends a message once a consumer has connected and a session has been opened. In the broker's on_tracker_settle, the connection is closed.

A consumer specifies both the reconnect options and the idle timeout. Depending on the value of the idle timeout (and not on the reconnect options), the mocked broker hangs on the thread join: for small values, around 100 ms, the test hangs; for values of around 200 ms and above, the test works. Without the reconnect options, a small idle timeout triggers the idle timeout error.

The code is the following:
{code:java}
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/listener.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection_options.hpp>
#include <proton/reconnect_options.hpp>
#include <proton/receiver_options.hpp>
#include <proton/transport.hpp>
#include <proton/error.hpp>
#include <proton/error_condition.hpp>

#include <proton/log.h>

#include <future>
#include <iostream>
#include <string>
#include <chrono>
#include <thread>

/**
 * Mocked broker used by the reproduction.
 *
 * Listens on `url`, opens the local end of every connection, session and
 * sender link the peer opens, sends a single text message when credit is
 * available, and closes the sender, the connection and the listener once
 * that message has been settled.
 */
class server_sender : public proton::messaging_handler {
private:
    std::string url;
    bool fail;                    // when true, on_sendable throws instead of sending
    std::promise<void> promise;   // fulfilled in on_container_start, after listen() was called
    proton::listener listener;
    bool sent = false;            // ensures the message is sent only once

public:
    /**
     * @param s  address to listen on
     * @param p  promise that is fulfilled once the listener has been requested
     * @param f  when true, on_sendable throws std::runtime_error to simulate a failure
     */
    // Initializers are listed in member declaration order to avoid -Wreorder.
    server_sender(const std::string& s, std::promise<void> p, bool f = false)
        : url(s), fail(f), promise(std::move(p)) {}

    /// Starts listening on `url` and signals the waiting test thread.
    void on_container_start(proton::container &c) override {
        std::cout << "server_sender on_container_start" << std::endl;

        listener = c.listen(url);
        promise.set_value();
    }

    /// Opens the local end of a connection opened by the peer.
    void on_connection_open(proton::connection& c) override {
        std::cout << "server_sender on_connection_open" << std::endl;
        c.open();
    }

    /// Opens the local end of a session opened by the peer.
    void on_session_open(proton::session& s) override {
        std::cout << "server_sender on_session_open" << std::endl;
        s.open();
    }

    /// Opens the local end of a sender link opened by the peer.
    void on_sender_open(proton::sender& s) override {
        std::cout << "server_sender on_sender_open" << std::endl;
        s.open();
    }

    /**
     * Sends the single test message the first time it is called.
     * @throws std::runtime_error when the handler was constructed with f == true
     */
    void on_sendable(proton::sender& s) override {
        if (fail) {
            throw std::runtime_error("Simulating server failure");
        }

        if (!sent) {
            std::cout << "server_sender on_sendable" << std::endl;
            s.send(proton::message("text message"));
            sent = true;
        }
    }

    /// Once the message is settled, closes the sender and its connection and stops listening.
    void on_tracker_settle(proton::tracker& t) override {
        std::cout << "server_sender on_tracker_settle" << std::endl;

        t.sender().close();
        t.connection().close();
        listener.stop();
    }

    /// Logs the error condition and stops the listener.
    void on_error(const proton::error_condition& error) override {
        std::cout << "server_sender on_error: " << error.what() << std::endl;

        listener.stop();
    }
};

// Consumer side of the reproduction: connects to `url` with an idle timeout
// and reconnect options, opens a receiver on `queueName`, and closes the
// receiver and its connection from inside on_message.
class receive_with_retry : public proton::messaging_handler {
private:
std::string url;
std::string queueName;

public:
// u: address to connect to; q: source address passed to open_receiver().
receive_with_retry(const std::string &u, const std::string& q) : url(u), queueName(q) {}

// Connects with both an idle timeout and reconnect options set.
// NOTE(review): proton::duration is documented as a count of milliseconds, so
// the values below would be 1 ms, while the report text discusses ~100 ms vs
// ~200 ms idle timeouts - confirm which value was used for each log.
// NOTE(review): per the Proton docs max_attempts(0) means "no limit" - confirm.
void on_container_start(proton::container &c) override {
std::cout << "receive_with_retry on_container_start" << std::endl;

c.connect(
url,
proton::connection_options()
.idle_timeout(proton::duration(1))
.reconnect(proton::reconnect_options()
.max_attempts(0)
.delay(proton::duration(1))
.delay_multiplier(1)
.max_delay(proton::duration(1))));
}

// Opens a receiver on queueName with auto_accept enabled.
void on_connection_open(proton::connection& c) override {
std::cout << "receive_with_retry on_connection_open " << std::endl;
c.open_receiver(queueName, proton::receiver_options().auto_accept(true));
}

// Only logs; the session argument is not used.
void on_session_open(proton::session& session) override {
std::cout << "receive_with_retry on_session_open " << std::endl;
}

// Logs and calls open() on the receiver.
void on_receiver_open(proton::receiver& receiver) override {
std::cout << "receive_with_retry on_receiver_open " << std::endl;
receiver.open();
}

// Logs the message body, blocks this handler for 100 ms, then closes the
// receiver followed by its connection, and logs the body again.
void on_message(proton::delivery& delivery, proton::message &message) override {
std::cout << "receive_with_retry on_message " << message.body() << std::endl;
// The callback does not return until the sleep has elapsed.
// NOTE(review): presumably this is meant to outlast the idle timeout - confirm.
std::this_thread::sleep_for(std::chrono::milliseconds(100));

delivery.receiver().close();
delivery.receiver().connection().close();
std::cout << "-- receive_with_retry on_message " << message.body() << std::endl;
}

// Logs the transport's error and closes the transport's connection.
void on_transport_error(proton::transport& error) override {
std::cout << "receive_with_retry: on_transport_error: " << error.error().what() << std::endl;

error.connection().close();
}

// Only logs the error condition.
void on_error(const proton::error_condition& error) override {
std::cout << "receive_with_retry: on_error: " << error.what() << std::endl;
}
};

void receiverWithRetryTest(const std::string& url, const std::string& queue)
{
std::cout << "Server1 start" << std::endl;
std::promise<void> promise1;
auto future1 = promise1.get_future();
server_sender serverSender1(url, std::move(promise1), false);

std::thread serverSender1Thread([&serverSender1]{
try {
proton::container(serverSender1).run();
}
catch (const std::exception& cause) {
std::cout << "serverSender1 threw: " << cause.what() << std::endl;
}
});
future1.wait();
std::cout << "-- Server1 started" << std::endl;

auto receiveTask = std::async(std::launch::async, [=](){
try {
receive_with_retry receiveWithRetry(url, queue);
proton::container(receiveWithRetry).run();
}
catch (const std::exception& cause) {
std::cout << "Receive task threw the following exception: " << cause.what() << std::endl;
}
});
receiveTask.wait();

std::cout << "Server1 join thread" << std::endl;
if (serverSender1Thread.joinable()) {
serverSender1Thread.join();
}

}

int main() {
try {
pn_log_enable(true);

twoSenderServersWithRetry("127.0.0.1:5672", "test_queue");

return 0;
}
catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}

return 1;
}
{code}
For a bigger idle timeout, where the test works, the logged output is the following:
{code:java}
Server1 start
server_sender on_container_start
-[0000000000477510]:(PN_LISTENER_OPEN)-
Server1 started
Server1 join thread
receive_with_retry on_container_start
[0000000000477510]:(PN_LISTENER_ACCEPT)
server_sender on_connection_open
receive_with_retry on_connection_open
server_sender on_session_open
server_sender on_sender_open
server_sender on_sendable
receive_with_retry on_session_open
receive_with_retry on_receiver_open
receive_with_retry on_message text message
-- receive_with_retry on_message text message
server_sender on_tracker_settle
[0000000000477510]:(PN_LISTENER_CLOSE)
[0000000000475650]:(PN_PROACTOR_INACTIVE)[000000000048E8A0]:(PN_PROACTOR_INACTIVE)

[000000000048E8A0]:(PN_PROACTOR_INTERRUPT)
[0000000000475650]:(PN_PROACTOR_INTERRUPT)
Press any key to continue . . .
{code}
For the low idle timeout, the logged output is the following:
{code:java}
Server1 start
server_sender on_container_start
-[0000000000507510]:(PN_LISTENER_OPEN)-
Server1 started
Server1 join thread
receive_with_retry on_container_start
[0000000000507510]:(PN_LISTENER_ACCEPT)
server_sender on_connection_open
receive_with_retry on_connection_open
server_sender on_session_open
server_sender on_sender_open
server_sender on_sendable
receive_with_retry on_session_open
receive_with_retry on_receiver_open
receive_with_retry on_message text message
-- receive_with_retry on_message text message
s[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)e
rver_sender on_tracker_settle
[0000000000507510]:(PN_LISTENER_CLOSE)
[0000000000505650]:(PN_PROACTOR_INACTIVE)
[0000000000505650]:(PN_PROACTOR_INTERRUPT)
[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
[000000000051E8A0]:(PN_PROACTOR_TIMEOUT)
^CPress any key to continue . . .{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org