Posted to users@qpid.apache.org by Anthony Foglia <af...@princeton.com> on 2011/08/11 16:18:09 UTC

Queues that exist only when there are receivers

We're trying to create a topology where we have an exchange with a
queue bound to it, such that while there are receivers attached to the
queue, the queue exists, but when there are no receivers the queue
goes away.  (The exchange has an alternate exchange which sends the
message across a federated queue to another broker.)

We are creating the receiver with an address "binding_bug_queue; {
create: always, node: { type: queue, durable: False, x-declare: {
exclusive: False, auto-delete: True } }, link: { durable: False,
x-bindings: [ { exchange : 'binding_bug_exchange', key: 'init' } ] }
}".

With the C++ library (trunk version), if there are two receivers in
different processes, then when either closes, the binding is removed.

But with the Python library (RHEL package 0.7.946106-15.el6), when
either of the two processes closes, the binding stays.

Also, since we are using the SWIG bindings, I tried those, and
discovered that if I do not explicitly close the receiver, the session,
and the connection objects, the binding survives when one of the
receiving processes exits, but not when both do (i.e. it works as
desired).

Which library is doing the correct thing?  Is our address correct?

Below I have pasted my two test programs.


-- 
Anthony Foglia
Princeton Consultants
(609) 987-8787 x233



----- binding_test.cpp -----
#include <iostream>
#include <string>
#include <cstdlib>

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Receiver.h>


void print_first_binding(const std::string & exchange) {
  std::string cmd = "qpid-config -b exchanges | grep -A 1 "+exchange;
  std::system(cmd.c_str());
}


int main() {
  using std::string;
  using std::cout;
  using std::endl;
  using qpid::messaging::Connection;
  using qpid::messaging::Session;
  using qpid::messaging::Receiver;

  string queue = "binding_bug_queue";
  string exchange = "binding_bug_exchange";
  string key = "init";

  print_first_binding(exchange);

  cout << "Opening connection" << endl;
  Connection connection("anthony/princeton@localhost");
  connection.open();

  Session session = connection.createSession();

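  // Adjacent string literals are concatenated by the compiler, so the long
  // address can be split across lines without embedding raw newlines.
  string address = queue + "; { create: always, node: { type: queue, "
    "durable: False, x-declare: { exclusive: False, auto-delete: True } }, "
    "link: { durable: False, x-bindings: [ { exchange : '" + exchange + "', "
    "key: '" + key + "' } ] } }";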

  print_first_binding(exchange);

  cout << "Creating receiver to address: " << address << endl;
  Receiver receiver = session.createReceiver(address);

  print_first_binding(exchange);

  cout << "Press a key, then Enter, to quit: ";
  char c;
  std::cin >> c;

  cout << "Getting bindings one last time" << endl;
  print_first_binding(exchange);

  cout << "Closing" << endl;

  receiver.close();
  session.close();
  connection.close();

  print_first_binding(exchange);
}

----- binding_test.py -----
#!/usr/bin/env python

import os
import time
import optparse
import subprocess

if os.environ.get("QPID_API", "swig").lower() == "swig" :
  try :
    import cqpid as messaging
  except ImportError :
    print "Error using swig.  Falling back to python api"
    import qpid.messaging as messaging
  else :
    print "Using swig api"
else :
  print "Using python API"
  import qpid.messaging as messaging



def print_bindings(exchange_name) :
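  proc = subprocess.Popen(["qpid-config", "-b", "exchanges"],
                          stdout=subprocess.PIPE)
  # communicate() reads the pipe while waiting; wait() followed by a read
  # can deadlock if the output fills the pipe buffer.
  output = proc.communicate()[0]
  on_exchange = False
  for lyne in output.splitlines() :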
    if lyne.startswith("Exchange ") :
      on_exchange = ("'%s'" % exchange_name) in lyne
    if on_exchange :
      print lyne.rstrip()


if __name__=="__main__" :
  parser = optparse.OptionParser("%prog [OPTIONS]")
  parser.add_option("-b", "--broker")
  parser.add_option("--close", type="choice", choices=("clean", "dirty"),
                    default="clean",
                    help="Clean closing of qpid objects [default: %default]")
  options, args = parser.parse_args()

  queue_name = "binding_bug_queue"
  exchange_name = "binding_bug_exchange"
  key = "init"

  print_bindings(exchange_name)

  print "Connecting"
  connection = messaging.Connection.establish(options.broker)

  session = connection.session()

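  # Adjacent string literals inside the parentheses are joined into one
  # format string; doubled braces are literal braces for str.format().
  address = ("{queue}; {{ create: always, node: {{ type: queue, "
             "durable: False, x-declare: {{ exclusive: False, auto-delete: True }} "
             "}}, link: {{ durable: False, x-bindings: [ {{ exchange: '{exchange}', "
             "key: '{key}' }} ] }} }}").format(queue=queue_name,
                                              exchange=exchange_name, key=key)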

  print "Creating receiver to address", address
  receiver = session.receiver(address)

  print_bindings(exchange_name)

  try :
    while True :
      time.sleep(1)
  except KeyboardInterrupt :
    print "Exiting"

  if options.close == "clean" :
    print "Closing receiver"
    receiver.close()
    print_bindings(exchange_name)

    print "Closing session"
    session.close()
    print_bindings(exchange_name)

    print "Closing connection"
    connection.close()
    print_bindings(exchange_name)

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscribe@qpid.apache.org


Re: Queues that exist only when there are receivers

Posted by Anthony Foglia <af...@princeton.com>.
On Thu, Aug 11, 2011 at 10:36 AM, Gordon Sim <gs...@redhat.com> wrote:
> On 08/11/2011 03:18 PM, Anthony Foglia wrote:
>>
>> We're trying to create a topology where we have an exchange with a
>> queue bound to it, such that while there are receivers attached to the
>> queue, the queue exists, but when there are no receivers the queue
>> goes away.  (The exchange has an alternate exchange which sends the
>> message across a federated queue to another broker.)
>>
>> We are creating the receiver with an address "binding_bug_queue; {
>> create: always, node: { type: queue, durable: False, x-declare: {
>> exclusive: False, auto-delete: True } }, link: { durable: False,
>> x-bindings: [ { exchange : 'binding_bug_exchange', key: 'init' } ] }
>> }".
>
> Move the x-bindings into the node. The bindings for the node are created and
> deleted if and when the node itself is created or deleted. The bindings for
> the link should be scoped to the life of the sender/receiver in question
> (i.e. the logical link to/from the node).
>
> The python client will be fixed to correctly remove the bindings when the
> link is closed.


Thanks for the quick response.  That does the trick.

-- 
Anthony Foglia
Princeton Consultants
(609) 987-8787 x233



Re: Queues that exist only when there are receivers

Posted by Gordon Sim <gs...@redhat.com>.
On 08/11/2011 03:18 PM, Anthony Foglia wrote:
> We're trying to create a topology where we have an exchange with a
> queue bound to it, such that while there are receivers attached to the
> queue, the queue exists, but when there are no receivers the queue
> goes away.  (The exchange has an alternate exchange which sends the
> message across a federated queue to another broker.)
>
> We are creating the receiver with an address "binding_bug_queue; {
> create: always, node: { type: queue, durable: False, x-declare: {
> exclusive: False, auto-delete: True } }, link: { durable: False,
> x-bindings: [ { exchange : 'binding_bug_exchange', key: 'init' } ] }
> }".

Move the x-bindings into the node. The bindings for the node are created 
and deleted if and when the node itself is created or deleted. The 
bindings for the link should be scoped to the life of the 
sender/receiver in question (i.e. the logical link to/from the node).

The python client will be fixed to correctly remove the bindings when 
the link is closed.
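
For reference, a minimal sketch of what the address from the original
post might look like with the x-bindings moved from the link into the
node. The helper name node_bound_address is hypothetical and used only
for illustration; the queue, exchange, and key names are the ones from
the test programs. It assumes the binding's queue defaults to the node
name when the node is a queue, and it only builds the string, so it
does not need a broker to run.

```python
def node_bound_address(queue, exchange, key):
    """Build an address whose binding is scoped to the node, not the link.

    Hypothetical helper for illustration only. The binding is declared
    inside node: {...}, so it is created and deleted together with the
    auto-delete queue rather than with an individual receiver.
    """
    return (
        "{queue}; {{ create: always, node: {{ type: queue, durable: False, "
        "x-declare: {{ exclusive: False, auto-delete: True }}, "
        "x-bindings: [ {{ exchange: '{exchange}', key: '{key}' }} ] }} }}"
    ).format(queue=queue, exchange=exchange, key=key)


address = node_bound_address("binding_bug_queue", "binding_bug_exchange", "init")
print(address)
```

The resulting string can be passed to session.receiver(address) in
binding_test.py, or to session.createReceiver(address) in
binding_test.cpp, in place of the link-scoped address.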
