You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by "Seth Zegelstein (Jira)" <ji...@apache.org> on 2019/11/06 22:53:00 UTC

[jira] [Closed] (QPID-8347) Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker

     [ https://issues.apache.org/jira/browse/QPID-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Seth Zegelstein closed QPID-8347.
---------------------------------
    Resolution: Fixed

Rabbitmq fixed this issue with update 3.8.1

> Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker
> ------------------------------------------------------------------------------
>
>                 Key: QPID-8347
>                 URL: https://issues.apache.org/jira/browse/QPID-8347
>             Project: Qpid
>          Issue Type: Bug
>          Components: C++ Client
>    Affects Versions: qpid-cpp-1.39.0
>         Environment: OS: Ubuntu Bionic
> QPID C++ version 1.39.0
> QPID Proton version 0.28.0
> RabbitMQ Broker version 3.6.10
> Erlang version 20.2.2
> RabbimtMQ AMQP 1.0 plugin Version 3.6.10
>  
>            Reporter: Seth Zegelstein
>            Priority: Blocker
>
> Hello All,
> I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call. I believe that this is a bug in the QPID C++ API because it should always respect the client provided timeout, no mater what the Broker does. I also wrote a bug for RabbitMQ AMQP 1.0 plugin: [https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90]
>  
> Update: After further testing, it seems like Qpid Proton handles the timeout case correctly with RabbitMQ Broker, isolating this issue to the C++ API.
> helloworld_blocking.py:
> from __future__ import print_function
> from proton import Message
> from proton.utils import BlockingConnection
> from proton.handlers import IncomingMessageHandler
>  
> conn = BlockingConnection("localhost:5672")
> receiver = conn.create_receiver("examples")
> sender = conn.create_sender("examples")
> #sender.send(Message(body="Hello World!"));
> msg = receiver.receive(timeout=0)
> print(msg.body)
> receiver.accept()
> conn.close()
>  
> Result:
>  python helloworld_blocking.py 
> Traceback (most recent call last):
>   File "helloworld_blocking.py", line 30, in <module>
>     msg = receiver.receive(timeout=0)
>   File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", line 171, in receive
>     timeout=timeout)
>   File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", line 314, in wait
>     raise Timeout(txt)
> proton._exceptions.Timeout: Connection amqp://localhost:5672 timed out: Receiving on receiver 5aa78b24-27c6-4b76-a92a-d5410aa0a6ef-examples
>  
> QPID C++ Example Code (Modified HelloWorld example):
> #include <qpid/messaging/Connection.h>
> #include <qpid/messaging/Message.h>
> #include <qpid/messaging/Receiver.h>
> #include <qpid/messaging/Sender.h>
> #include <qpid/messaging/Session.h>
> #include <iostream>
>  
> using namespace qpid::messaging;
>  
> int main(int argc, char** argv) {
>     std::string broker = argc > 1 ? argv[1] : "localhost:5672";
>     std::cout << "broker: " << broker << std::endl;
>     std::string address = argc > 2 ? argv[2] : "topic.hello.world";
>     std::cout << "address: " << address << std::endl;
>     std::string connectionOptions = argc > 3 ? argv[3] : "";
>     std::cout << "connectionOptions: " << connectionOptions << std::endl;
>  
>     try
> {                 Connection connection(broker, connectionOptions);         connection.open();         Session session = connection.createSession();          Receiver receiver = session.createReceiver(address);         Message message;         std::cout << "Pre Receive" << std::endl;         message = receiver.fetch(Duration::SECOND * 10);         std::cout << "Post Receive" << std::endl;         session.acknowledge();           connection.close();         return 0;     }
> catch(const std::exception& error)
> {         std::cerr << error.what() << std::endl;         return 1;     }
> }
> }
>  
> Server Error (Occurs after 10 second timeout):
> =INFO REPORT==== 23-Jul-2019::17:49:54 ===
> accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)
>  
> =ERROR REPORT==== 23-Jul-2019::17:49:54 ===
> closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):
> {bad_version,\{1,1,0,10}}
>  
> =INFO REPORT==== 23-Jul-2019::17:49:54 ===
> accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)
>  
> =ERROR REPORT==== 23-Jul-2019::17:50:04 ===
>  * 
>  ** Generic server <0.675.0> terminating 
>  * 
>  ** Last message in was {send_command,
>                            {'basic.credit_drained',
>                                <<99,116,97,103,45,0,0,0,0>>,
>                                1}}
>  * 
>  ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,
>                               
> {[],[]},
>  
>                                 false,<0.678.0>,none,none,0,true,none,
>  
>                                 \{0,nil},
>  
>                                 \{0,nil},
>  
>                                 true,false}
>  
>  ** Reason for termination == 
>  
>  ** \{{badmatch,{empty,{[],[]}
> }},
>     [
> {amqp_channel,rpc_bottom_half,2,                    [
> {file,"src/amqp_channel.erl"},\{line,623}]},
>  
>       \{amqp_channel,handle_method_from_server1,3,                    [{file,"src/amqp_channel.erl"}
> ,
> {line,800}
> ]},
>      {gen_server,try_dispatch,4,
> {line,616}
> ]},
>      {gen_server,handle_msg,6,[
> {file,"gen_server.erl"},//\{line,686}" class="external-link" rel="nofollow">\\\\\\\{file,"gen_server.erl"}
> ,\{line,616}]},
>     
> {gen_server,handle_msg,6,[ \\{file,"gen_server.erl"}
>  
>  ,\\\\\\\{line,686}},
>  
>       {proc_lib,init_p_do_apply,3,[\\\\
>  \{file,"proc_lib.erl"},\\\\\\\{line,247}|file://\{file,/]}]}
>  
>   
>  
>  =WARNING REPORT==== 23-Jul-2019::17:50:04 ===
>  
>  Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch,
>  
>                                                                           \{empty,                                                                           {[],                                                                            []}}},
>  
>                                                                          [\{amqp_channel,                                                                           rpc_bottom_half,                                                                           2,                                                                           [{file,                                                                             "src/amqp_channel.erl"},
>  
>                                                                             \{line,                                                                             623}]},
>  
>                                                                           \{amqp_channel,                                                                           handle_method_from_server1,                                                                           3,                                                                           [{file,                                                                             "src/amqp_channel.erl"},
>  
>                                                                             \{line,                                                                             800}]},
>  
>                                                                           \{gen_server,                                                                           try_dispatch,                                                                           4,                                                                           [{file,                                                                             "gen_server.erl"},
>  
>                                                                             \{line,                                                                             616}]},
>  
>                                                                           \{gen_server,                                                                           handle_msg,                                                                           6,                                                                           [{file,                                                                             "gen_server.erl"},
>  
>                                                                             \{line,                                                                             686}]},
>  
>                                                                           \{proc_lib,                                                                           init_p_do_apply,                                                                           3,                                                                           [{file,                                                                             "proc_lib.erl"},
>  
>                                                                             \{line,                                                                             247}]}]}
>  
>   
>  
>  =ERROR REPORT==== 23-Jul-2019::17:50:04 ===
>  
>  ** Generic server <0.678.0> terminating
>  
>  ** Last message in was {'EXIT',<0.675.0>,
>  
>                             \{{badmatch,{empty,{[],[]}}},}}
>  
>  \{{                             [{amqp_channel,rpc_bottom_half,2,                                  [ {file,"src/amqp_channel.erl"},\\\\\\\\{line,623}]},}}
>  
>  \{{                              {amqp_channel,handle_method_from_server1,3,                                  [{file,"src/amqp_channel.erl"},\{line,800}
>  
>  ]},}}
>  
>  \{{                              {gen_server,try_dispatch,4,                                  [ \{file,"gen_server.erl"}
>  
>  , \{line,616}]},}}
>  
>  \{{                              {gen_server,handle_msg,6,                                  [ \{file,"gen_server.erl"}
>  
>  ,
>  \{line,686}
>  
>  ]},}}
>  
>  \{{                              {proc_lib,init_p_do_apply,3,                                  [ \{file,"proc_lib.erl"}
>  
>  ,
>  \{line,247}
>  
>  ]}]}}}}
>  
>  {{ ** When Server state ==
>  
>  \{ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,                          <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,                          \{lstate,<0.677.0>,false}
>  
>  ,}}
>  
>  \{{                          none,1,}}
>  
>  {{                         
>  \{[],[]},}}
>  
>  \{{                          {user,<<"guest">>,                           [administrator],                           [\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}}
>  
>  \{{                          <<"/">>,<<>>,}}
>  
>  \{{                          {dict,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],}}
>  
>  \{{                             [[<0.583.0>|}}
>  
>  \{{                               {resource,<<"/">>,queue,                                <<"topic.hello.world">>}]],}}
>  
>  \{{                             [],[],[],[],[],[]}}},}}
>  
>  \{{                          {state,}}
>  
>  \{{                           {dict,1,16,16,8,80,48,                            {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                            {{[],[],[],[],[],[],[],[],[],}}
>  
>  {{                              [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}}
>  
>  \{{                              [],[],[],[],[],[]}}},}}
>  
>  \{{                           erlang},}}
>  
>  \{{                          {dict,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],[],[],}}
>  
>  \{{                             [[<<99,116,97,103,45,0,0,0,0>>|}}
>  
>  {{                               {{amqqueue,}}
>  
>  \{{                                 {resource,<<"/">>,queue,                                  <<"topic.hello.world">>},}}
>  
>  \{{                                 false,false,none,[],<0.583.0>,[],[],[],}}
>  
>  \{{                                 undefined,[],[],live,0},}}
>  
>  \{{                                {false,65535,false,                                 [{<<"x-credit">>,table,                                   [{<<"credit">>,long,0},}}
>  
>  \{{                                    {<<"drain">>,boolean,false}]}]}}]],}}
>  
>  \{{                             [],[],[],[]}}},}}
>  
>  \{{                          {dict,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],}}
>  
>  \{{                             [[<0.583.0>|}}
>  
>  \{{                               {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}}
>  
>  \{{                             [],[],[],[],[],[]}}},}}
>  
>  \{{                          {set,1,16,16,8,80,48,                           {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],}}
>  
>  {{                             [<0.583.0>],}}
>  
>  \{{                             [],[],[],[],[],[]}}},}}
>  
>  \{{                          <0.672.0>,}}
>  
>  \{{                          {state,fine,5000,                           #Ref<0.4039704202.3895984129.147347>},}}
>  
>  \{{                          false,1,}}
>  
>  \{{                          }}{{0,nil},{0,nil}}{{,}}
>  
>  \{{                          [],}}
>  
>  {{                          {{0,nil},{0,nil}},}}
>  
>  \{{                          [{<<"publisher_confirms">>,bool,true},}}
>  
>  \{{                           {<<"exchange_exchange_bindings">>,bool,true},}}
>  
>  \{{                           {<<"basic.nack">>,bool,true},}}
>  
>  \{{                           {<<"consumer_cancel_notify">>,bool,true},}}
>  
>  \{{                           {<<"connection.blocked">>,bool,true},}}
>  
>  \{{                           {<<"authentication_failure_close">>,bool,true}],}}
>  
>  \{{                          none,65535,none,flow,[]}}}
>  
>  \{{ ** Reason for termination == }}
>  
>  \{{ ** {{badmatch,{empty,{[],[]}
>  
>  }},
>  
>      [\{amqp_channel,rpc_bottom_half,2,                    [ {file,"src/amqp_channel.erl"}
>  
>  ,\{line,623}]},
>  
>      
>  
>  \{amqp_channel,handle_method_from_server1,3,                    [ {file,"src/amqp_channel.erl"}
>  
>  ,\{line,800}]},
>  
>      
>  
>  \{gen_server,try_dispatch,4,[ {file,"gen_server.erl"}
>  
>  ,\{line,616}]},
>  
>      
>  
>  \{gen_server,handle_msg,6,[ {file,"gen_server.erl"}
> ,\{line,686}]},
>     
> {proc_lib,init_p_do_apply,3,[ \\{file,"proc_lib.erl"}
> ,\{line,247}]}]}
>  
>  
> gdb backtrace:
> (gdb) r
> Starting program: /mnt/user/dev/rabbit_mq_test/hello_world 
> [Thread debugging using libthread_db enabled]
> Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
> broker: localhost:5672
> address: topic.hello.world
> connectionOptions: 
> [New Thread 0x7ffff3d53700 (LWP 26650)]
> [New Thread 0x7ffff3340700 (LWP 26651)]
> [New Thread 0x7ffff2b3f700 (LWP 26652)]
> Pre Receive
> ^C
> Thread 1 "hello_world" received signal SIGINT, Interrupt.
> 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
> 88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.
> (gdb) bt
> #0  0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88
> #1  __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502
> #2  __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655
> #3  0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59
> #4  0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41
> #5  0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706
> #6  0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721
> #7  0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)
>     at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271
> #8  0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55
> #9  0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61
> #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52
> #11 0x000055555555589c in main ()
> (gdb) 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org