You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Seth Zegelstein (JIRA)" <ji...@apache.org> on 2019/07/23 17:52:00 UTC

[jira] [Created] (QPID-8347) Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker

Seth Zegelstein created QPID-8347:
-------------------------------------

             Summary: Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker
                 Key: QPID-8347
                 URL: https://issues.apache.org/jira/browse/QPID-8347
             Project: Qpid
          Issue Type: Bug
          Components: C++ Client
    Affects Versions: qpid-cpp-1.39.0
         Environment: OS: Ubuntu Bionic

QPID C++ version 1.39.0

QPID Proton version 0.28.0
RabbitMQ Broker version 3.6.10

Erlang version 20.2.2

RabbimtMQ AMQP 1.0 plugin Version 3.6.10

 
            Reporter: Seth Zegelstein


Hello All,

I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call.


Example Code (Modified HelloWorld example):

#include <qpid/messaging/Connection.h>

#include <qpid/messaging/Message.h>

#include <qpid/messaging/Receiver.h>

#include <qpid/messaging/Sender.h>

#include <qpid/messaging/Session.h>

#include <iostream>

 

using namespace qpid::messaging;

 

int main(int argc, char** argv) {

    std::string broker = argc > 1 ? argv[1] : "localhost:5672";

    std::cout << "broker: " << broker << std::endl;

    std::string address = argc > 2 ? argv[2] : "topic.hello.world";

    std::cout << "address: " << address << std::endl;

    std::string connectionOptions = argc > 3 ? argv[3] : "";

    std::cout << "connectionOptions: " << connectionOptions << std::endl;

 

    try {

        Connection connection(broker, connectionOptions);

        connection.open();

        Session session = connection.createSession();

 

        Receiver receiver = session.createReceiver(address);

        Message message;

        std::cout << "Pre Receive" << std::endl;

        message = receiver.fetch(Duration::SECOND * 10);

        std::cout << "Post Receive" << std::endl;

        session.acknowledge();

 

        connection.close();

        return 0;

    } catch(const std::exception& error) {

        std::cerr << error.what() << std::endl;

        return 1;

    }

}

 

Server Error (Occurs after 10 second timeout):

=INFO REPORT==== 23-Jul-2019::17:49:54 ===

accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)

 

=ERROR REPORT==== 23-Jul-2019::17:49:54 ===

closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):

{bad_version,\{1,1,0,10}}

 

=INFO REPORT==== 23-Jul-2019::17:49:54 ===

accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)

 

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===

** Generic server <0.675.0> terminating 

** Last message in was {send_command,

                           {'basic.credit_drained',

                               <<99,116,97,103,45,0,0,0,0>>,

                               1}}

** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,

                               {[],[]},

                               false,<0.678.0>,none,none,0,true,none,

                               {0,nil},

                               {0,nil},

                               true,false}

** Reason for termination == 

** \{{badmatch,{empty,{[],[]}}},

    [{amqp_channel,rpc_bottom_half,2,

                   [\{file,"src/amqp_channel.erl"},\{line,623}]},

     {amqp_channel,handle_method_from_server1,3,

                   [\{file,"src/amqp_channel.erl"},\{line,800}]},

     {gen_server,try_dispatch,4,[\{file,"gen_server.erl"},\{line,616}]},

     {gen_server,handle_msg,6,[\{file,"gen_server.erl"},\{line,686}]},

     {proc_lib,init_p_do_apply,3,[\{file,"proc_lib.erl"},\{line,247}]}]}

 

=WARNING REPORT==== 23-Jul-2019::17:50:04 ===

Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch,

                                                                         {empty,

                                                                          {[],

                                                                           []}}},

                                                                        [{amqp_channel,

                                                                          rpc_bottom_half,

                                                                          2,

                                                                          [{file,

                                                                            "src/amqp_channel.erl"},

                                                                           {line,

                                                                            623}]},

                                                                         {amqp_channel,

                                                                          handle_method_from_server1,

                                                                          3,

                                                                          [{file,

                                                                            "src/amqp_channel.erl"},

                                                                           {line,

                                                                            800}]},

                                                                         {gen_server,

                                                                          try_dispatch,

                                                                          4,

                                                                          [{file,

                                                                            "gen_server.erl"},

                                                                           {line,

                                                                            616}]},

                                                                         {gen_server,

                                                                          handle_msg,

                                                                          6,

                                                                          [{file,

                                                                            "gen_server.erl"},

                                                                           {line,

                                                                            686}]},

                                                                         {proc_lib,

                                                                          init_p_do_apply,

                                                                          3,

                                                                          [{file,

                                                                            "proc_lib.erl"},

                                                                           {line,

                                                                            247}]}]}

 

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===

** Generic server <0.678.0> terminating

** Last message in was {'EXIT',<0.675.0>,

                           {\{badmatch,{empty,{[],[]}}},

                            [{amqp_channel,rpc_bottom_half,2,

                                 [\{file,"src/amqp_channel.erl"},\{line,623}]},

                             {amqp_channel,handle_method_from_server1,3,

                                 [\{file,"src/amqp_channel.erl"},\{line,800}]},

                             {gen_server,try_dispatch,4,

                                 [\{file,"gen_server.erl"},\{line,616}]},

                             {gen_server,handle_msg,6,

                                 [\{file,"gen_server.erl"},\{line,686}]},

                             {proc_lib,init_p_do_apply,3,

                                 [\{file,"proc_lib.erl"},\{line,247}]}]}}

** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,

                         <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,

                         {lstate,<0.677.0>,false},

                         none,1,

                         {[],[]},

                         {user,<<"guest">>,

                          [administrator],

                          [\{rabbit_auth_backend_internal,none}]},

                         <<"/">>,<<>>,

                         {dict,1,16,16,8,80,48,

                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},

                          {{[],[],[],[],[],[],[],[],[],

                            [[<0.583.0>|

                              {resource,<<"/">>,queue,

                               <<"topic.hello.world">>}]],

                            [],[],[],[],[],[]}}},

                         {state,

                          {dict,1,16,16,8,80,48,

                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},

                           {{[],[],[],[],[],[],[],[],[],

                             [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],

                             [],[],[],[],[],[]}}},

                          erlang},

                         {dict,1,16,16,8,80,48,

                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},

                          {{[],[],[],[],[],[],[],[],[],[],[],

                            [[<<99,116,97,103,45,0,0,0,0>>|

                              {{amqqueue,

                                {resource,<<"/">>,queue,

                                 <<"topic.hello.world">>},

                                false,false,none,[],<0.583.0>,[],[],[],

                                undefined,[],[],live,0},

                               {false,65535,false,

                                [{<<"x-credit">>,table,

                                  [\{<<"credit">>,long,0},

                                   {<<"drain">>,boolean,false}]}]}}]],

                            [],[],[],[]}}},

                         {dict,1,16,16,8,80,48,

                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},

                          {{[],[],[],[],[],[],[],[],[],

                            [[<0.583.0>|

                              {1,\{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],

                            [],[],[],[],[],[]}}},

                         {set,1,16,16,8,80,48,

                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},

                          {{[],[],[],[],[],[],[],[],[],

                            [<0.583.0>],

                            [],[],[],[],[],[]}}},

                         <0.672.0>,

                         {state,fine,5000,

                          #Ref<0.4039704202.3895984129.147347>},

                         false,1,

                         {\{0,nil},\{0,nil}},

                         [],

                         {\{0,nil},\{0,nil}},

                         [\{<<"publisher_confirms">>,bool,true},

                          {<<"exchange_exchange_bindings">>,bool,true},

                          {<<"basic.nack">>,bool,true},

                          {<<"consumer_cancel_notify">>,bool,true},

                          {<<"connection.blocked">>,bool,true},

                          {<<"authentication_failure_close">>,bool,true}],

                         none,65535,none,flow,[]}

** Reason for termination == 

** \{{badmatch,{empty,{[],[]}}},

    [{amqp_channel,rpc_bottom_half,2,

                   [\{file,"src/amqp_channel.erl"},\{line,623}]},

     {amqp_channel,handle_method_from_server1,3,

                   [\{file,"src/amqp_channel.erl"},\{line,800}]},

     {gen_server,try_dispatch,4,[\{file,"gen_server.erl"},\{line,616}]},

     {gen_server,handle_msg,6,[\{file,"gen_server.erl"},\{line,686}]},

     {proc_lib,init_p_do_apply,3,[\{file,"proc_lib.erl"},\{line,247}]}]}

 

 

gdb backtrace:

(gdb) r

Starting program: /mnt/e339307/dev/rabbit_mq_test/hello_world 

[Thread debugging using libthread_db enabled]

Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

broker: localhost:5672

address: topic.hello.world

connectionOptions: 

[New Thread 0x7ffff3d53700 (LWP 26650)]

[New Thread 0x7ffff3340700 (LWP 26651)]

[New Thread 0x7ffff2b3f700 (LWP 26652)]

Pre Receive

^C

Thread 1 "hello_world" received signal SIGINT, Interrupt.

0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88

88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.

(gdb) bt

#0  0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88

#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502

#2  __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655

#3  0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59

#4  0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/e339307/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41

#5  0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706

#6  0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721

#7  0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)

    at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271

#8  0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55

#9  0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61

#10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52

#11 0x000055555555589c in main ()

(gdb) 

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org