You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2014/12/08 18:02:10 UTC

qpid-proton git commit: initial commit of psend/precv examples.

Repository: qpid-proton
Updated Branches:
  refs/heads/master 4d2a6fd01 -> cefbf9839


initial commit of psend/precv examples.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cefbf983
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cefbf983
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cefbf983

Branch: refs/heads/master
Commit: cefbf9839e9c9f966f3b425da16ad26fc31b45fc
Parents: 4d2a6fd
Author: mick <mi...@redhat.com>
Authored: Mon Dec 8 12:00:53 2014 -0500
Committer: mick <mi...@redhat.com>
Committed: Mon Dec 8 12:00:53 2014 -0500

----------------------------------------------------------------------
 examples/engine/c/precv.c | 502 +++++++++++++++++++++++++++++++++++++++++
 examples/engine/c/psend.c | 373 ++++++++++++++++++++++++++++++
 2 files changed, 875 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cefbf983/examples/engine/c/precv.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/precv.c b/examples/engine/c/precv.c
new file mode 100644
index 0000000..3c79a6e
--- /dev/null
+++ b/examples/engine/c/precv.c
@@ -0,0 +1,502 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/*################################################################
+  This program is half of a pair.  Precv and Psend are meant to 
+  be simple-as-possible examples of how to use the proton-c
+  engine interface to send and receive messages over a single 
+  connection and a single session.
+
+  In addition to being examples, these programs or their 
+  descendants will be used in performance regression testing
+  for both throughput and latency, and long-term soak testing.
+*################################################################*/
+
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <time.h>
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/event.h>
+#include <proton/terminus.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+
+
+
+
+
+#define MY_BUF_SIZE  1000
+
+
+
+
+/*---------------------------------------------------------
+  These high-resolution times are used both for
+  interim timing reports -- i.e. every 'report_frequency'
+  messages -- and for the final timestamp, after all 
+  expected messages have been received.
+---------------------------------------------------------*/
+static
+double
+get_time ( )
+{
+  struct timeval tv;
+  struct tm      * timeinfo;
+
+  gettimeofday ( & tv, 0 );
+  timeinfo = localtime ( & tv.tv_sec );
+
+  double time_now = 3600 * timeinfo->tm_hour +
+                      60 * timeinfo->tm_min  +
+                           timeinfo->tm_sec;
+
+  time_now += ((double)(tv.tv_usec) / 1000000.0);
+  return time_now;
+}
+
+
+
+
+
+/*----------------------------------------------------
+  These absolute timestamps are useful in soak tests,
+  where I want to align the program's output with 
+  output from top to look at CPU and memory use..
+----------------------------------------------------*/
+void
+print_timestamp_like_a_normal_person ( FILE * fp )
+{
+  char const * month_abbrevs[] = { "jan", 
+                                   "feb", 
+                                   "mar", 
+                                   "apr", 
+                                   "may", 
+                                   "jun", 
+                                   "jul", 
+                                   "aug", 
+                                   "sep", 
+                                   "oct", 
+                                   "nov", 
+                                   "dec" 
+                                 };
+  time_t rawtime;
+  struct tm * timeinfo;
+
+  time ( & rawtime );
+  timeinfo = localtime ( &rawtime );
+
+  char time_string[100];
+  sprintf ( time_string,
+            "%d-%s-%02d %02d:%02d:%02d",
+            1900 + timeinfo->tm_year,
+            month_abbrevs[timeinfo->tm_mon],
+            timeinfo->tm_mday,
+            timeinfo->tm_hour,
+            timeinfo->tm_min,
+            timeinfo->tm_sec
+          );
+
+  fprintf ( fp, "timestamp %s\n", time_string );
+}
+
+
+
+
+
+int 
+main ( int argc, char ** argv  ) 
+{
+  char info[1000];
+
+  uint64_t received = 0;
+
+  char host [1000];
+  char port [1000];
+  char output_file_name[1000];
+
+  int initial_flow   = 400;
+  int flow_increment = 200;
+
+  int       report_frequency  = 200000;
+  int64_t   messages          = 2000000,
+            delivery_count    = 0;
+
+  strcpy ( host, "0.0.0.0" );
+  strcpy ( port, "5672" );
+
+  pn_driver_t     * driver;
+  pn_listener_t   * listener;
+  pn_connector_t  * connector;
+  pn_connection_t * connection;
+  pn_collector_t  * collector;
+  pn_transport_t  * transport;
+  pn_sasl_t       * sasl;
+  pn_session_t    * session;
+  pn_event_t      * event;
+  pn_link_t       * link;
+  pn_delivery_t   * delivery;
+
+
+  double last_time,
+         this_time,
+         time_diff;
+
+  char * message_data          = (char *) malloc ( MY_BUF_SIZE );
+  int    message_data_capacity = MY_BUF_SIZE;
+
+  FILE * output_fp;
+
+
+  /*-----------------------------------------------------------
+    Read the command lines args and override initialization.
+  -----------------------------------------------------------*/
+  for ( int i = 1; i < argc; ++ i )
+  {
+    if ( ! strcmp ( "--host", argv[i] ) )
+    {
+      strcpy ( host, argv[i+1] );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--port", argv[i] ) )
+    {
+      strcpy ( port, argv[i+1] );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--messages", argv[i] ) )
+    {
+      sscanf ( argv [ i+1 ], "%" SCNd64 , & messages );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--output", argv[i] ) )
+    {
+      if ( ! strcmp ( "stderr", argv[i+1] ) )
+      {
+        output_fp = stderr;
+        strcpy ( output_file_name, "stderr");
+      }
+      else
+      if ( ! strcmp ( "stdout", argv[i+1] ) )
+      {
+        output_fp = stdout;
+        strcpy ( output_file_name, "stdout");
+      }
+      else
+      {
+        output_fp = fopen ( argv[i+1], "w" );
+        strcpy ( output_file_name, argv[i+1] );
+        if ( ! output_fp )
+        {
+          fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] );
+          exit ( 1 );
+        }
+      }
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--report_frequency", argv[i] ) )
+    {
+      report_frequency = atoi ( argv[i+1] );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--initial_flow", argv[i] ) )
+    {
+      sscanf ( argv [ i+1 ], "%d", & initial_flow );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--flow_increment", argv[i] ) )
+    {
+      sscanf ( argv [ i+1 ], "%d", & flow_increment );
+      ++ i;
+    }
+    else
+    {
+      fprintf ( output_fp, "unknown arg |%s|\n", argv[i] );
+      exit ( 1 );
+    }
+  }
+
+  /*-----------------------------------------------
+    Show what we ended up with for all the args.
+  -----------------------------------------------*/
+  fprintf ( output_fp, "host                %s\n",          host );
+  fprintf ( output_fp, "port                %s\n",          port );
+  fprintf ( output_fp, "messages            %" PRId64 "\n", messages );
+  fprintf ( output_fp, "report_frequency    %d\n",          report_frequency );
+  fprintf ( output_fp, "initial_flow        %d\n",          initial_flow );
+  fprintf ( output_fp, "flow_increment      %d\n",          flow_increment );
+  fprintf ( output_fp, "output              %s\n",          output_file_name );
+
+
+  /*--------------------------------------------
+    Create a standard driver and listen for the 
+    initial connector.
+  --------------------------------------------*/
+  driver = pn_driver ( );
+
+  if ( ! pn_listener(driver, host, port, 0) ) 
+  {
+    fprintf ( output_fp, "precv listener creation failed.\n" );
+    exit ( 1 );
+  }
+
+  fprintf ( output_fp, "\nprecv ready...\n\n" );
+
+  while ( 1 )
+  {
+    pn_driver_wait ( driver, -1 );
+    if ( listener = pn_driver_listener(driver) )
+    {
+      if ( connector = pn_listener_accept(listener) )
+        break;
+    }
+  }
+
+  /*--------------------------------------------------------
+    Now make all the other structure around the connector,
+    and tell it that skipping sasl is OK.
+  --------------------------------------------------------*/
+  connection = pn_connection ( );
+  collector  = pn_collector  ( );
+  pn_connection_collect ( connection, collector );
+  pn_connector_set_connection ( connector, connection );
+
+  transport = pn_connector_transport ( connector );
+  sasl = pn_sasl ( transport );
+  pn_sasl_mechanisms ( sasl, "ANONYMOUS" );
+  pn_sasl_server ( sasl );
+  pn_sasl_allow_skip ( sasl, true );
+  pn_sasl_done ( sasl, PN_SASL_OK );
+
+  /*----------------------------------------------------------
+    If report_frequency is not set to zero, we will 
+    produce a timing report every report_frequency messages.
+    The timing reported will be the delta from the last_time
+    to the current time.  
+    This is useful in soak testing, where you basically 
+    never stop, but still need to see how the system is doing
+    every so often.
+  ----------------------------------------------------------*/
+  last_time = get_time();
+
+
+  /*------------------------------------------------------------
+    A triply-nested loop.
+    In the outermost one, we just wait for activity to come in 
+    from the driver.
+  ------------------------------------------------------------*/
+  while ( 1 )
+  {
+    pn_driver_wait ( driver, -1 );
+
+    /*---------------------------------------------------------------
+      In the next loop, we keep going as long as we processed
+      some events.  This is because our own processing of events
+      may have caused more to be generated that we need to handle.
+      If we go back to the outermost loop and its pn_driver_wait()
+      without handling these events, we will end up with the sender
+      and receiver programs just staring at each other with blank
+      expressions on their faces.
+    ---------------------------------------------------------------*/
+    int event_count = 1;
+    while ( event_count > 0 )
+    {
+      event_count = 0;
+      pn_connector_process ( connector );
+
+      /*-------------------------------------------------------
+        After we process the connector, it may have generated
+        a batch of events for us to handle.  As we go through 
+        this batch of events, our handling may generate other 
+        events which we must handle before going back to 
+        pn_driver_wait().
+      --------------------------------------------------------*/
+      event = pn_collector_peek(collector);
+      while ( event )
+      {
+        ++ event_count;
+        pn_event_type_t event_type = pn_event_type ( event );
+        //fprintf ( output_fp, "precv event: %s\n", pn_event_type_name(event_type));
+
+
+        switch ( event_type )
+        {
+          case PN_CONNECTION_REMOTE_OPEN:
+          break;
+
+
+          case PN_SESSION_REMOTE_OPEN:
+            session = pn_event_session(event);
+            if ( pn_session_state(session) & PN_LOCAL_UNINIT ) 
+            {
+              // big number because it's in bytes.
+              pn_session_set_incoming_capacity ( session, 1000000 );
+              pn_session_open ( session );
+            }
+          break;
+
+
+          case PN_LINK_REMOTE_OPEN:
+            /*----------------------------------------------------
+              When we first open the link, we give it an initial 
+              amount of credit in units of messages.
+              We will later increment its credit whenever credit
+              falls below some threshold.
+            ----------------------------------------------------*/
+            link = pn_event_link(event);
+            if (pn_link_state(link) & PN_LOCAL_UNINIT )
+            {
+              pn_link_open ( link );
+              pn_link_flow ( link, initial_flow );
+            }
+          break;
+
+
+          case PN_CONNECTION_BOUND:
+            if ( pn_connection_state(connection) & PN_LOCAL_UNINIT )
+            {
+              pn_connection_open ( connection );
+            }
+          break;
+
+
+          // And now the event that you've all been waiting for.....
+
+          case PN_DELIVERY:
+            link = pn_event_link ( event );
+            delivery = pn_event_delivery ( event );
+
+            /*------------------------------------------------
+              Since I want this program to be a receiver,
+              I am not interested in deliver-related events 
+              unless they are incoming, 'readable' events.
+            ------------------------------------------------*/
+            if ( pn_delivery_readable ( delivery ) )
+            {
+              // If the delivery is partial I am just going to ignore
+              // it until it becomes complete.
+              if ( ! pn_delivery_partial ( delivery ) )
+              {
+                ++ delivery_count; 
+                /*
+                if ( ! (delivery_count % report_frequency) )
+                {
+                  pn_link_t * delivery_link = pn_delivery_link ( delivery );
+                  int received_bytes = pn_delivery_pending ( delivery );
+                  pn_link_recv ( delivery_link, incoming, 1000 );
+                  fprintf ( output_fp, "received bytes: %d\n", received_bytes );
+                }
+                */
+
+                // don't bother updating.  they're pre-settled.
+                // pn_delivery_update ( delivery, PN_ACCEPTED );
+                pn_delivery_settle ( delivery );
+
+                int credit = pn_link_credit ( link );
+
+                if ( delivery_count >= messages )
+                {
+                  fprintf ( output_fp, "precv_stop %.3lf\n", get_time() );
+                  goto all_done;
+                }
+
+                // Make a report frequency of zero shut down interim reporting.
+                if ( report_frequency
+                     &&
+                     (! (delivery_count % report_frequency))
+                   )
+                {
+                  this_time = get_time();
+                  time_diff = this_time - last_time;
+
+                  print_timestamp_like_a_normal_person ( output_fp );
+                  fprintf ( output_fp, 
+                            "deliveries: %" PRIu64 "  time: %.3lf\n", 
+                            delivery_count,
+                            time_diff
+                          );
+                  fflush ( output_fp );
+                  last_time = this_time;
+                }
+
+                /*----------------------------------------------------------
+                  I replenish the credit whevere it falls below this limit
+                  because I this psend / precv pair of programs to run as
+                  fast as possible.  A real application might want to do
+                  something fancier here.
+                ----------------------------------------------------------*/
+                if ( credit <= flow_increment )
+                {
+                  pn_link_flow ( link, flow_increment );
+                }
+              }
+            }
+          break;
+
+
+          case PN_TRANSPORT:
+            // not sure why I would care...
+          break;
+
+
+          default:
+            /*
+            fprintf ( output_fp, 
+                      "precv unhandled event: %s\n", 
+                      pn_event_type_name(event_type)
+                    );
+            */
+          break;
+        }
+
+        pn_collector_pop ( collector );
+        event = pn_collector_peek(collector);
+      }
+    }
+  }
+
+
+all_done:
+  pn_session_close ( session );
+  pn_connection_close ( connection );
+  fclose ( output_fp );
+  return 0;
+}
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cefbf983/examples/engine/c/psend.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/psend.c b/examples/engine/c/psend.c
new file mode 100644
index 0000000..2ad0edb
--- /dev/null
+++ b/examples/engine/c/psend.c
@@ -0,0 +1,373 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/*################################################################
+  This program is half of a pair.  Precv and Psend are meant to 
+  be simple-as-possible examples of how to use the proton-c
+  engine interface to send and receive messages over a single 
+  connection and a single session.
+
+  In addition to being examples, these programs or their 
+  descendants will be used in performance regression testing
+  for both throughput and latency, and long-term soak testing.
+
+  This program, psend, is highly similar to its peer precv.
+  I put all the good comments in precv.
+*################################################################*/
+
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <time.h>
+#include <sys/time.h>
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/event.h>
+#include <proton/terminus.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+
+
+#define MY_BUF_SIZE 1000
+
+
+
+void
+print_timestamp ( FILE * fp, char const * label )
+{
+  struct timeval tv;
+  struct tm      * timeinfo;
+
+  gettimeofday ( & tv, 0 );
+  timeinfo = localtime ( & tv.tv_sec );
+
+  int seconds_today = 3600 * timeinfo->tm_hour +
+                        60 * timeinfo->tm_min  +
+                             timeinfo->tm_sec;
+
+  fprintf ( fp, "time : %d.%.6ld : %s\n", seconds_today, tv.tv_usec, label );
+}
+
+
+
+
+
+static 
+double
+get_time ( )
+{
+  struct timeval tv;
+  struct tm      * timeinfo;
+
+  gettimeofday ( & tv, 0 );
+  timeinfo = localtime ( & tv.tv_sec );
+
+  double time_now = 3600 * timeinfo->tm_hour +
+                      60 * timeinfo->tm_min  +
+                           timeinfo->tm_sec;
+
+  time_now += ((double)(tv.tv_usec) / 1000000.0);
+  return time_now;
+}
+
+
+
+
+
+int 
+main ( int argc, char ** argv )
+{
+  char addr [ 1000 ];
+  char host [ 1000 ];
+  char port [ 1000 ];
+  char output_file_name[1000];
+
+
+  uint64_t messages       = 2000000,
+           delivery_count = 0;
+
+  int message_length = 100;
+
+  bool done           = false;
+  int  sent_count     = 0;
+  int  n_links        = 5;
+  int const max_links = 100;
+
+  strcpy ( addr, "queue" );
+  strcpy ( host, "0.0.0.0" );
+  strcpy ( port, "5672" );
+
+  FILE * output_fp;
+
+
+  for ( int i = 1; i < argc; ++ i )
+  {
+    if ( ! strcmp ( "--host", argv[i] ) )
+    {
+      strcpy ( host, argv[i+1] );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--port", argv[i] ) )
+    {
+      strcpy ( port, argv[i+1] );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--messages", argv[i] ) )
+    {
+      sscanf ( argv [ i+1 ], "%" SCNd64 , & messages );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--message_length", argv[i] ) )
+    {
+      sscanf ( argv [ i+1 ], "%d", & message_length );
+      ++ i;
+    }
+    else
+    if ( ! strcmp ( "--n_links", argv[i] ) )
+    {
+      sscanf ( argv [ i+1 ], "%d", & n_links );
+      ++ i;
+    }
+    if ( ! strcmp ( "--output", argv[i] ) )
+    {
+      if ( ! strcmp ( "stderr", argv[i+1] ) )
+      {
+        output_fp = stderr;
+        strcpy ( output_file_name, "stderr");
+      }
+      else
+      if ( ! strcmp ( "stdout", argv[i+1] ) )
+      {
+        output_fp = stdout;
+        strcpy ( output_file_name, "stdout");
+      }
+      else
+      {
+        output_fp = fopen ( argv[i+1], "w" );
+        strcpy ( output_file_name, argv[i+1] );
+        if ( ! output_fp )
+        {
+          fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] );
+          exit ( 1 );
+        }
+      }
+      ++ i;
+    }
+    else
+    {
+      fprintf ( output_fp, "unknown arg %s", argv[i] );
+    }
+  }
+
+
+  fprintf ( output_fp, "host            %s\n",          host );
+  fprintf ( output_fp, "port            %s\n",          port );
+  fprintf ( output_fp, "messages        %" PRId64 "\n", messages );
+  fprintf ( output_fp, "message_length  %d\n",          message_length );
+  fprintf ( output_fp, "n_links         %d\n",          n_links );
+  fprintf ( output_fp, "output          %s\n",          output_file_name );
+
+
+  if ( n_links > max_links )
+  {
+    fprintf ( output_fp, "You can't have more than %d links.\n", max_links );
+    exit ( 1 );
+  }
+
+  pn_driver_t     * driver;
+  pn_connector_t  * connector;
+  pn_connector_t  * driver_connector;
+  pn_connection_t * connection;
+  pn_collector_t  * collector;
+  pn_link_t       * links [ max_links ];
+  pn_session_t    * session;
+  pn_event_t      * event;
+  pn_delivery_t   * delivery;
+
+
+  char * message = (char *) malloc(message_length);
+  memset ( message, 13, message_length );
+
+  /*----------------------------------------------------
+    Get everything set up.
+    We will have a single connector, a single 
+    connection, a single session, and a single link.
+  ----------------------------------------------------*/
+  driver = pn_driver ( );
+  connector = pn_connector ( driver, host, port, 0 );
+
+  connection = pn_connection();
+  collector  = pn_collector  ( );
+  pn_connection_collect ( connection, collector );
+  pn_connector_set_connection ( connector, connection );
+
+  session = pn_session ( connection );
+  pn_connection_open ( connection );
+  pn_session_open ( session );
+
+  for ( int i = 0; i < n_links; ++ i )
+  {
+    char name[100];
+    sprintf ( name, "tvc_15_%d", i );
+    links[i] = pn_sender ( session, name );
+    pn_terminus_set_address ( pn_link_target(links[i]), addr );
+    pn_link_open ( links[i] );
+  }
+
+  /*-----------------------------------------------------------
+    For my speed tests, I do not want to count setup time.
+    Start timing here.  The receiver will print out a similar
+    timestamp when he receives the final message.
+  -----------------------------------------------------------*/
+  fprintf ( output_fp, "psend: sending %llu messages.\n", messages );
+
+  // Just before we start sending, print the start timestamp.
+  fprintf ( output_fp, "psend_start %.3lf\n", get_time() );
+
+  while ( 1 )
+  {
+    pn_driver_wait ( driver, -1 );
+
+    int event_count = 1;
+    while ( event_count > 0 )
+    {
+      event_count = 0;
+      pn_connector_process ( connector );
+
+      event = pn_collector_peek(collector);
+      while ( event )
+      {
+        ++ event_count;
+        pn_event_type_t event_type = pn_event_type ( event );
+        //fprintf ( output_fp, "event: %s\n", pn_event_type_name ( event_type ) );
+
+        switch ( event_type )
+        {
+          case PN_LINK_FLOW:
+          {
+            if ( delivery_count < messages )
+            {
+              /*---------------------------------------------------
+                We may have opened multiple links.
+                The event will tell us which one this flow-event
+                happened on.  If the flow event gave us some 
+                credit, we will greedily send messages until it
+                is all used up.
+              ---------------------------------------------------*/
+              pn_link_t * link = pn_event_link ( event );
+              int credit = pn_link_credit ( link );
+
+              while ( credit > 0 )
+              {
+                // Every delivery we create needs a unique tag.
+                char str [ 100 ];
+                sprintf ( str, "%x", delivery_count ++ );
+                delivery = pn_delivery ( link, pn_dtag(str, strlen(str)) );
+
+                // If you settle the delivery before sending it,
+                // you will spend some time wondering why your 
+                // messages don't have any content when they arrive 
+                // at the receiver.
+                pn_link_send ( link, message, message_length );
+                pn_delivery_settle ( delivery );
+                pn_link_advance ( link );
+                credit = pn_link_credit ( link );
+              }
+              
+              if ( delivery_count >= messages )
+              {
+                fprintf ( output_fp, 
+                          "I have sent all %d messages.\n" ,
+                          delivery_count
+                        );
+                /*
+                  I'm still kind of hazy on how to shut down the
+                  psend / precv system properly ....
+                  I can't go to all_done here, or precv will never
+                  get all its messages and terminate.
+                  So I let precv terminate properly ... which means
+                  that this program, psend, dies with an error.
+                  Hmm.
+                */
+                // goto all_done;
+              }
+            }
+          }
+          break;
+
+
+          case PN_TRANSPORT:
+            // I don't know what this means here, either.
+          break;
+
+
+          case PN_TRANSPORT_TAIL_CLOSED:
+            goto all_done;
+          break;
+
+
+          default:
+            /*
+            fprintf ( output_fp,
+                      "precv unhandled event: %s\n",
+                      pn_event_type_name(event_type)
+                    );
+            */
+          break;
+
+        }
+
+        pn_collector_pop ( collector );
+        event = pn_collector_peek(collector);
+      }
+    }
+  }
+
+  all_done:
+
+  for ( int i = 0; i < n_links; ++ i )
+  {
+    pn_link_close ( links[i] );
+  }
+
+  pn_session_close ( session );
+  pn_connection_close ( connection );
+
+  return 0;
+}
+
+
+
+
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org