You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@qpid.apache.org by Kerry Bonin <ke...@gmail.com> on 2010/04/21 22:05:10 UTC

Help requested w/ federation - what am I doing wrong?

This is on XP SP2, using recent build (r921371 + my QPID-2519 patch
applied but not being used.)

I think I'm doing this correctly, but I never see anything from the
receiver on the second broker.  As I understand it, shouldn't clients
be able to subscribe to topics and see published messages no matter
which broker they are connected to?

My end goal here is to add some fault tolerance to a Windows AMQP QPID
system, where clients can switch to another broker if their current
broker dies.  Since someone chose a Linux only solution for
clustering, I think my simplest option is to leverage Federation, with
something like ResilientConnection to manage a list of brokers, and
fail client connections over to other brokers on connection failure.
This is from my federation test case, and I'm probably setting
something up incorrectly...

Greatly appreciate the help!

Kerry Bonin



I bring up two brokers:
  start "5680" /Dd:\dev\qpid-r921371\cpp\build\src\debug qpidd.exe
--data-dir=.\qpidd.data.5680 --auth=no --port=5680
--load-module=qmfconsoled.dll
  start "5681" /Dd:\dev\qpid-r921371\cpp\build\src\debug qpidd.exe
--data-dir=.\qpidd.data.5681 --auth=no --port=5681
--load-module=qmfconsoled.dll

Create exchanges
  python D:\dev\qpid-r921371\tools\src\py\qpid-config -a
localhost:5680 add exchange topic fed.topic
  python D:\dev\qpid-r921371\tools\src\py\qpid-config -a
localhost:5681 add exchange topic fed.topic

Create routes
  python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic add
localhost:5680 localhost:5681 fed.topic
  python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic add
localhost:5681 localhost:5680 fed.topic

This appears to work correctly :
  D:\dev\qpid\bin>python d:\dev\qpid-r921371\tools\src\py\qpid-route
route map localhost:5680

  Finding Linked Brokers:
      localhost:5680... Ok
      localhost:5681... Ok

  Dynamic Routes:

    Exchange fed.topic:
      localhost:5681 <=> localhost:5680

  Static Routes:
    none found


Now trimmed from my C++ testbed...

// Setup URLs and Addresses

std::string urlA = "amqp:tcp:127.0.0.1:5680";
std::string urlB = "amqp:tcp:127.0.0.1:5681";
std::string queue = "fed.topic";
Address addressTx( queue );
Address addressRx( queue );
int64_t timeout = 1000;

// Setup transmitter on 5680

Connection connectionTxA;
connectionTxA.open( urlA );
Session sessionTxA = connectionTxA.newSession();
Sender senderTxA = sessionTxA.createSender( addressTx );

// Setup listeners on 5680 and 5681

Connection connectionRxA;
connectionRxA.open( urlA );
Session sessionRxA = connectionRxA.newSession();
Receiver receiverRxA = sessionRxA.createReceiver( addressRx );

Connection connectionRxB;
connectionRxB.open( urlB );
Session sessionRxB = connectionRxB.newSession();
Receiver receiverRxB = sessionRxB.createReceiver( addressRx );

// Transmit to 5680

Message messageOut;
MapContent contentOut( messageOut );
contentOut["id"] = 1234;
contentOut["name"] = "Request";
contentOut.encode();
senderTxA.send( messageOut );

// Local listener sees the message

Message messageRxA;
if( receiverRxA.fetch( messageRxA, qpid::sys::Duration( timeout ) ) )
{
        MapView contentRxA( messageRxA );
        std::cout << "  local received: " << contentRxA << std::endl;
        sessionRxA.acknowledge();
}
else
        std::cout << "  local timeout " << std::endl;

// Remote never does...

Message messageRxB;
if( receiverRxB.fetch( messageRxB, qpid::sys::Duration( timeout ) ) )
{
        MapView contentRxB( messageRxB );
        std::cout << "  remote received: " << contentRxB << std::endl;
        sessionRxB.acknowledge();
}
else
        std::cout << "  remote timeout " << std::endl;

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


RE: Help requested w/ federation - what am I doing wrong?

Posted by Steve Huston <sh...@riverace.com>.
Hi Kerry,

> OK, thanks for the pointer to that issue, I remember reading 
> it a while ago, didn't realize it was still open (I love 
> JIRA, but issue search isn't very effective, at least for me 
> - I often don't find highly relevant open issues...)

It's a  bit odd because search by default brings in every Apache project
- narrowing it to just Qpid and just Unresolved issues makes it better.

> Can you 
> think of any other open issues that might bite me in this 
> area?  In the mean time, I guess I'm blocked by 2199, if that 
> is the case I'll go fix it...

Great! I'll be happy to review patches attached to the JIRA. FYI, the
federation_tests currently hang, which is probably the issue you're
encountering. It's easy to just run that test with:

ctest -V -R federation_tests

-Steve

> On Wed, Apr 21, 2010 at 4:33 PM, Steve Huston 
> <sh...@riverace.com> wrote:
> > Hi Kerry,
> >
> > --
> > Steve Huston, Riverace Corporation
> > Total Lifecycle Support for Your Networked Applications 
> > http://www.riverace.com
> >
> >
> >> -----Original Message-----
> >> From: Kerry Bonin [mailto:kerrybonin@gmail.com]
> >> Sent: Wednesday, April 21, 2010 4:05 PM
> >> To: qpid-dev
> >> Subject: Help requested w/ federation - what am I doing wrong?
> >>
> >>
> >> This is on XP SP2, using recent build (r921371 + my 
> QPID-2519 patch 
> >> applied but not being used.)
> >>
> >> I think I'm doing this correctly, but I never see anything 
> from the 
> >> receiver on the second broker.  As I understand it, 
> shouldn't clients 
> >> be able to subscribe to topics and see published messages 
> no matter 
> >> which broker they are connected to?
> >
> > You're not doing anything wrong that I can see, but Windows has a 
> > known problem in this area: 
> > https://issues.apache.org/jira/browse/QPID-2199
> >
> > Nobody has had time to address it yet, so I don't know if this is a 
> > big or small problem. I suspect it's not terribly big.
> >
> > Let me know if you need some help navigating this.
> >
> > Best regards,
> > -Steve
> >
> >> My end goal here is to add some fault tolerance to a Windows AMQP 
> >> QPID system, where clients can switch to another broker if their 
> >> current broker dies.  Since someone chose a Linux only 
> solution for 
> >> clustering, I think my simplest option is to leverage Federation, 
> >> with something like ResilientConnection to manage a list 
> of brokers, 
> >> and fail client connections over to other brokers on connection
> >> failure. This is from my federation test case, and I'm
> >> probably setting something up incorrectly...
> >>
> >> Greatly appreciate the help!
> >>
> >> Kerry Bonin
> >>
> >>
> >>
> >> I bring up two brokers:
> >>   start "5680" /Dd:\dev\qpid-r921371\cpp\build\src\debug
> >> qpidd.exe --data-dir=.\qpidd.data.5680 --auth=no --port=5680 
> >> --load-module=qmfconsoled.dll
> >>   start "5681" /Dd:\dev\qpid-r921371\cpp\build\src\debug
> >> qpidd.exe --data-dir=.\qpidd.data.5681 --auth=no --port=5681 
> >> --load-module=qmfconsoled.dll
> >>
> >> Create exchanges
> >>   python D:\dev\qpid-r921371\tools\src\py\qpid-config -a 
> >> localhost:5680 add exchange topic fed.topic
> >>   python D:\dev\qpid-r921371\tools\src\py\qpid-config -a 
> >> localhost:5681 add exchange topic fed.topic
> >>
> >> Create routes
> >>   python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic add 
> >> localhost:5680 localhost:5681 fed.topic
> >>   python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic add 
> >> localhost:5681 localhost:5680 fed.topic
> >>
> >> This appears to work correctly :
> >>   D:\dev\qpid\bin>python 
> d:\dev\qpid-r921371\tools\src\py\qpid-route
> >> route map localhost:5680
> >>
> >>   Finding Linked Brokers:
> >>       localhost:5680... Ok
> >>       localhost:5681... Ok
> >>
> >>   Dynamic Routes:
> >>
> >>     Exchange fed.topic:
> >>       localhost:5681 <=> localhost:5680
> >>
> >>   Static Routes:
> >>     none found
> >>
> >>
> >> Now trimmed from my C++ testbed...
> >>
> >> // Setup URLs and Addresses
> >>
> >> std::string urlA = "amqp:tcp:127.0.0.1:5680";
> >> std::string urlB = "amqp:tcp:127.0.0.1:5681";
> >> std::string queue = "fed.topic";
> >> Address addressTx( queue );
> >> Address addressRx( queue );
> >> int64_t timeout = 1000;
> >>
> >> // Setup transmitter on 5680
> >>
> >> Connection connectionTxA;
> >> connectionTxA.open( urlA );
> >> Session sessionTxA = connectionTxA.newSession();
> >> Sender senderTxA = sessionTxA.createSender( addressTx );
> >>
> >> // Setup listeners on 5680 and 5681
> >>
> >> Connection connectionRxA;
> >> connectionRxA.open( urlA );
> >> Session sessionRxA = connectionRxA.newSession();
> >> Receiver receiverRxA = sessionRxA.createReceiver( addressRx );
> >>
> >> Connection connectionRxB;
> >> connectionRxB.open( urlB );
> >> Session sessionRxB = connectionRxB.newSession();
> >> Receiver receiverRxB = sessionRxB.createReceiver( addressRx );
> >>
> >> // Transmit to 5680
> >>
> >> Message messageOut;
> >> MapContent contentOut( messageOut );
> >> contentOut["id"] = 1234;
> >> contentOut["name"] = "Request";
> >> contentOut.encode();
> >> senderTxA.send( messageOut );
> >>
> >> // Local listener sees the message
> >>
> >> Message messageRxA;
> >> if( receiverRxA.fetch( messageRxA, qpid::sys::Duration( 
> timeout ) ) ) 
> >> {
> >>         MapView contentRxA( messageRxA );
> >>         std::cout << "  local received: " << contentRxA << 
> std::endl;
> >>         sessionRxA.acknowledge();
> >> }
> >> else
> >>         std::cout << "  local timeout " << std::endl;
> >>
> >> // Remote never does...
> >>
> >> Message messageRxB;
> >> if( receiverRxB.fetch( messageRxB, qpid::sys::Duration( 
> timeout ) ) ) 
> >> {
> >>         MapView contentRxB( messageRxB );
> >>         std::cout << "  remote received: " << contentRxB << 
> >> std::endl;
> >>         sessionRxB.acknowledge();
> >> }
> >> else
> >>         std::cout << "  remote timeout " << std::endl;
> >>
> >> 
> ---------------------------------------------------------------------
> >> Apache Qpid - AMQP Messaging Implementation
> >> Project:      http://qpid.apache.org
> >> Use/Interact: mailto:dev-subscribe@qpid.apache.org
> >>
> >>
> >
> >
> > 
> ---------------------------------------------------------------------
> > Apache Qpid - AMQP Messaging Implementation
> > Project:      http://qpid.apache.org
> > Use/Interact: mailto:dev-subscribe@qpid.apache.org
> >
> >
> 
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
> 
> 


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


Re: Help requested w/ federation - what am I doing wrong?

Posted by Kerry Bonin <ke...@gmail.com>.
OK, thanks for the pointer to that issue, I remember reading it a
while ago, didn't realize it was still open (I love JIRA, but issue
search isn't very effective, at least for me - I often don't find
highly relevant open issues...)  Can you think of any other open
issues that might bite me in this area?  In the mean time, I guess I'm
blocked by 2199, if that is the case I'll go fix it...

Kerry

On Wed, Apr 21, 2010 at 4:33 PM, Steve Huston <sh...@riverace.com> wrote:
> Hi Kerry,
>
> --
> Steve Huston, Riverace Corporation
> Total Lifecycle Support for Your Networked Applications
> http://www.riverace.com
>
>
>> -----Original Message-----
>> From: Kerry Bonin [mailto:kerrybonin@gmail.com]
>> Sent: Wednesday, April 21, 2010 4:05 PM
>> To: qpid-dev
>> Subject: Help requested w/ federation - what am I doing wrong?
>>
>>
>> This is on XP SP2, using recent build (r921371 + my QPID-2519
>> patch applied but not being used.)
>>
>> I think I'm doing this correctly, but I never see anything
>> from the receiver on the second broker.  As I understand it,
>> shouldn't clients be able to subscribe to topics and see
>> published messages no matter which broker they are connected to?
>
> You're not doing anything wrong that I can see, but Windows has a known
> problem in this area: https://issues.apache.org/jira/browse/QPID-2199
>
> Nobody has had time to address it yet, so I don't know if this is a big
> or small problem. I suspect it's not terribly big.
>
> Let me know if you need some help navigating this.
>
> Best regards,
> -Steve
>
>> My end goal here is to add some fault tolerance to a Windows
>> AMQP QPID system, where clients can switch to another broker
>> if their current broker dies.  Since someone chose a Linux
>> only solution for clustering, I think my simplest option is
>> to leverage Federation, with something like
>> ResilientConnection to manage a list of brokers, and fail
>> client connections over to other brokers on connection
>> failure. This is from my federation test case, and I'm
>> probably setting something up incorrectly...
>>
>> Greatly appreciate the help!
>>
>> Kerry Bonin
>>
>>
>>
>> I bring up two brokers:
>>   start "5680" /Dd:\dev\qpid-r921371\cpp\build\src\debug
>> qpidd.exe --data-dir=.\qpidd.data.5680 --auth=no --port=5680
>> --load-module=qmfconsoled.dll
>>   start "5681" /Dd:\dev\qpid-r921371\cpp\build\src\debug
>> qpidd.exe --data-dir=.\qpidd.data.5681 --auth=no --port=5681
>> --load-module=qmfconsoled.dll
>>
>> Create exchanges
>>   python D:\dev\qpid-r921371\tools\src\py\qpid-config -a
>> localhost:5680 add exchange topic fed.topic
>>   python D:\dev\qpid-r921371\tools\src\py\qpid-config -a
>> localhost:5681 add exchange topic fed.topic
>>
>> Create routes
>>   python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic
>> add localhost:5680 localhost:5681 fed.topic
>>   python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic
>> add localhost:5681 localhost:5680 fed.topic
>>
>> This appears to work correctly :
>>   D:\dev\qpid\bin>python d:\dev\qpid-r921371\tools\src\py\qpid-route
>> route map localhost:5680
>>
>>   Finding Linked Brokers:
>>       localhost:5680... Ok
>>       localhost:5681... Ok
>>
>>   Dynamic Routes:
>>
>>     Exchange fed.topic:
>>       localhost:5681 <=> localhost:5680
>>
>>   Static Routes:
>>     none found
>>
>>
>> Now trimmed from my C++ testbed...
>>
>> // Setup URLs and Addresses
>>
>> std::string urlA = "amqp:tcp:127.0.0.1:5680";
>> std::string urlB = "amqp:tcp:127.0.0.1:5681";
>> std::string queue = "fed.topic";
>> Address addressTx( queue );
>> Address addressRx( queue );
>> int64_t timeout = 1000;
>>
>> // Setup transmitter on 5680
>>
>> Connection connectionTxA;
>> connectionTxA.open( urlA );
>> Session sessionTxA = connectionTxA.newSession();
>> Sender senderTxA = sessionTxA.createSender( addressTx );
>>
>> // Setup listeners on 5680 and 5681
>>
>> Connection connectionRxA;
>> connectionRxA.open( urlA );
>> Session sessionRxA = connectionRxA.newSession();
>> Receiver receiverRxA = sessionRxA.createReceiver( addressRx );
>>
>> Connection connectionRxB;
>> connectionRxB.open( urlB );
>> Session sessionRxB = connectionRxB.newSession();
>> Receiver receiverRxB = sessionRxB.createReceiver( addressRx );
>>
>> // Transmit to 5680
>>
>> Message messageOut;
>> MapContent contentOut( messageOut );
>> contentOut["id"] = 1234;
>> contentOut["name"] = "Request";
>> contentOut.encode();
>> senderTxA.send( messageOut );
>>
>> // Local listener sees the message
>>
>> Message messageRxA;
>> if( receiverRxA.fetch( messageRxA, qpid::sys::Duration(
>> timeout ) ) ) {
>>         MapView contentRxA( messageRxA );
>>         std::cout << "  local received: " << contentRxA << std::endl;
>>         sessionRxA.acknowledge();
>> }
>> else
>>         std::cout << "  local timeout " << std::endl;
>>
>> // Remote never does...
>>
>> Message messageRxB;
>> if( receiverRxB.fetch( messageRxB, qpid::sys::Duration(
>> timeout ) ) ) {
>>         MapView contentRxB( messageRxB );
>>         std::cout << "  remote received: " << contentRxB << std::endl;
>>         sessionRxB.acknowledge();
>> }
>> else
>>         std::cout << "  remote timeout " << std::endl;
>>
>> ---------------------------------------------------------------------
>> Apache Qpid - AMQP Messaging Implementation
>> Project:      http://qpid.apache.org
>> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>>
>>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
>
>

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org


RE: Help requested w/ federation - what am I doing wrong?

Posted by Steve Huston <sh...@riverace.com>.
Hi Kerry,

--
Steve Huston, Riverace Corporation
Total Lifecycle Support for Your Networked Applications
http://www.riverace.com


> -----Original Message-----
> From: Kerry Bonin [mailto:kerrybonin@gmail.com] 
> Sent: Wednesday, April 21, 2010 4:05 PM
> To: qpid-dev
> Subject: Help requested w/ federation - what am I doing wrong?
> 
> 
> This is on XP SP2, using recent build (r921371 + my QPID-2519 
> patch applied but not being used.)
> 
> I think I'm doing this correctly, but I never see anything 
> from the receiver on the second broker.  As I understand it, 
> shouldn't clients be able to subscribe to topics and see 
> published messages no matter which broker they are connected to?

You're not doing anything wrong that I can see, but Windows has a known
problem in this area: https://issues.apache.org/jira/browse/QPID-2199

Nobody has had time to address it yet, so I don't know if this is a big
or small problem. I suspect it's not terribly big.

Let me know if you need some help navigating this.

Best regards,
-Steve

> My end goal here is to add some fault tolerance to a Windows 
> AMQP QPID system, where clients can switch to another broker 
> if their current broker dies.  Since someone chose a Linux 
> only solution for clustering, I think my simplest option is 
> to leverage Federation, with something like 
> ResilientConnection to manage a list of brokers, and fail 
> client connections over to other brokers on connection 
> failure. This is from my federation test case, and I'm 
> probably setting something up incorrectly...
> 
> Greatly appreciate the help!
> 
> Kerry Bonin
> 
> 
> 
> I bring up two brokers:
>   start "5680" /Dd:\dev\qpid-r921371\cpp\build\src\debug 
> qpidd.exe --data-dir=.\qpidd.data.5680 --auth=no --port=5680 
> --load-module=qmfconsoled.dll
>   start "5681" /Dd:\dev\qpid-r921371\cpp\build\src\debug 
> qpidd.exe --data-dir=.\qpidd.data.5681 --auth=no --port=5681 
> --load-module=qmfconsoled.dll
> 
> Create exchanges
>   python D:\dev\qpid-r921371\tools\src\py\qpid-config -a 
> localhost:5680 add exchange topic fed.topic
>   python D:\dev\qpid-r921371\tools\src\py\qpid-config -a 
> localhost:5681 add exchange topic fed.topic
> 
> Create routes
>   python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic 
> add localhost:5680 localhost:5681 fed.topic
>   python D:\dev\qpid-r921371\tools\src\py\qpid-route dynamic 
> add localhost:5681 localhost:5680 fed.topic
> 
> This appears to work correctly :
>   D:\dev\qpid\bin>python d:\dev\qpid-r921371\tools\src\py\qpid-route
> route map localhost:5680
> 
>   Finding Linked Brokers:
>       localhost:5680... Ok
>       localhost:5681... Ok
> 
>   Dynamic Routes:
> 
>     Exchange fed.topic:
>       localhost:5681 <=> localhost:5680
> 
>   Static Routes:
>     none found
> 
> 
> Now trimmed from my C++ testbed...
> 
> // Setup URLs and Addresses
> 
> std::string urlA = "amqp:tcp:127.0.0.1:5680";
> std::string urlB = "amqp:tcp:127.0.0.1:5681";
> std::string queue = "fed.topic";
> Address addressTx( queue );
> Address addressRx( queue );
> int64_t timeout = 1000;
> 
> // Setup transmitter on 5680
> 
> Connection connectionTxA;
> connectionTxA.open( urlA );
> Session sessionTxA = connectionTxA.newSession();
> Sender senderTxA = sessionTxA.createSender( addressTx );
> 
> // Setup listeners on 5680 and 5681
> 
> Connection connectionRxA;
> connectionRxA.open( urlA );
> Session sessionRxA = connectionRxA.newSession();
> Receiver receiverRxA = sessionRxA.createReceiver( addressRx );
> 
> Connection connectionRxB;
> connectionRxB.open( urlB );
> Session sessionRxB = connectionRxB.newSession();
> Receiver receiverRxB = sessionRxB.createReceiver( addressRx );
> 
> // Transmit to 5680
> 
> Message messageOut;
> MapContent contentOut( messageOut );
> contentOut["id"] = 1234;
> contentOut["name"] = "Request";
> contentOut.encode();
> senderTxA.send( messageOut );
> 
> // Local listener sees the message
> 
> Message messageRxA;
> if( receiverRxA.fetch( messageRxA, qpid::sys::Duration( 
> timeout ) ) ) {
>         MapView contentRxA( messageRxA );
>         std::cout << "  local received: " << contentRxA << std::endl;
>         sessionRxA.acknowledge();
> }
> else
>         std::cout << "  local timeout " << std::endl;
> 
> // Remote never does...
> 
> Message messageRxB;
> if( receiverRxB.fetch( messageRxB, qpid::sys::Duration( 
> timeout ) ) ) {
>         MapView contentRxB( messageRxB );
>         std::cout << "  remote received: " << contentRxB << std::endl;
>         sessionRxB.acknowledge();
> }
> else
>         std::cout << "  remote timeout " << std::endl;
> 
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:dev-subscribe@qpid.apache.org
> 
> 


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscribe@qpid.apache.org