You are viewing a plain text version of this content. The canonical link for it is here.
Posted to proton@qpid.apache.org by Michael Goulish <mg...@redhat.com> on 2013/04/04 20:09:13 UTC

problem with multiple senders


  Is this a bug, or am I  Doing  Something  Wrong ?



Scenario
{
  My sender sends a single message, and hopes to see
  that the receiver has accepted it.

  I launch 3 copies of the sender very close together-- 
  they all talk to the same address.   

  My receiver receives in a loop, accepts every message
  that it receives.
}




Result
{
  Sometimes my receiver gets 1 of the 3 messages.
  Usually it gets 2.
  It never gets all 3.

  The 3rd sender hangs in pn_messenger_send().

  While the 3rd sender is hanging in send(), the receiver
  is patiently waiting in recv().
}






Sender Code ############################################

/*
  Launch 3 of these from a script like so:
  ./sender 6666 &
  ./sender 6666 &
  ./sender 6666 &
*/


#include "proton/message.h"
#include "proton/messenger.h"

#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>


char *
status_2_str ( pn_status_t status )
{
  switch ( status )
  {
    case PN_STATUS_UNKNOWN:
      return "unknown";
      break;

    case PN_STATUS_PENDING:
      return "pending";
      break;

    case PN_STATUS_ACCEPTED:
      return "accepted";
      break;

    case PN_STATUS_REJECTED:
      return "rejected";
      break;

    default:
      return "bad value";
      break;
  }
}



pid_t my_pid = 0;


void
check ( char * label, int result )
{
  fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
}



int
main(int argc, char** argv)
{
  int c;
  char addr [ 1000 ];
  char msgtext [ 100 ];
  pn_message_t   * message;
  pn_messenger_t * messenger;
  pn_data_t      * body;
  pn_tracker_t     tracker;
  pn_status_t      status;
  int              result;

  my_pid = getpid();

  sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );


  message = pn_message ( );
  messenger = pn_messenger ( NULL );
  pn_messenger_start ( messenger ) ;
  pn_messenger_set_outgoing_window ( messenger, 1 );


  pn_message_set_address ( message, addr );
  body = pn_message_body ( message );


  sprintf ( msgtext, "Message from %d", getpid() );
  pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
  pn_messenger_put ( messenger, message );
  tracker = pn_messenger_outgoing_tracker ( messenger );
  pn_messenger_send ( messenger );


  status = pn_messenger_status ( messenger, tracker );
  fprintf ( stderr, "status : %s\n", status_2_str(status) );


  pn_messenger_stop ( messenger );
  pn_messenger_free ( messenger );
  pn_message_free ( message );

  return 0;
}




Receiver Code ########################################################

/*

  Launch like this:
  ./receiver 6666
*/

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>

#include "proton/message.h"
#include "proton/messenger.h"



#define BUFSIZE 1024



int
main(int argc, char** argv)
{
  size_t bufsize = BUFSIZE;
  char buffer [ BUFSIZE ];
  char addr [ 1000 ];
  pn_message_t   * message;
  pn_messenger_t * messenger;
  pn_data_t      * body;
  pn_tracker_t     tracker;


  sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );

  message = pn_message();
  messenger = pn_messenger ( NULL );

  pn_messenger_start(messenger);
  pn_messenger_subscribe ( messenger, addr );
  pn_messenger_set_incoming_window ( messenger, 5 );

  /*---------------------------------
    Receive and accept the message.
  ---------------------------------*/
  while ( 1 )
  {
    fprintf ( stderr, "receiving...\n" );
    pn_messenger_recv ( messenger, 3 );

    while ( pn_messenger_incoming ( messenger ) > 0 )
    {
      fprintf ( stderr, "getting message...\n" );
      pn_messenger_get ( messenger, message );
      tracker = pn_messenger_incoming_tracker ( messenger );
      pn_messenger_accept ( messenger, tracker, 0 );
      body = pn_message_body ( message );
      pn_data_format ( body, buffer, & bufsize );
      fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
      fprintf ( stdout, "Content: %s\n", buffer);
    }
  }

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);

  return 0;
}



Re: problem with multiple senders

Posted by Ken Giusti <kg...@redhat.com>.
Dang - hit send too soon:

https://issues.apache.org/jira/browse/PROTON-200

It depends on having the ability to revoke issued credit, which is still outstanding AFAIK.



----- Original Message -----
> From: "Ken Giusti" <kg...@redhat.com>
> To: proton@qpid.apache.org
> Sent: Thursday, April 4, 2013 5:46:04 PM
> Subject: Re: problem with multiple senders
> 
> FWIW: there's a JIRA that's tracking this:
> 
> 
> ----- Original Message -----
> > From: "Michael Goulish" <mg...@redhat.com>
> > To: proton@qpid.apache.org
> > Sent: Thursday, April 4, 2013 4:45:03 PM
> > Subject: Re: problem with multiple senders
> > 
> > Yes!   -1 did it.  Thanks!
> > 
> > 
> > 
> > ----- Original Message -----
> > > I think this is the same bug we've seen before with passing fixed
> > > (positive) credit limits to recv. The implementation isn't smart enough
> > > to
> > > pay attention to who actually is offering messages when it allocates
> > > credit, and so it ends up giving out all of its credit to a sender that
> > > has
> > > no use for it instead of to the senders that are blocked. I suspect if
> > > you
> > > replace your 3 with -1 in your call to pn_messenger_recv, then you will
> > > see
> > > the hang go away.
> > > 
> > > --Rafael
> > > 
> > > 
> > > On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mg...@redhat.com>
> > > wrote:
> > > 
> > > > OK, I'm looking at trace from receiver, and I thought
> > > > I would post it here so I can't be accused of hogging
> > > > all the fun for myself.
> > > >
> > > > ( Remember, three senders all send to same receiver address,
> > > >   only two get 'accepted' replies.  Last sender ends up hanging in
> > > >   send(),
> > > >   while receiver (in infinite loop) blocks on recv(). )
> > > >
> > > > I have marked the lines of application output with "APPLICATION
> > > > OUTPUT:"
> > > >
> > > >
> > > > Note:
> > > >
> > > > I see these 3 lines:
> > > >   Accepted from localhost:42468
> > > >   Accepted from localhost:42469
> > > >   Accepted from localhost:42470
> > > >
> > > > But only two get closed:
> > > >   Closed localhost:42468
> > > >   Closed localhost:42469
> > > >
> > > >
> > > >
> > > >
> > > > ----------------- begin trace -----------------------
> > > > Listening on 0.0.0.0:6666
> > > > APPLICATION OUTPUT:   receiving...
> > > > Accepted from localhost:42468
> > > > Accepted from localhost:42469
> > > >     <- SASL
> > > > [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > > [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > > [0x25013c0:0] -> SASL-OUTCOME @68 [0]
> > > >     -> SASL
> > > >     -> AMQP
> > > > [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5",
> > > > null,
> > > > null, null, null, null, null, null, null]
> > > > Accepted from localhost:42470
> > > >     <- SASL
> > > > [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > > [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > > [0x253f490:0] -> SASL-OUTCOME @68 [0]
> > > >     -> SASL
> > > >     -> AMQP
> > > > [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5",
> > > > null,
> > > > null, null, null, null, null, null, null]
> > > >     <- AMQP
> > > > [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> > > > "0.0.0.0", null, null, null, null, null, null, null]
> > > > [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > > [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > > [null, 0, null, 0, false, null, null, null, null, null, null], @41
> > > > [null,
> > > > 0, null, 0, false, null, null], null, null, 0]
> > > > [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > > [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > > null, null, null, 0]
> > > > [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
> > > >     <- SASL
> > > > [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > > [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > > [0x2563350:0] -> SASL-OUTCOME @68 [0]
> > > >     -> SASL
> > > >     -> AMQP
> > > > [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5",
> > > > null,
> > > > null, null, null, null, null, null, null]
> > > >     <- AMQP
> > > > [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> > > > "0.0.0.0", null, null, null, null, null, null, null]
> > > > [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > > [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > > [null, 0, null, 0, false, null, null, null, null, null, null], @41
> > > > [null,
> > > > 0, null, 0, false, null, null], null, null, 0]
> > > > [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > > [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > > null, null, null, 0]
> > > > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > > >     <- AMQP
> > > > [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> > > > "0.0.0.0", null, null, null, null, null, null, null]
> > > > [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > > [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > > [null, 0, null, 0, false, null, null, null, null, null, null], @41
> > > > [null,
> > > > 0, null, 0, false, null, null], null, null, 0]
> > > > [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > > [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > > null, null, null, 0]
> > > > [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > > > [0x24fae10:1] <- TRANSFER @20 [1, 0,
> > > > b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > > > 0, false, false] (148)
> > > > "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > > > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > > > @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> > > > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > > > \x00@\x00Sw\xa1\x12Message from 22470"
> > > > APPLICATION OUTPUT:   getting message...
> > > > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > > > APPLICATION OUTPUT:   Content: "Message from 22470"
> > > > APPLICATION OUTPUT:   receiving...
> > > > [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > > > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> > > > [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > > > [0x2538e40:1] <- TRANSFER @20 [1, 0,
> > > > b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > > > 0, false, false] (148)
> > > > "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > > > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > > > @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> > > > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > > > \x00@\x00Sw\xa1\x12Message from 22469"
> > > > getting message...
> > > > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > > > APPLICATION OUTPUT:   Content: "Message from 22469
> > > > APPLICATION OUTPUT:   receiving...
> > > > [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> > > > [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > > > [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > > > [0x24fae10:1] <- DETACH @22 [1, true, null]
> > > > [0x24fae10:0] <- CLOSE @24 [null]
> > > > [0x24fae10:0] <- EOS
> > > > [0x24fae10:1] -> DETACH @22 [1, true, null]
> > > > [0x24fae10:0] -> CLOSE @24 [null]
> > > > [0x24fae10:0] -> EOS
> > > > Closed localhost:42468
> > > > [0x2538e40:1] <- DETACH @22 [1, true, null]
> > > > [0x2538e40:0] <- CLOSE @24 [null]
> > > > [0x2538e40:0] <- EOS
> > > > [0x2538e40:1] -> DETACH @22 [1, true, null]
> > > > [0x2538e40:0] -> CLOSE @24 [null]
> > > > [0x2538e40:0] -> EOS
> > > > Closed localhost:42469
> > > >
> > > > ----------------- end trace -------------------------
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > ----- Original Message -----
> > > > Any clues from a trace of the receiver?
> > > >
> > > > $ PN_TRACE_FRM=1 ./receiver 6666
> > > >
> > > > -Ted
> > > >
> > > > On 04/04/2013 02:09 PM, Michael Goulish wrote:
> > > > >
> > > > >    Is this a bug, or am I  Doing  Something  Wrong ?
> > > > >
> > > > >
> > > > >
> > > > > Scenario
> > > > > {
> > > > >    My sender sends a single message, and hopes to see
> > > > >    that the receiver has accepted it.
> > > > >
> > > > >    I launch 3 copies of the sender very close together--
> > > > >    they all talk to the same address.
> > > > >
> > > > >    My receiver receives in a loop, accepts every message
> > > > >    that it receives.
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Result
> > > > > {
> > > > >    Sometimes my receiver gets 1 of the 3 messages.
> > > > >    Usually it gets 2.
> > > > >    It never gets all 3.
> > > > >
> > > > >    The 3rd sender hangs in pn_messenger_send().
> > > > >
> > > > >    While the 3rd sender is hanging in send(), the receiver
> > > > >    is patiently waiting in recv().
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Sender Code ############################################
> > > > >
> > > > > /*
> > > > >    Launch 3 of these from a script like so:
> > > > >    ./sender 6666 &
> > > > >    ./sender 6666 &
> > > > >    ./sender 6666 &
> > > > > */
> > > > >
> > > > >
> > > > > #include "proton/message.h"
> > > > > #include "proton/messenger.h"
> > > > >
> > > > > #include <getopt.h>
> > > > > #include <stdio.h>
> > > > > #include <stdlib.h>
> > > > > #include <string.h>
> > > > > #include <ctype.h>
> > > > >
> > > > >
> > > > > char *
> > > > > status_2_str ( pn_status_t status )
> > > > > {
> > > > >    switch ( status )
> > > > >    {
> > > > >      case PN_STATUS_UNKNOWN:
> > > > >        return "unknown";
> > > > >        break;
> > > > >
> > > > >      case PN_STATUS_PENDING:
> > > > >        return "pending";
> > > > >        break;
> > > > >
> > > > >      case PN_STATUS_ACCEPTED:
> > > > >        return "accepted";
> > > > >        break;
> > > > >
> > > > >      case PN_STATUS_REJECTED:
> > > > >        return "rejected";
> > > > >        break;
> > > > >
> > > > >      default:
> > > > >        return "bad value";
> > > > >        break;
> > > > >    }
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > > pid_t my_pid = 0;
> > > > >
> > > > >
> > > > > void
> > > > > check ( char * label, int result )
> > > > > {
> > > > >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > > int
> > > > > main(int argc, char** argv)
> > > > > {
> > > > >    int c;
> > > > >    char addr [ 1000 ];
> > > > >    char msgtext [ 100 ];
> > > > >    pn_message_t   * message;
> > > > >    pn_messenger_t * messenger;
> > > > >    pn_data_t      * body;
> > > > >    pn_tracker_t     tracker;
> > > > >    pn_status_t      status;
> > > > >    int              result;
> > > > >
> > > > >    my_pid = getpid();
> > > > >
> > > > >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> > > > >
> > > > >
> > > > >    message = pn_message ( );
> > > > >    messenger = pn_messenger ( NULL );
> > > > >    pn_messenger_start ( messenger ) ;
> > > > >    pn_messenger_set_outgoing_window ( messenger, 1 );
> > > > >
> > > > >
> > > > >    pn_message_set_address ( message, addr );
> > > > >    body = pn_message_body ( message );
> > > > >
> > > > >
> > > > >    sprintf ( msgtext, "Message from %d", getpid() );
> > > > >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext
> > > > >    ));
> > > > >    pn_messenger_put ( messenger, message );
> > > > >    tracker = pn_messenger_outgoing_tracker ( messenger );
> > > > >    pn_messenger_send ( messenger );
> > > > >
> > > > >
> > > > >    status = pn_messenger_status ( messenger, tracker );
> > > > >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> > > > >
> > > > >
> > > > >    pn_messenger_stop ( messenger );
> > > > >    pn_messenger_free ( messenger );
> > > > >    pn_message_free ( message );
> > > > >
> > > > >    return 0;
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Receiver Code
> > > > > ########################################################
> > > > >
> > > > > /*
> > > > >
> > > > >    Launch like this:
> > > > >    ./receiver 6666
> > > > > */
> > > > >
> > > > > #include <stdio.h>
> > > > > #include <stdlib.h>
> > > > > #include <ctype.h>
> > > > >
> > > > > #include "proton/message.h"
> > > > > #include "proton/messenger.h"
> > > > >
> > > > >
> > > > >
> > > > > #define BUFSIZE 1024
> > > > >
> > > > >
> > > > >
> > > > > int
> > > > > main(int argc, char** argv)
> > > > > {
> > > > >    size_t bufsize = BUFSIZE;
> > > > >    char buffer [ BUFSIZE ];
> > > > >    char addr [ 1000 ];
> > > > >    pn_message_t   * message;
> > > > >    pn_messenger_t * messenger;
> > > > >    pn_data_t      * body;
> > > > >    pn_tracker_t     tracker;
> > > > >
> > > > >
> > > > >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> > > > >
> > > > >    message = pn_message();
> > > > >    messenger = pn_messenger ( NULL );
> > > > >
> > > > >    pn_messenger_start(messenger);
> > > > >    pn_messenger_subscribe ( messenger, addr );
> > > > >    pn_messenger_set_incoming_window ( messenger, 5 );
> > > > >
> > > > >    /*---------------------------------
> > > > >      Receive and accept the message.
> > > > >    ---------------------------------*/
> > > > >    while ( 1 )
> > > > >    {
> > > > >      fprintf ( stderr, "receiving...\n" );
> > > > >      pn_messenger_recv ( messenger, 3 );
> > > > >
> > > > >      while ( pn_messenger_incoming ( messenger ) > 0 )
> > > > >      {
> > > > >        fprintf ( stderr, "getting message...\n" );
> > > > >        pn_messenger_get ( messenger, message );
> > > > >        tracker = pn_messenger_incoming_tracker ( messenger );
> > > > >        pn_messenger_accept ( messenger, tracker, 0 );
> > > > >        body = pn_message_body ( message );
> > > > >        pn_data_format ( body, buffer, & bufsize );
> > > > >        fprintf ( stdout, "Address: %s\n", pn_message_get_address (
> > > > message ) );
> > > > >        fprintf ( stdout, "Content: %s\n", buffer);
> > > > >      }
> > > > >    }
> > > > >
> > > > >    pn_messenger_stop(messenger);
> > > > >    pn_messenger_free(messenger);
> > > > >
> > > > >    return 0;
> > > > > }
> > > > >
> > > > >
> > > >
> > > >
> > > 
> > 
> 
> --
> -K
> 

-- 
-K

Re: problem with multiple senders

Posted by Ken Giusti <kg...@redhat.com>.
FWIW: there's a JIRA that's tracking this:


----- Original Message -----
> From: "Michael Goulish" <mg...@redhat.com>
> To: proton@qpid.apache.org
> Sent: Thursday, April 4, 2013 4:45:03 PM
> Subject: Re: problem with multiple senders
> 
> Yes!   -1 did it.  Thanks!
> 
> 
> 
> ----- Original Message -----
> > I think this is the same bug we've seen before with passing fixed
> > (positive) credit limits to recv. The implementation isn't smart enough to
> > pay attention to who actually is offering messages when it allocates
> > credit, and so it ends up giving out all of its credit to a sender that has
> > no use for it instead of to the senders that are blocked. I suspect if you
> > replace your 3 with -1 in your call to pn_messenger_recv, then you will see
> > the hang go away.
> > 
> > --Rafael
> > 
> > 
> > On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mg...@redhat.com>
> > wrote:
> > 
> > > OK, I'm looking at trace from receiver, and I thought
> > > I would post it here so I can't be accused of hogging
> > > all the fun for myself.
> > >
> > > ( Remember, three senders all send to same receiver address,
> > >   only two get 'accepted' replies.  Last sender ends up hanging in
> > >   send(),
> > >   while receiver (in infinite loop) blocks on recv(). )
> > >
> > > I have marked the lines of application output with "APPLICATION OUTPUT:"
> > >
> > >
> > > Note:
> > >
> > > I see these 3 lines:
> > >   Accepted from localhost:42468
> > >   Accepted from localhost:42469
> > >   Accepted from localhost:42470
> > >
> > > But only two get closed:
> > >   Closed localhost:42468
> > >   Closed localhost:42469
> > >
> > >
> > >
> > >
> > > ----------------- begin trace -----------------------
> > > Listening on 0.0.0.0:6666
> > > APPLICATION OUTPUT:   receiving...
> > > Accepted from localhost:42468
> > > Accepted from localhost:42469
> > >     <- SASL
> > > [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > [0x25013c0:0] -> SASL-OUTCOME @68 [0]
> > >     -> SASL
> > >     -> AMQP
> > > [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > > null, null, null, null, null, null, null]
> > > Accepted from localhost:42470
> > >     <- SASL
> > > [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > [0x253f490:0] -> SASL-OUTCOME @68 [0]
> > >     -> SASL
> > >     -> AMQP
> > > [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > > null, null, null, null, null, null, null]
> > >     <- AMQP
> > > [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> > > "0.0.0.0", null, null, null, null, null, null, null]
> > > [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > > 0, null, 0, false, null, null], null, null, 0]
> > > [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > null, null, null, 0]
> > > [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
> > >     <- SASL
> > > [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > [0x2563350:0] -> SASL-OUTCOME @68 [0]
> > >     -> SASL
> > >     -> AMQP
> > > [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > > null, null, null, null, null, null, null]
> > >     <- AMQP
> > > [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> > > "0.0.0.0", null, null, null, null, null, null, null]
> > > [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > > 0, null, 0, false, null, null], null, null, 0]
> > > [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > null, null, null, 0]
> > > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > >     <- AMQP
> > > [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> > > "0.0.0.0", null, null, null, null, null, null, null]
> > > [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > > 0, null, 0, false, null, null], null, null, 0]
> > > [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > null, null, null, 0]
> > > [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > > [0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > > 0, false, false] (148)
> > > "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > > @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> > > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > > \x00@\x00Sw\xa1\x12Message from 22470"
> > > APPLICATION OUTPUT:   getting message...
> > > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > > APPLICATION OUTPUT:   Content: "Message from 22470"
> > > APPLICATION OUTPUT:   receiving...
> > > [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> > > [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > > [0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > > 0, false, false] (148)
> > > "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > > @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> > > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > > \x00@\x00Sw\xa1\x12Message from 22469"
> > > getting message...
> > > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > > APPLICATION OUTPUT:   Content: "Message from 22469
> > > APPLICATION OUTPUT:   receiving...
> > > [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> > > [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > > [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > > [0x24fae10:1] <- DETACH @22 [1, true, null]
> > > [0x24fae10:0] <- CLOSE @24 [null]
> > > [0x24fae10:0] <- EOS
> > > [0x24fae10:1] -> DETACH @22 [1, true, null]
> > > [0x24fae10:0] -> CLOSE @24 [null]
> > > [0x24fae10:0] -> EOS
> > > Closed localhost:42468
> > > [0x2538e40:1] <- DETACH @22 [1, true, null]
> > > [0x2538e40:0] <- CLOSE @24 [null]
> > > [0x2538e40:0] <- EOS
> > > [0x2538e40:1] -> DETACH @22 [1, true, null]
> > > [0x2538e40:0] -> CLOSE @24 [null]
> > > [0x2538e40:0] -> EOS
> > > Closed localhost:42469
> > >
> > > ----------------- end trace -------------------------
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > ----- Original Message -----
> > > Any clues from a trace of the receiver?
> > >
> > > $ PN_TRACE_FRM=1 ./receiver 6666
> > >
> > > -Ted
> > >
> > > On 04/04/2013 02:09 PM, Michael Goulish wrote:
> > > >
> > > >    Is this a bug, or am I  Doing  Something  Wrong ?
> > > >
> > > >
> > > >
> > > > Scenario
> > > > {
> > > >    My sender sends a single message, and hopes to see
> > > >    that the receiver has accepted it.
> > > >
> > > >    I launch 3 copies of the sender very close together--
> > > >    they all talk to the same address.
> > > >
> > > >    My receiver receives in a loop, accepts every message
> > > >    that it receives.
> > > > }
> > > >
> > > >
> > > >
> > > >
> > > > Result
> > > > {
> > > >    Sometimes my receiver gets 1 of the 3 messages.
> > > >    Usually it gets 2.
> > > >    It never gets all 3.
> > > >
> > > >    The 3rd sender hangs in pn_messenger_send().
> > > >
> > > >    While the 3rd sender is hanging in send(), the receiver
> > > >    is patiently waiting in recv().
> > > > }
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > Sender Code ############################################
> > > >
> > > > /*
> > > >    Launch 3 of these from a script like so:
> > > >    ./sender 6666 &
> > > >    ./sender 6666 &
> > > >    ./sender 6666 &
> > > > */
> > > >
> > > >
> > > > #include "proton/message.h"
> > > > #include "proton/messenger.h"
> > > >
> > > > #include <getopt.h>
> > > > #include <stdio.h>
> > > > #include <stdlib.h>
> > > > #include <string.h>
> > > > #include <ctype.h>
> > > >
> > > >
> > > > char *
> > > > status_2_str ( pn_status_t status )
> > > > {
> > > >    switch ( status )
> > > >    {
> > > >      case PN_STATUS_UNKNOWN:
> > > >        return "unknown";
> > > >        break;
> > > >
> > > >      case PN_STATUS_PENDING:
> > > >        return "pending";
> > > >        break;
> > > >
> > > >      case PN_STATUS_ACCEPTED:
> > > >        return "accepted";
> > > >        break;
> > > >
> > > >      case PN_STATUS_REJECTED:
> > > >        return "rejected";
> > > >        break;
> > > >
> > > >      default:
> > > >        return "bad value";
> > > >        break;
> > > >    }
> > > > }
> > > >
> > > >
> > > >
> > > > pid_t my_pid = 0;
> > > >
> > > >
> > > > void
> > > > check ( char * label, int result )
> > > > {
> > > >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > > > }
> > > >
> > > >
> > > >
> > > > int
> > > > main(int argc, char** argv)
> > > > {
> > > >    int c;
> > > >    char addr [ 1000 ];
> > > >    char msgtext [ 100 ];
> > > >    pn_message_t   * message;
> > > >    pn_messenger_t * messenger;
> > > >    pn_data_t      * body;
> > > >    pn_tracker_t     tracker;
> > > >    pn_status_t      status;
> > > >    int              result;
> > > >
> > > >    my_pid = getpid();
> > > >
> > > >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> > > >
> > > >
> > > >    message = pn_message ( );
> > > >    messenger = pn_messenger ( NULL );
> > > >    pn_messenger_start ( messenger ) ;
> > > >    pn_messenger_set_outgoing_window ( messenger, 1 );
> > > >
> > > >
> > > >    pn_message_set_address ( message, addr );
> > > >    body = pn_message_body ( message );
> > > >
> > > >
> > > >    sprintf ( msgtext, "Message from %d", getpid() );
> > > >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext
> > > >    ));
> > > >    pn_messenger_put ( messenger, message );
> > > >    tracker = pn_messenger_outgoing_tracker ( messenger );
> > > >    pn_messenger_send ( messenger );
> > > >
> > > >
> > > >    status = pn_messenger_status ( messenger, tracker );
> > > >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> > > >
> > > >
> > > >    pn_messenger_stop ( messenger );
> > > >    pn_messenger_free ( messenger );
> > > >    pn_message_free ( message );
> > > >
> > > >    return 0;
> > > > }
> > > >
> > > >
> > > >
> > > >
> > > > Receiver Code ########################################################
> > > >
> > > > /*
> > > >
> > > >    Launch like this:
> > > >    ./receiver 6666
> > > > */
> > > >
> > > > #include <stdio.h>
> > > > #include <stdlib.h>
> > > > #include <ctype.h>
> > > >
> > > > #include "proton/message.h"
> > > > #include "proton/messenger.h"
> > > >
> > > >
> > > >
> > > > #define BUFSIZE 1024
> > > >
> > > >
> > > >
> > > > int
> > > > main(int argc, char** argv)
> > > > {
> > > >    size_t bufsize = BUFSIZE;
> > > >    char buffer [ BUFSIZE ];
> > > >    char addr [ 1000 ];
> > > >    pn_message_t   * message;
> > > >    pn_messenger_t * messenger;
> > > >    pn_data_t      * body;
> > > >    pn_tracker_t     tracker;
> > > >
> > > >
> > > >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> > > >
> > > >    message = pn_message();
> > > >    messenger = pn_messenger ( NULL );
> > > >
> > > >    pn_messenger_start(messenger);
> > > >    pn_messenger_subscribe ( messenger, addr );
> > > >    pn_messenger_set_incoming_window ( messenger, 5 );
> > > >
> > > >    /*---------------------------------
> > > >      Receive and accept the message.
> > > >    ---------------------------------*/
> > > >    while ( 1 )
> > > >    {
> > > >      fprintf ( stderr, "receiving...\n" );
> > > >      pn_messenger_recv ( messenger, 3 );
> > > >
> > > >      while ( pn_messenger_incoming ( messenger ) > 0 )
> > > >      {
> > > >        fprintf ( stderr, "getting message...\n" );
> > > >        pn_messenger_get ( messenger, message );
> > > >        tracker = pn_messenger_incoming_tracker ( messenger );
> > > >        pn_messenger_accept ( messenger, tracker, 0 );
> > > >        body = pn_message_body ( message );
> > > >        pn_data_format ( body, buffer, & bufsize );
> > > >        fprintf ( stdout, "Address: %s\n", pn_message_get_address (
> > > message ) );
> > > >        fprintf ( stdout, "Content: %s\n", buffer);
> > > >      }
> > > >    }
> > > >
> > > >    pn_messenger_stop(messenger);
> > > >    pn_messenger_free(messenger);
> > > >
> > > >    return 0;
> > > > }
> > > >
> > > >
> > >
> > >
> > 
> 

-- 
-K

Re: problem with multiple senders

Posted by Michael Goulish <mg...@redhat.com>.
Yes!   -1 did it.  Thanks!



----- Original Message -----
> I think this is the same bug we've seen before with passing fixed
> (positive) credit limits to recv. The implementation isn't smart enough to
> pay attention to who actually is offering messages when it allocates
> credit, and so it ends up giving out all of its credit to a sender that has
> no use for it instead of to the senders that are blocked. I suspect if you
> replace your 3 with -1 in your call to pn_messenger_recv, then you will see
> the hang go away.
> 
> --Rafael
> 
> 
> On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mg...@redhat.com> wrote:
> 
> > OK, I'm looking at trace from receiver, and I thought
> > I would post it here so I can't be accused of hogging
> > all the fun for myself.
> >
> > ( Remember, three senders all send to same receiver address,
> >   only two get 'accepted' replies.  Last sender ends up hanging in send(),
> >   while receiver (in infinite loop) blocks on recv(). )
> >
> > I have marked the lines of application output with "APPLICATION OUTPUT:"
> >
> >
> > Note:
> >
> > I see these 3 lines:
> >   Accepted from localhost:42468
> >   Accepted from localhost:42469
> >   Accepted from localhost:42470
> >
> > But only two get closed:
> >   Closed localhost:42468
> >   Closed localhost:42469
> >
> >
> >
> >
> > ----------------- begin trace -----------------------
> > Listening on 0.0.0.0:6666
> > APPLICATION OUTPUT:   receiving...
> > Accepted from localhost:42468
> > Accepted from localhost:42469
> >     <- SASL
> > [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > [0x25013c0:0] -> SASL-OUTCOME @68 [0]
> >     -> SASL
> >     -> AMQP
> > [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > null, null, null, null, null, null, null]
> > Accepted from localhost:42470
> >     <- SASL
> > [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > [0x253f490:0] -> SASL-OUTCOME @68 [0]
> >     -> SASL
> >     -> AMQP
> > [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > null, null, null, null, null, null, null]
> >     <- AMQP
> > [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> > "0.0.0.0", null, null, null, null, null, null, null]
> > [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > 0, null, 0, false, null, null], null, null, 0]
> > [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > null, null, null, 0]
> > [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
> >     <- SASL
> > [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > [0x2563350:0] -> SASL-OUTCOME @68 [0]
> >     -> SASL
> >     -> AMQP
> > [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > null, null, null, null, null, null, null]
> >     <- AMQP
> > [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> > "0.0.0.0", null, null, null, null, null, null, null]
> > [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > 0, null, 0, false, null, null], null, null, 0]
> > [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > null, null, null, 0]
> > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> >     <- AMQP
> > [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> > "0.0.0.0", null, null, null, null, null, null, null]
> > [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > 0, null, 0, false, null, null], null, null, 0]
> > [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > null, null, null, 0]
> > [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > [0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > \x00@\x00Sw\xa1\x12Message from 22470"
> > APPLICATION OUTPUT:   getting message...
> > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > APPLICATION OUTPUT:   Content: "Message from 22470"
> > APPLICATION OUTPUT:   receiving...
> > [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> > [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > [0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > \x00@\x00Sw\xa1\x12Message from 22469"
> > getting message...
> > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > APPLICATION OUTPUT:   Content: "Message from 22469
> > APPLICATION OUTPUT:   receiving...
> > [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> > [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > [0x24fae10:1] <- DETACH @22 [1, true, null]
> > [0x24fae10:0] <- CLOSE @24 [null]
> > [0x24fae10:0] <- EOS
> > [0x24fae10:1] -> DETACH @22 [1, true, null]
> > [0x24fae10:0] -> CLOSE @24 [null]
> > [0x24fae10:0] -> EOS
> > Closed localhost:42468
> > [0x2538e40:1] <- DETACH @22 [1, true, null]
> > [0x2538e40:0] <- CLOSE @24 [null]
> > [0x2538e40:0] <- EOS
> > [0x2538e40:1] -> DETACH @22 [1, true, null]
> > [0x2538e40:0] -> CLOSE @24 [null]
> > [0x2538e40:0] -> EOS
> > Closed localhost:42469
> >
> > ----------------- end trace -------------------------
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > ----- Original Message -----
> > Any clues from a trace of the receiver?
> >
> > $ PN_TRACE_FRM=1 ./receiver 6666
> >
> > -Ted
> >
> > On 04/04/2013 02:09 PM, Michael Goulish wrote:
> > >
> > >    Is this a bug, or am I  Doing  Something  Wrong ?
> > >
> > >
> > >
> > > Scenario
> > > {
> > >    My sender sends a single message, and hopes to see
> > >    that the receiver has accepted it.
> > >
> > >    I launch 3 copies of the sender very close together--
> > >    they all talk to the same address.
> > >
> > >    My receiver receives in a loop, accepts every message
> > >    that it receives.
> > > }
> > >
> > >
> > >
> > >
> > > Result
> > > {
> > >    Sometimes my receiver gets 1 of the 3 messages.
> > >    Usually it gets 2.
> > >    It never gets all 3.
> > >
> > >    The 3rd sender hangs in pn_messenger_send().
> > >
> > >    While the 3rd sender is hanging in send(), the receiver
> > >    is patiently waiting in recv().
> > > }
> > >
> > >
> > >
> > >
> > >
> > >
> > > Sender Code ############################################
> > >
> > > /*
> > >    Launch 3 of these from a script like so:
> > >    ./sender 6666 &
> > >    ./sender 6666 &
> > >    ./sender 6666 &
> > > */
> > >
> > >
> > > #include "proton/message.h"
> > > #include "proton/messenger.h"
> > >
> > > #include <getopt.h>
> > > #include <stdio.h>
> > > #include <stdlib.h>
> > > #include <string.h>
> > > #include <ctype.h>
> > >
> > >
> > > char *
> > > status_2_str ( pn_status_t status )
> > > {
> > >    switch ( status )
> > >    {
> > >      case PN_STATUS_UNKNOWN:
> > >        return "unknown";
> > >        break;
> > >
> > >      case PN_STATUS_PENDING:
> > >        return "pending";
> > >        break;
> > >
> > >      case PN_STATUS_ACCEPTED:
> > >        return "accepted";
> > >        break;
> > >
> > >      case PN_STATUS_REJECTED:
> > >        return "rejected";
> > >        break;
> > >
> > >      default:
> > >        return "bad value";
> > >        break;
> > >    }
> > > }
> > >
> > >
> > >
> > > pid_t my_pid = 0;
> > >
> > >
> > > void
> > > check ( char * label, int result )
> > > {
> > >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > > }
> > >
> > >
> > >
> > > int
> > > main(int argc, char** argv)
> > > {
> > >    int c;
> > >    char addr [ 1000 ];
> > >    char msgtext [ 100 ];
> > >    pn_message_t   * message;
> > >    pn_messenger_t * messenger;
> > >    pn_data_t      * body;
> > >    pn_tracker_t     tracker;
> > >    pn_status_t      status;
> > >    int              result;
> > >
> > >    my_pid = getpid();
> > >
> > >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> > >
> > >
> > >    message = pn_message ( );
> > >    messenger = pn_messenger ( NULL );
> > >    pn_messenger_start ( messenger ) ;
> > >    pn_messenger_set_outgoing_window ( messenger, 1 );
> > >
> > >
> > >    pn_message_set_address ( message, addr );
> > >    body = pn_message_body ( message );
> > >
> > >
> > >    sprintf ( msgtext, "Message from %d", getpid() );
> > >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
> > >    pn_messenger_put ( messenger, message );
> > >    tracker = pn_messenger_outgoing_tracker ( messenger );
> > >    pn_messenger_send ( messenger );
> > >
> > >
> > >    status = pn_messenger_status ( messenger, tracker );
> > >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> > >
> > >
> > >    pn_messenger_stop ( messenger );
> > >    pn_messenger_free ( messenger );
> > >    pn_message_free ( message );
> > >
> > >    return 0;
> > > }
> > >
> > >
> > >
> > >
> > > Receiver Code ########################################################
> > >
> > > /*
> > >
> > >    Launch like this:
> > >    ./receiver 6666
> > > */
> > >
> > > #include <stdio.h>
> > > #include <stdlib.h>
> > > #include <ctype.h>
> > >
> > > #include "proton/message.h"
> > > #include "proton/messenger.h"
> > >
> > >
> > >
> > > #define BUFSIZE 1024
> > >
> > >
> > >
> > > int
> > > main(int argc, char** argv)
> > > {
> > >    size_t bufsize = BUFSIZE;
> > >    char buffer [ BUFSIZE ];
> > >    char addr [ 1000 ];
> > >    pn_message_t   * message;
> > >    pn_messenger_t * messenger;
> > >    pn_data_t      * body;
> > >    pn_tracker_t     tracker;
> > >
> > >
> > >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> > >
> > >    message = pn_message();
> > >    messenger = pn_messenger ( NULL );
> > >
> > >    pn_messenger_start(messenger);
> > >    pn_messenger_subscribe ( messenger, addr );
> > >    pn_messenger_set_incoming_window ( messenger, 5 );
> > >
> > >    /*---------------------------------
> > >      Receive and accept the message.
> > >    ---------------------------------*/
> > >    while ( 1 )
> > >    {
> > >      fprintf ( stderr, "receiving...\n" );
> > >      pn_messenger_recv ( messenger, 3 );
> > >
> > >      while ( pn_messenger_incoming ( messenger ) > 0 )
> > >      {
> > >        fprintf ( stderr, "getting message...\n" );
> > >        pn_messenger_get ( messenger, message );
> > >        tracker = pn_messenger_incoming_tracker ( messenger );
> > >        pn_messenger_accept ( messenger, tracker, 0 );
> > >        body = pn_message_body ( message );
> > >        pn_data_format ( body, buffer, & bufsize );
> > >        fprintf ( stdout, "Address: %s\n", pn_message_get_address (
> > message ) );
> > >        fprintf ( stdout, "Content: %s\n", buffer);
> > >      }
> > >    }
> > >
> > >    pn_messenger_stop(messenger);
> > >    pn_messenger_free(messenger);
> > >
> > >    return 0;
> > > }
> > >
> > >
> >
> >
> 

Re: problem with multiple senders

Posted by Rafael Schloming <rh...@alum.mit.edu>.
I think this is the same bug we've seen before with passing fixed
(positive) credit limits to recv. The implementation isn't smart enough to
pay attention to who actually is offering messages when it allocates
credit, and so it ends up giving out all of its credit to a sender that has
no use for it instead of to the senders that are blocked. I suspect if you
replace your 3 with -1 in your call to pn_messenger_recv, then you will see
the hang go away.

--Rafael


On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mg...@redhat.com> wrote:

> OK, I'm looking at trace from receiver, and I thought
> I would post it here so I can't be accused of hogging
> all the fun for myself.
>
> ( Remember, three senders all send to same receiver address,
>   only two get 'accepted' replies.  Last sender ends up hanging in send(),
>   while receiver (in infinite loop) blocks on recv(). )
>
> I have marked the lines of application output with "APPLICATION OUTPUT:"
>
>
> Note:
>
> I see these 3 lines:
>   Accepted from localhost:42468
>   Accepted from localhost:42469
>   Accepted from localhost:42470
>
> But only two get closed:
>   Closed localhost:42468
>   Closed localhost:42469
>
>
>
>
> ----------------- begin trace -----------------------
> Listening on 0.0.0.0:6666
> APPLICATION OUTPUT:   receiving...
> Accepted from localhost:42468
> Accepted from localhost:42469
>     <- SASL
> [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x25013c0:0] -> SASL-OUTCOME @68 [0]
>     -> SASL
>     -> AMQP
> [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> null, null, null, null, null, null, null]
> Accepted from localhost:42470
>     <- SASL
> [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x253f490:0] -> SASL-OUTCOME @68 [0]
>     -> SASL
>     -> AMQP
> [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> null, null, null, null, null, null, null]
>     <- AMQP
> [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> "0.0.0.0", null, null, null, null, null, null, null]
> [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> 0, null, 0, false, null, null], null, null, 0]
> [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> null, null, null, 0]
> [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
>     <- SASL
> [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x2563350:0] -> SASL-OUTCOME @68 [0]
>     -> SASL
>     -> AMQP
> [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> null, null, null, null, null, null, null]
>     <- AMQP
> [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> "0.0.0.0", null, null, null, null, null, null, null]
> [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> 0, null, 0, false, null, null], null, null, 0]
> [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> null, null, null, 0]
> [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
>     <- AMQP
> [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> "0.0.0.0", null, null, null, null, null, null, null]
> [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> 0, null, 0, false, null, null], null, null, 0]
> [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> null, null, null, 0]
> [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> [0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xa1\x12Message from 22470"
> APPLICATION OUTPUT:   getting message...
> APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> APPLICATION OUTPUT:   Content: "Message from 22470"
> APPLICATION OUTPUT:   receiving...
> [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> [0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xa1\x12Message from 22469"
> getting message...
> APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> APPLICATION OUTPUT:   Content: "Message from 22469
> APPLICATION OUTPUT:   receiving...
> [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> [0x24fae10:1] <- DETACH @22 [1, true, null]
> [0x24fae10:0] <- CLOSE @24 [null]
> [0x24fae10:0] <- EOS
> [0x24fae10:1] -> DETACH @22 [1, true, null]
> [0x24fae10:0] -> CLOSE @24 [null]
> [0x24fae10:0] -> EOS
> Closed localhost:42468
> [0x2538e40:1] <- DETACH @22 [1, true, null]
> [0x2538e40:0] <- CLOSE @24 [null]
> [0x2538e40:0] <- EOS
> [0x2538e40:1] -> DETACH @22 [1, true, null]
> [0x2538e40:0] -> CLOSE @24 [null]
> [0x2538e40:0] -> EOS
> Closed localhost:42469
>
> ----------------- end trace -------------------------
>
>
>
>
>
>
>
>
>
>
> ----- Original Message -----
> Any clues from a trace of the receiver?
>
> $ PN_TRACE_FRM=1 ./receiver 6666
>
> -Ted
>
> On 04/04/2013 02:09 PM, Michael Goulish wrote:
> >
> >    Is this a bug, or am I  Doing  Something  Wrong ?
> >
> >
> >
> > Scenario
> > {
> >    My sender sends a single message, and hopes to see
> >    that the receiver has accepted it.
> >
> >    I launch 3 copies of the sender very close together--
> >    they all talk to the same address.
> >
> >    My receiver receives in a loop, accepts every message
> >    that it receives.
> > }
> >
> >
> >
> >
> > Result
> > {
> >    Sometimes my receiver gets 1 of the 3 messages.
> >    Usually it gets 2.
> >    It never gets all 3.
> >
> >    The 3rd sender hangs in pn_messenger_send().
> >
> >    While the 3rd sender is hanging in send(), the receiver
> >    is patiently waiting in recv().
> > }
> >
> >
> >
> >
> >
> >
> > Sender Code ############################################
> >
> > /*
> >    Launch 3 of these from a script like so:
> >    ./sender 6666 &
> >    ./sender 6666 &
> >    ./sender 6666 &
> > */
> >
> >
> > #include "proton/message.h"
> > #include "proton/messenger.h"
> >
> > #include <getopt.h>
> > #include <stdio.h>
> > #include <stdlib.h>
> > #include <string.h>
> > #include <ctype.h>
> >
> >
> > char *
> > status_2_str ( pn_status_t status )
> > {
> >    switch ( status )
> >    {
> >      case PN_STATUS_UNKNOWN:
> >        return "unknown";
> >        break;
> >
> >      case PN_STATUS_PENDING:
> >        return "pending";
> >        break;
> >
> >      case PN_STATUS_ACCEPTED:
> >        return "accepted";
> >        break;
> >
> >      case PN_STATUS_REJECTED:
> >        return "rejected";
> >        break;
> >
> >      default:
> >        return "bad value";
> >        break;
> >    }
> > }
> >
> >
> >
> > pid_t my_pid = 0;
> >
> >
> > void
> > check ( char * label, int result )
> > {
> >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > }
> >
> >
> >
> > int
> > main(int argc, char** argv)
> > {
> >    int c;
> >    char addr [ 1000 ];
> >    char msgtext [ 100 ];
> >    pn_message_t   * message;
> >    pn_messenger_t * messenger;
> >    pn_data_t      * body;
> >    pn_tracker_t     tracker;
> >    pn_status_t      status;
> >    int              result;
> >
> >    my_pid = getpid();
> >
> >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> >
> >
> >    message = pn_message ( );
> >    messenger = pn_messenger ( NULL );
> >    pn_messenger_start ( messenger ) ;
> >    pn_messenger_set_outgoing_window ( messenger, 1 );
> >
> >
> >    pn_message_set_address ( message, addr );
> >    body = pn_message_body ( message );
> >
> >
> >    sprintf ( msgtext, "Message from %d", getpid() );
> >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
> >    pn_messenger_put ( messenger, message );
> >    tracker = pn_messenger_outgoing_tracker ( messenger );
> >    pn_messenger_send ( messenger );
> >
> >
> >    status = pn_messenger_status ( messenger, tracker );
> >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> >
> >
> >    pn_messenger_stop ( messenger );
> >    pn_messenger_free ( messenger );
> >    pn_message_free ( message );
> >
> >    return 0;
> > }
> >
> >
> >
> >
> > Receiver Code ########################################################
> >
> > /*
> >
> >    Launch like this:
> >    ./receiver 6666
> > */
> >
> > #include <stdio.h>
> > #include <stdlib.h>
> > #include <ctype.h>
> >
> > #include "proton/message.h"
> > #include "proton/messenger.h"
> >
> >
> >
> > #define BUFSIZE 1024
> >
> >
> >
> > int
> > main(int argc, char** argv)
> > {
> >    size_t bufsize = BUFSIZE;
> >    char buffer [ BUFSIZE ];
> >    char addr [ 1000 ];
> >    pn_message_t   * message;
> >    pn_messenger_t * messenger;
> >    pn_data_t      * body;
> >    pn_tracker_t     tracker;
> >
> >
> >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> >
> >    message = pn_message();
> >    messenger = pn_messenger ( NULL );
> >
> >    pn_messenger_start(messenger);
> >    pn_messenger_subscribe ( messenger, addr );
> >    pn_messenger_set_incoming_window ( messenger, 5 );
> >
> >    /*---------------------------------
> >      Receive and accept the message.
> >    ---------------------------------*/
> >    while ( 1 )
> >    {
> >      fprintf ( stderr, "receiving...\n" );
> >      pn_messenger_recv ( messenger, 3 );
> >
> >      while ( pn_messenger_incoming ( messenger ) > 0 )
> >      {
> >        fprintf ( stderr, "getting message...\n" );
> >        pn_messenger_get ( messenger, message );
> >        tracker = pn_messenger_incoming_tracker ( messenger );
> >        pn_messenger_accept ( messenger, tracker, 0 );
> >        body = pn_message_body ( message );
> >        pn_data_format ( body, buffer, & bufsize );
> >        fprintf ( stdout, "Address: %s\n", pn_message_get_address (
> message ) );
> >        fprintf ( stdout, "Content: %s\n", buffer);
> >      }
> >    }
> >
> >    pn_messenger_stop(messenger);
> >    pn_messenger_free(messenger);
> >
> >    return 0;
> > }
> >
> >
>
>

Re: problem with multiple senders

Posted by Ted Ross <tr...@redhat.com>.
Looks like all three credits were given to the first link.  Once one 
message arrived, its credit was given to the second link.  The second 
link then transferred a message but the credit was given back to the 
second link where there were no more messages to transfer.


On 04/04/2013 03:06 PM, Michael Goulish wrote:
> OK, I'm looking at trace from receiver, and I thought
> I would post it here so I can't be accused of hogging
> all the fun for myself.
>
> ( Remember, three senders all send to same receiver address,
>    only two get 'accepted' replies.  Last sender ends up hanging in send(),
>    while receiver (in infinite loop) blocks on recv(). )
>
> I have marked the lines of application output with "APPLICATION OUTPUT:"
>
>
> Note:
>
> I see these 3 lines:
>    Accepted from localhost:42468
>    Accepted from localhost:42469
>    Accepted from localhost:42470
>
> But only two get closed:
>    Closed localhost:42468
>    Closed localhost:42469
>
>
>
>
> ----------------- begin trace -----------------------
> Listening on 0.0.0.0:6666
> APPLICATION OUTPUT:   receiving...
> Accepted from localhost:42468
> Accepted from localhost:42469
>      <- SASL
> [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x25013c0:0] -> SASL-OUTCOME @68 [0]
>      -> SASL
>      -> AMQP
> [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, null, null, null, null, null, null]
> Accepted from localhost:42470
>      <- SASL
> [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x253f490:0] -> SASL-OUTCOME @68 [0]
>      -> SASL
>      -> AMQP
> [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, null, null, null, null, null, null]
>      <- AMQP
> [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3", "0.0.0.0", null, null, null, null, null, null, null]
> [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, null], null, null, 0]
> [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, null, null, 0]
> [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
>      <- SASL
> [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x2563350:0] -> SASL-OUTCOME @68 [0]
>      -> SASL
>      -> AMQP
> [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, null, null, null, null, null, null]
>      <- AMQP
> [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf", "0.0.0.0", null, null, null, null, null, null, null]
> [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, null], null, null, 0]
> [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, null, null, 0]
> [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
>      <- AMQP
> [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb", "0.0.0.0", null, null, null, null, null, null, null]
> [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, null], null, null, 0]
> [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, null, null, 0]
> [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> [0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00", 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666@\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xa1\x12Message from 22470"
> APPLICATION OUTPUT:   getting message...
> APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> APPLICATION OUTPUT:   Content: "Message from 22470"
> APPLICATION OUTPUT:   receiving...
> [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> [0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00", 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666@\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xa1\x12Message from 22469"
> getting message...
> APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> APPLICATION OUTPUT:   Content: "Message from 22469
> APPLICATION OUTPUT:   receiving...
> [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> [0x24fae10:1] <- DETACH @22 [1, true, null]
> [0x24fae10:0] <- CLOSE @24 [null]
> [0x24fae10:0] <- EOS
> [0x24fae10:1] -> DETACH @22 [1, true, null]
> [0x24fae10:0] -> CLOSE @24 [null]
> [0x24fae10:0] -> EOS
> Closed localhost:42468
> [0x2538e40:1] <- DETACH @22 [1, true, null]
> [0x2538e40:0] <- CLOSE @24 [null]
> [0x2538e40:0] <- EOS
> [0x2538e40:1] -> DETACH @22 [1, true, null]
> [0x2538e40:0] -> CLOSE @24 [null]
> [0x2538e40:0] -> EOS
> Closed localhost:42469
>
> ----------------- end trace -------------------------
>
>
>
>
>
>
>
>
>
>
> ----- Original Message -----
> Any clues from a trace of the receiver?
>
> $ PN_TRACE_FRM=1 ./receiver 6666
>
> -Ted
>
> On 04/04/2013 02:09 PM, Michael Goulish wrote:
>>     Is this a bug, or am I  Doing  Something  Wrong ?
>>
>>
>>
>> Scenario
>> {
>>     My sender sends a single message, and hopes to see
>>     that the receiver has accepted it.
>>
>>     I launch 3 copies of the sender very close together--
>>     they all talk to the same address.
>>
>>     My receiver receives in a loop, accepts every message
>>     that it receives.
>> }
>>
>>
>>
>>
>> Result
>> {
>>     Sometimes my receiver gets 1 of the 3 messages.
>>     Usually it gets 2.
>>     It never gets all 3.
>>
>>     The 3rd sender hangs in pn_messenger_send().
>>
>>     While the 3rd sender is hanging in send(), the receiver
>>     is patiently waiting in recv().
>> }
>>
>>
>>
>>
>>
>>
>> Sender Code ############################################
>>
>> /*
>>     Launch 3 of these from a script like so:
>>     ./sender 6666 &
>>     ./sender 6666 &
>>     ./sender 6666 &
>> */
>>
>>
>> #include "proton/message.h"
>> #include "proton/messenger.h"
>>
>> #include <getopt.h>
>> #include <stdio.h>
>> #include <stdlib.h>
>> #include <string.h>
>> #include <ctype.h>
>>
>>
>> char *
>> status_2_str ( pn_status_t status )
>> {
>>     switch ( status )
>>     {
>>       case PN_STATUS_UNKNOWN:
>>         return "unknown";
>>         break;
>>
>>       case PN_STATUS_PENDING:
>>         return "pending";
>>         break;
>>
>>       case PN_STATUS_ACCEPTED:
>>         return "accepted";
>>         break;
>>
>>       case PN_STATUS_REJECTED:
>>         return "rejected";
>>         break;
>>
>>       default:
>>         return "bad value";
>>         break;
>>     }
>> }
>>
>>
>>
>> pid_t my_pid = 0;
>>
>>
>> void
>> check ( char * label, int result )
>> {
>>     fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
>> }
>>
>>
>>
>> int
>> main(int argc, char** argv)
>> {
>>     int c;
>>     char addr [ 1000 ];
>>     char msgtext [ 100 ];
>>     pn_message_t   * message;
>>     pn_messenger_t * messenger;
>>     pn_data_t      * body;
>>     pn_tracker_t     tracker;
>>     pn_status_t      status;
>>     int              result;
>>
>>     my_pid = getpid();
>>
>>     sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
>>
>>
>>     message = pn_message ( );
>>     messenger = pn_messenger ( NULL );
>>     pn_messenger_start ( messenger ) ;
>>     pn_messenger_set_outgoing_window ( messenger, 1 );
>>
>>
>>     pn_message_set_address ( message, addr );
>>     body = pn_message_body ( message );
>>
>>
>>     sprintf ( msgtext, "Message from %d", getpid() );
>>     pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
>>     pn_messenger_put ( messenger, message );
>>     tracker = pn_messenger_outgoing_tracker ( messenger );
>>     pn_messenger_send ( messenger );
>>
>>
>>     status = pn_messenger_status ( messenger, tracker );
>>     fprintf ( stderr, "status : %s\n", status_2_str(status) );
>>
>>
>>     pn_messenger_stop ( messenger );
>>     pn_messenger_free ( messenger );
>>     pn_message_free ( message );
>>
>>     return 0;
>> }
>>
>>
>>
>>
>> Receiver Code ########################################################
>>
>> /*
>>
>>     Launch like this:
>>     ./receiver 6666
>> */
>>
>> #include <stdio.h>
>> #include <stdlib.h>
>> #include <ctype.h>
>>
>> #include "proton/message.h"
>> #include "proton/messenger.h"
>>
>>
>>
>> #define BUFSIZE 1024
>>
>>
>>
>> int
>> main(int argc, char** argv)
>> {
>>     size_t bufsize = BUFSIZE;
>>     char buffer [ BUFSIZE ];
>>     char addr [ 1000 ];
>>     pn_message_t   * message;
>>     pn_messenger_t * messenger;
>>     pn_data_t      * body;
>>     pn_tracker_t     tracker;
>>
>>
>>     sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
>>
>>     message = pn_message();
>>     messenger = pn_messenger ( NULL );
>>
>>     pn_messenger_start(messenger);
>>     pn_messenger_subscribe ( messenger, addr );
>>     pn_messenger_set_incoming_window ( messenger, 5 );
>>
>>     /*---------------------------------
>>       Receive and accept the message.
>>     ---------------------------------*/
>>     while ( 1 )
>>     {
>>       fprintf ( stderr, "receiving...\n" );
>>       pn_messenger_recv ( messenger, 3 );
>>
>>       while ( pn_messenger_incoming ( messenger ) > 0 )
>>       {
>>         fprintf ( stderr, "getting message...\n" );
>>         pn_messenger_get ( messenger, message );
>>         tracker = pn_messenger_incoming_tracker ( messenger );
>>         pn_messenger_accept ( messenger, tracker, 0 );
>>         body = pn_message_body ( message );
>>         pn_data_format ( body, buffer, & bufsize );
>>         fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
>>         fprintf ( stdout, "Content: %s\n", buffer);
>>       }
>>     }
>>
>>     pn_messenger_stop(messenger);
>>     pn_messenger_free(messenger);
>>
>>     return 0;
>> }
>>
>>


Re: problem with multiple senders

Posted by Michael Goulish <mg...@redhat.com>.
OK, I'm looking at trace from receiver, and I thought
I would post it here so I can't be accused of hogging 
all the fun for myself.

( Remember, three senders all send to same receiver address,
  only two get 'accepted' replies.  Last sender ends up hanging in send(),
  while receiver (in infinite loop) blocks on recv(). )

I have marked the lines of application output with "APPLICATION OUTPUT:"


Note:

I see these 3 lines:
  Accepted from localhost:42468
  Accepted from localhost:42469
  Accepted from localhost:42470

But only two get closed:
  Closed localhost:42468
  Closed localhost:42469




----------------- begin trace -----------------------
Listening on 0.0.0.0:6666
APPLICATION OUTPUT:   receiving...
Accepted from localhost:42468
Accepted from localhost:42469
    <- SASL
[0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
[0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
[0x25013c0:0] -> SASL-OUTCOME @68 [0]
    -> SASL
    -> AMQP
[0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, null, null, null, null, null, null]
Accepted from localhost:42470
    <- SASL
[0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
[0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
[0x253f490:0] -> SASL-OUTCOME @68 [0]
    -> SASL
    -> AMQP
[0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, null, null, null, null, null, null]
    <- AMQP
[0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3", "0.0.0.0", null, null, null, null, null, null, null]
[0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
[0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, null], null, null, 0]
[0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
[0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, null, null, 0]
[0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
    <- SASL
[0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
[0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
[0x2563350:0] -> SASL-OUTCOME @68 [0]
    -> SASL
    -> AMQP
[0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, null, null, null, null, null, null]
    <- AMQP
[0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf", "0.0.0.0", null, null, null, null, null, null, null]
[0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
[0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, null], null, null, 0]
[0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
[0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, null, null, 0]
[0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
    <- AMQP
[0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb", "0.0.0.0", null, null, null, null, null, null, null]
[0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
[0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, null], null, null, 0]
[0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
[0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, null, null, 0]
[0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
[0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00", 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666@\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xa1\x12Message from 22470"
APPLICATION OUTPUT:   getting message...
APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
APPLICATION OUTPUT:   Content: "Message from 22470"
APPLICATION OUTPUT:   receiving...
[0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
[0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
[0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
[0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00", 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666@\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xa1\x12Message from 22469"
getting message...
APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
APPLICATION OUTPUT:   Content: "Message from 22469
APPLICATION OUTPUT:   receiving...
[0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
[0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
[0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
[0x24fae10:1] <- DETACH @22 [1, true, null]
[0x24fae10:0] <- CLOSE @24 [null]
[0x24fae10:0] <- EOS
[0x24fae10:1] -> DETACH @22 [1, true, null]
[0x24fae10:0] -> CLOSE @24 [null]
[0x24fae10:0] -> EOS
Closed localhost:42468
[0x2538e40:1] <- DETACH @22 [1, true, null]
[0x2538e40:0] <- CLOSE @24 [null]
[0x2538e40:0] <- EOS
[0x2538e40:1] -> DETACH @22 [1, true, null]
[0x2538e40:0] -> CLOSE @24 [null]
[0x2538e40:0] -> EOS
Closed localhost:42469

----------------- end trace -------------------------










----- Original Message -----
Any clues from a trace of the receiver?

$ PN_TRACE_FRM=1 ./receiver 6666

-Ted

On 04/04/2013 02:09 PM, Michael Goulish wrote:
>
>    Is this a bug, or am I  Doing  Something  Wrong ?
>
>
>
> Scenario
> {
>    My sender sends a single message, and hopes to see
>    that the receiver has accepted it.
>
>    I launch 3 copies of the sender very close together--
>    they all talk to the same address.
>
>    My receiver receives in a loop, accepts every message
>    that it receives.
> }
>
>
>
>
> Result
> {
>    Sometimes my receiver gets 1 of the 3 messages.
>    Usually it gets 2.
>    It never gets all 3.
>
>    The 3rd sender hangs in pn_messenger_send().
>
>    While the 3rd sender is hanging in send(), the receiver
>    is patiently waiting in recv().
> }
>
>
>
>
>
>
> Sender Code ############################################
>
> /*
>    Launch 3 of these from a script like so:
>    ./sender 6666 &
>    ./sender 6666 &
>    ./sender 6666 &
> */
>
>
> #include "proton/message.h"
> #include "proton/messenger.h"
>
> #include <getopt.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <string.h>
> #include <ctype.h>
>
>
> char *
> status_2_str ( pn_status_t status )
> {
>    switch ( status )
>    {
>      case PN_STATUS_UNKNOWN:
>        return "unknown";
>        break;
>
>      case PN_STATUS_PENDING:
>        return "pending";
>        break;
>
>      case PN_STATUS_ACCEPTED:
>        return "accepted";
>        break;
>
>      case PN_STATUS_REJECTED:
>        return "rejected";
>        break;
>
>      default:
>        return "bad value";
>        break;
>    }
> }
>
>
>
> pid_t my_pid = 0;
>
>
> void
> check ( char * label, int result )
> {
>    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> }
>
>
>
> int
> main(int argc, char** argv)
> {
>    int c;
>    char addr [ 1000 ];
>    char msgtext [ 100 ];
>    pn_message_t   * message;
>    pn_messenger_t * messenger;
>    pn_data_t      * body;
>    pn_tracker_t     tracker;
>    pn_status_t      status;
>    int              result;
>
>    my_pid = getpid();
>
>    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
>
>
>    message = pn_message ( );
>    messenger = pn_messenger ( NULL );
>    pn_messenger_start ( messenger ) ;
>    pn_messenger_set_outgoing_window ( messenger, 1 );
>
>
>    pn_message_set_address ( message, addr );
>    body = pn_message_body ( message );
>
>
>    sprintf ( msgtext, "Message from %d", getpid() );
>    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
>    pn_messenger_put ( messenger, message );
>    tracker = pn_messenger_outgoing_tracker ( messenger );
>    pn_messenger_send ( messenger );
>
>
>    status = pn_messenger_status ( messenger, tracker );
>    fprintf ( stderr, "status : %s\n", status_2_str(status) );
>
>
>    pn_messenger_stop ( messenger );
>    pn_messenger_free ( messenger );
>    pn_message_free ( message );
>
>    return 0;
> }
>
>
>
>
> Receiver Code ########################################################
>
> /*
>
>    Launch like this:
>    ./receiver 6666
> */
>
> #include <stdio.h>
> #include <stdlib.h>
> #include <ctype.h>
>
> #include "proton/message.h"
> #include "proton/messenger.h"
>
>
>
> #define BUFSIZE 1024
>
>
>
> int
> main(int argc, char** argv)
> {
>    size_t bufsize = BUFSIZE;
>    char buffer [ BUFSIZE ];
>    char addr [ 1000 ];
>    pn_message_t   * message;
>    pn_messenger_t * messenger;
>    pn_data_t      * body;
>    pn_tracker_t     tracker;
>
>
>    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
>
>    message = pn_message();
>    messenger = pn_messenger ( NULL );
>
>    pn_messenger_start(messenger);
>    pn_messenger_subscribe ( messenger, addr );
>    pn_messenger_set_incoming_window ( messenger, 5 );
>
>    /*---------------------------------
>      Receive and accept the message.
>    ---------------------------------*/
>    while ( 1 )
>    {
>      fprintf ( stderr, "receiving...\n" );
>      pn_messenger_recv ( messenger, 3 );
>
>      while ( pn_messenger_incoming ( messenger ) > 0 )
>      {
>        fprintf ( stderr, "getting message...\n" );
>        pn_messenger_get ( messenger, message );
>        tracker = pn_messenger_incoming_tracker ( messenger );
>        pn_messenger_accept ( messenger, tracker, 0 );
>        body = pn_message_body ( message );
>        pn_data_format ( body, buffer, & bufsize );
>        fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
>        fprintf ( stdout, "Content: %s\n", buffer);
>      }
>    }
>
>    pn_messenger_stop(messenger);
>    pn_messenger_free(messenger);
>
>    return 0;
> }
>
>


Re: problem with multiple senders

Posted by Ted Ross <tr...@redhat.com>.
Any clues from a trace of the receiver?

$ PN_TRACE_FRM=1 ./receiver 6666

-Ted

On 04/04/2013 02:09 PM, Michael Goulish wrote:
>
>    Is this a bug, or am I  Doing  Something  Wrong ?
>
>
>
> Scenario
> {
>    My sender sends a single message, and hopes to see
>    that the receiver has accepted it.
>
>    I launch 3 copies of the sender very close together--
>    they all talk to the same address.
>
>    My receiver receives in a loop, accepts every message
>    that it receives.
> }
>
>
>
>
> Result
> {
>    Sometimes my receiver gets 1 of the 3 messages.
>    Usually it gets 2.
>    It never gets all 3.
>
>    The 3rd sender hangs in pn_messenger_send().
>
>    While the 3rd sender is hanging in send(), the receiver
>    is patiently waiting in recv().
> }
>
>
>
>
>
>
> Sender Code ############################################
>
> /*
>    Launch 3 of these from a script like so:
>    ./sender 6666 &
>    ./sender 6666 &
>    ./sender 6666 &
> */
>
>
> #include "proton/message.h"
> #include "proton/messenger.h"
>
> #include <getopt.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <string.h>
> #include <ctype.h>
>
>
> char *
> status_2_str ( pn_status_t status )
> {
>    switch ( status )
>    {
>      case PN_STATUS_UNKNOWN:
>        return "unknown";
>        break;
>
>      case PN_STATUS_PENDING:
>        return "pending";
>        break;
>
>      case PN_STATUS_ACCEPTED:
>        return "accepted";
>        break;
>
>      case PN_STATUS_REJECTED:
>        return "rejected";
>        break;
>
>      default:
>        return "bad value";
>        break;
>    }
> }
>
>
>
> pid_t my_pid = 0;
>
>
> void
> check ( char * label, int result )
> {
>    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> }
>
>
>
> int
> main(int argc, char** argv)
> {
>    int c;
>    char addr [ 1000 ];
>    char msgtext [ 100 ];
>    pn_message_t   * message;
>    pn_messenger_t * messenger;
>    pn_data_t      * body;
>    pn_tracker_t     tracker;
>    pn_status_t      status;
>    int              result;
>
>    my_pid = getpid();
>
>    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
>
>
>    message = pn_message ( );
>    messenger = pn_messenger ( NULL );
>    pn_messenger_start ( messenger ) ;
>    pn_messenger_set_outgoing_window ( messenger, 1 );
>
>
>    pn_message_set_address ( message, addr );
>    body = pn_message_body ( message );
>
>
>    sprintf ( msgtext, "Message from %d", getpid() );
>    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
>    pn_messenger_put ( messenger, message );
>    tracker = pn_messenger_outgoing_tracker ( messenger );
>    pn_messenger_send ( messenger );
>
>
>    status = pn_messenger_status ( messenger, tracker );
>    fprintf ( stderr, "status : %s\n", status_2_str(status) );
>
>
>    pn_messenger_stop ( messenger );
>    pn_messenger_free ( messenger );
>    pn_message_free ( message );
>
>    return 0;
> }
>
>
>
>
> Receiver Code ########################################################
>
> /*
>
>    Launch like this:
>    ./receiver 6666
> */
>
> #include <stdio.h>
> #include <stdlib.h>
> #include <ctype.h>
>
> #include "proton/message.h"
> #include "proton/messenger.h"
>
>
>
> #define BUFSIZE 1024
>
>
>
> int
> main(int argc, char** argv)
> {
>    size_t bufsize = BUFSIZE;
>    char buffer [ BUFSIZE ];
>    char addr [ 1000 ];
>    pn_message_t   * message;
>    pn_messenger_t * messenger;
>    pn_data_t      * body;
>    pn_tracker_t     tracker;
>
>
>    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
>
>    message = pn_message();
>    messenger = pn_messenger ( NULL );
>
>    pn_messenger_start(messenger);
>    pn_messenger_subscribe ( messenger, addr );
>    pn_messenger_set_incoming_window ( messenger, 5 );
>
>    /*---------------------------------
>      Receive and accept the message.
>    ---------------------------------*/
>    while ( 1 )
>    {
>      fprintf ( stderr, "receiving...\n" );
>      pn_messenger_recv ( messenger, 3 );
>
>      while ( pn_messenger_incoming ( messenger ) > 0 )
>      {
>        fprintf ( stderr, "getting message...\n" );
>        pn_messenger_get ( messenger, message );
>        tracker = pn_messenger_incoming_tracker ( messenger );
>        pn_messenger_accept ( messenger, tracker, 0 );
>        body = pn_message_body ( message );
>        pn_data_format ( body, buffer, & bufsize );
>        fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
>        fprintf ( stdout, "Content: %s\n", buffer);
>      }
>    }
>
>    pn_messenger_stop(messenger);
>    pn_messenger_free(messenger);
>
>    return 0;
> }
>
>