You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Seth Zegelstein (JIRA)" <ji...@apache.org> on 2019/07/23 17:52:00 UTC
[jira] [Created] (QPID-8347) Receive() blocks forever once timeout
has been exceeded with a RabbitMQ Broker
Seth Zegelstein created QPID-8347:
-------------------------------------
Summary: Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker
Key: QPID-8347
URL: https://issues.apache.org/jira/browse/QPID-8347
Project: Qpid
Issue Type: Bug
Components: C++ Client
Affects Versions: qpid-cpp-1.39.0
Environment: OS: Ubuntu Bionic
QPID C++ version 1.39.0
QPID Proton version 0.28.0
RabbitMQ Broker version 3.6.10
Erlang version 20.2.2
RabbimtMQ AMQP 1.0 plugin Version 3.6.10
Reporter: Seth Zegelstein
Hello All,
I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call.
Example Code (Modified HelloWorld example):
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <iostream>
using namespace qpid::messaging;
int main(int argc, char** argv) {
std::string broker = argc > 1 ? argv[1] : "localhost:5672";
std::cout << "broker: " << broker << std::endl;
std::string address = argc > 2 ? argv[2] : "topic.hello.world";
std::cout << "address: " << address << std::endl;
std::string connectionOptions = argc > 3 ? argv[3] : "";
std::cout << "connectionOptions: " << connectionOptions << std::endl;
try {
Connection connection(broker, connectionOptions);
connection.open();
Session session = connection.createSession();
Receiver receiver = session.createReceiver(address);
Message message;
std::cout << "Pre Receive" << std::endl;
message = receiver.fetch(Duration::SECOND * 10);
std::cout << "Post Receive" << std::endl;
session.acknowledge();
connection.close();
return 0;
} catch(const std::exception& error) {
std::cerr << error.what() << std::endl;
return 1;
}
}
Server Error (Occurs after 10 second timeout):
=INFO REPORT==== 23-Jul-2019::17:49:54 ===
accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)
=ERROR REPORT==== 23-Jul-2019::17:49:54 ===
closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):
{bad_version,\{1,1,0,10}}
=INFO REPORT==== 23-Jul-2019::17:49:54 ===
accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)
=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
** Generic server <0.675.0> terminating
** Last message in was {send_command,
{'basic.credit_drained',
<<99,116,97,103,45,0,0,0,0>>,
1}}
** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,
{[],[]},
false,<0.678.0>,none,none,0,true,none,
{0,nil},
{0,nil},
true,false}
** Reason for termination ==
** \{{badmatch,{empty,{[],[]}}},
[{amqp_channel,rpc_bottom_half,2,
[\{file,"src/amqp_channel.erl"},\{line,623}]},
{amqp_channel,handle_method_from_server1,3,
[\{file,"src/amqp_channel.erl"},\{line,800}]},
{gen_server,try_dispatch,4,[\{file,"gen_server.erl"},\{line,616}]},
{gen_server,handle_msg,6,[\{file,"gen_server.erl"},\{line,686}]},
{proc_lib,init_p_do_apply,3,[\{file,"proc_lib.erl"},\{line,247}]}]}
=WARNING REPORT==== 23-Jul-2019::17:50:04 ===
Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch,
{empty,
{[],
[]}}},
[{amqp_channel,
rpc_bottom_half,
2,
[{file,
"src/amqp_channel.erl"},
{line,
623}]},
{amqp_channel,
handle_method_from_server1,
3,
[{file,
"src/amqp_channel.erl"},
{line,
800}]},
{gen_server,
try_dispatch,
4,
[{file,
"gen_server.erl"},
{line,
616}]},
{gen_server,
handle_msg,
6,
[{file,
"gen_server.erl"},
{line,
686}]},
{proc_lib,
init_p_do_apply,
3,
[{file,
"proc_lib.erl"},
{line,
247}]}]}
=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
** Generic server <0.678.0> terminating
** Last message in was {'EXIT',<0.675.0>,
{\{badmatch,{empty,{[],[]}}},
[{amqp_channel,rpc_bottom_half,2,
[\{file,"src/amqp_channel.erl"},\{line,623}]},
{amqp_channel,handle_method_from_server1,3,
[\{file,"src/amqp_channel.erl"},\{line,800}]},
{gen_server,try_dispatch,4,
[\{file,"gen_server.erl"},\{line,616}]},
{gen_server,handle_msg,6,
[\{file,"gen_server.erl"},\{line,686}]},
{proc_lib,init_p_do_apply,3,
[\{file,"proc_lib.erl"},\{line,247}]}]}}
** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,
<0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,
{lstate,<0.677.0>,false},
none,1,
{[],[]},
{user,<<"guest">>,
[administrator],
[\{rabbit_auth_backend_internal,none}]},
<<"/">>,<<>>,
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],
[[<0.583.0>|
{resource,<<"/">>,queue,
<<"topic.hello.world">>}]],
[],[],[],[],[],[]}}},
{state,
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],
[[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],
[],[],[],[],[],[]}}},
erlang},
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],
[[<<99,116,97,103,45,0,0,0,0>>|
{{amqqueue,
{resource,<<"/">>,queue,
<<"topic.hello.world">>},
false,false,none,[],<0.583.0>,[],[],[],
undefined,[],[],live,0},
{false,65535,false,
[{<<"x-credit">>,table,
[\{<<"credit">>,long,0},
{<<"drain">>,boolean,false}]}]}}]],
[],[],[],[]}}},
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],
[[<0.583.0>|
{1,\{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],
[],[],[],[],[],[]}}},
{set,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],
[<0.583.0>],
[],[],[],[],[],[]}}},
<0.672.0>,
{state,fine,5000,
#Ref<0.4039704202.3895984129.147347>},
false,1,
{\{0,nil},\{0,nil}},
[],
{\{0,nil},\{0,nil}},
[\{<<"publisher_confirms">>,bool,true},
{<<"exchange_exchange_bindings">>,bool,true},
{<<"basic.nack">>,bool,true},
{<<"consumer_cancel_notify">>,bool,true},
{<<"connection.blocked">>,bool,true},
{<<"authentication_failure_close">>,bool,true}],
none,65535,none,flow,[]}
** Reason for termination ==
** \{{badmatch,{empty,{[],[]}}},
[{amqp_channel,rpc_bottom_half,2,
[\{file,"src/amqp_channel.erl"},\{line,623}]},
{amqp_channel,handle_method_from_server1,3,
[\{file,"src/amqp_channel.erl"},\{line,800}]},
{gen_server,try_dispatch,4,[\{file,"gen_server.erl"},\{line,616}]},
{gen_server,handle_msg,6,[\{file,"gen_server.erl"},\{line,686}]},
{proc_lib,init_p_do_apply,3,[\{file,"proc_lib.erl"},\{line,247}]}]}
gdb backtrace:
(gdb) r
Starting program: /mnt/e339307/dev/rabbit_mq_test/hello_world
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
broker: localhost:5672
address: topic.hello.world
connectionOptions:
[New Thread 0x7ffff3d53700 (LWP 26650)]
[New Thread 0x7ffff3340700 (LWP 26651)]
[New Thread 0x7ffff2b3f700 (LWP 26652)]
Pre Receive
^C
Thread 1 "hello_world" received signal SIGINT, Interrupt.
0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.
(gdb) bt
#0 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1 __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502
#2 __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655
#3 0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59
#4 0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/e339307/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41
#5 0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706
#6 0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721
#7 0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)
at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271
#8 0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55
#9 0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61
#10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52
#11 0x000055555555589c in main ()
(gdb)
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org