Posted to dev@qpid.apache.org by "Ken Giusti (Assigned) (JIRA)" <ji...@apache.org> on 2012/03/20 18:43:43 UTC

[jira] [Assigned] (QPID-3896) Broker crash when using auto delete queues in a cluster

     [ https://issues.apache.org/jira/browse/QPID-3896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ken Giusti reassigned QPID-3896:
--------------------------------

    Assignee: Ken Giusti  (was: Alan Conway)
    
> Broker crash when using auto delete queues in a cluster
> -------------------------------------------------------
>
>                 Key: QPID-3896
>                 URL: https://issues.apache.org/jira/browse/QPID-3896
>             Project: Qpid
>          Issue Type: Bug
>          Components: C++ Clustering
>    Affects Versions: 0.17
>            Reporter: Ken Giusti
>            Assignee: Ken Giusti
>             Fix For: 0.17
>
>         Attachments: qpid-3896.patch
>
>
> I can get the broker to crash with a simple configuration involving multiple 'auto delete' queues.
> The following client pseudo-code can cause the crash:
>     loc_sess = [ ];
>     # create couple of sessions
>     for i in range(in_loops):
>       queue_durability = False;
>       if (i % 2 == 1):
>         queue_durability = True;
>       # create new local session[s]
>       lsess = self.connection.session(loc_sess_name % i);
>       # delete the queue (if needed)
>       self.cleanup(in_queue=loc_q_name % i);
>       # declare auto-delete queue[s]
>       lsess.queue_declare(queue=loc_q_name % i, 
>                           auto_delete=True,
>                           arguments={"qpid.auto_delete_timeout" : q_timeout},
>                           durable=queue_durability);
>       
>       # check that queue[s] is still available
>       result = lsess.queue_query(queue=loc_q_name % i);
>       self.assertEqual(loc_q_name % i, result.queue);
>     
>       # bind queue to exchange amq.fanout
>       lsess.exchange_bind(exchange=e_name,
>                           queue=loc_q_name % i,
>                           binding_key=f_name);
>       # append the session to list
>       loc_sess.append(lsess);
>     
>     # send messages to the queues via amq.fanout
>     dp = sess.delivery_properties(routing_key=f_name);
>     msg_cnt = random.randint(*MSG_CNT_RR);
>     print "setup: in_loops:%d, msg_cnt:%d" % (in_loops, msg_cnt);
>     for j in range(msg_cnt):
>       sess.message_transfer(destination=e_name,
>                             message=qpid.datatypes.Message(dp, msg_layout % j));
>     
>     # check that queues contain correct message count via QMF
>     self.startQmf();
>     for i in range(in_loops):
>       sq = self.qmf_session.getObjects(_class="queue", name=loc_q_name % i)[0];
>       self.assertEqual (sq.msgDepth, msg_cnt);
>     # receive one (first) message from the queues
>     for i in range(in_loops):
>       loc_sess[i].message_subscribe(destination="dlq", queue=loc_q_name % i);
>       loc_sess[i].message_flow(destination="dlq", value=0xFFFFFFFFL,
>                                unit=loc_sess[i].credit_unit.message)
>       loc_sess[i].message_flow(destination="dlq", value=0xFFFFFFFFL,
>                                unit=loc_sess[i].credit_unit.byte)
>       dlq = loc_sess[i].incoming("dlq");
>       msg=dlq.get(timeout=1);
>       self.assertEqual(msg_layout % 0, msg.body);
>       
>     # check that queues are present at this point (subscription still alive atm)
>     for i in range(in_loops):
>       # browse sessions
>       result = loc_sess[i].queue_query(queue=loc_q_name % i);
>       self.assertEqual(loc_q_name % i, result.queue);
>     
>       loc_sess[i].close();
>     # check that queues are still available (after local sessions JUST closed)
>     for i in range(in_loops):
>       # browse sessions
>       result = sess.queue_query(queue=loc_q_name % i);
>       self.assertEqual(loc_q_name % i, result.queue);
>     print "sleeping - waiting for queue auto timeout"
>     time.sleep(q_timeout+AD_TIMEOUT_TOL);
>     
>     # check whether queue has been deleted (expected to be deleted)
>     for i in range(in_loops):
>       result = sess.queue_query(queue=loc_q_name % i);
>       self.assert_(not result.queue);
> Analysis:
> The ClusterTimer cannot store two timer tasks that share the same name.  The Queue code creates a timer task for each auto-delete queue, and every one of these tasks is named "DelayedAutoDeletion".  As soon as more than one such task is outstanding, ClusterTimer::add() sees a duplicate and throws an exception.
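> To illustrate the failure mode, here is a minimal C++ sketch (not the actual Qpid ClusterTimer source; the names below are hypothetical) of a timer that keys pending tasks by name and rejects duplicates.  With every auto-delete queue registering a task named "DelayedAutoDeletion", the second add() fails in exactly this way:
>     #include <map>
>     #include <stdexcept>
>     #include <string>
>     #include <utility>
>     
>     // Hypothetical stand-in for a timer task; a real task would also carry
>     // a deadline and a callback.
>     struct TimerTask {
>         std::string name;   // "DelayedAutoDeletion" for every auto-delete queue
>         explicit TimerTask(const std::string& n) : name(n) {}
>     };
>     
>     // Hypothetical name-keyed timer: add() refuses a task whose name is
>     // already pending, which is the failure mode described above.
>     class NameKeyedTimer {
>         std::map<std::string, TimerTask*> tasks;
>     public:
>         void add(TimerTask* task) {
>             if (!tasks.insert(std::make_pair(task->name, task)).second)
>                 throw std::runtime_error("duplicate timer task: " + task->name);
>         }
>     };
>     
>     // Registering tasks for two auto-delete queues:
>     //   timer.add(new TimerTask("DelayedAutoDeletion"));  // ok
>     //   timer.add(new TimerTask("DelayedAutoDeletion"));  // throws: same name
> Because every per-queue task collides on the same key, having two auto-delete queues with deletion tasks pending at the same time is enough to hit the exception.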
>    

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@qpid.apache.org
For additional commands, e-mail: dev-help@qpid.apache.org