You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2014/12/08 18:02:10 UTC
qpid-proton git commit: initial commit of psend/precv examples.
Repository: qpid-proton
Updated Branches:
refs/heads/master 4d2a6fd01 -> cefbf9839
initial commit of psend/precv examples.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cefbf983
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cefbf983
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cefbf983
Branch: refs/heads/master
Commit: cefbf9839e9c9f966f3b425da16ad26fc31b45fc
Parents: 4d2a6fd
Author: mick <mi...@redhat.com>
Authored: Mon Dec 8 12:00:53 2014 -0500
Committer: mick <mi...@redhat.com>
Committed: Mon Dec 8 12:00:53 2014 -0500
----------------------------------------------------------------------
examples/engine/c/precv.c | 502 +++++++++++++++++++++++++++++++++++++++++
examples/engine/c/psend.c | 373 ++++++++++++++++++++++++++++++
2 files changed, 875 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cefbf983/examples/engine/c/precv.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/precv.c b/examples/engine/c/precv.c
new file mode 100644
index 0000000..3c79a6e
--- /dev/null
+++ b/examples/engine/c/precv.c
@@ -0,0 +1,502 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/*################################################################
+ This program is half of a pair. Precv and Psend are meant to
+ be simple-as-possible examples of how to use the proton-c
+ engine interface to send and receive messages over a single
+ connection and a single session.
+
+ In addition to being examples, these programs or their
+ descendants will be used in performance regression testing
+ for both throughput and latency, and long-term soak testing.
+*################################################################*/
+
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <time.h>
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/event.h>
+#include <proton/terminus.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+
+
+
+
+
+#define MY_BUF_SIZE 1000
+
+
+
+
+/*---------------------------------------------------------
+  Return the local time of day as seconds since local
+  midnight, including a fractional part taken from the
+  microsecond field of gettimeofday().
+
+  These high-resolution times are used both for
+  interim timing reports -- i.e. every 'report_frequency'
+  messages -- and for the final timestamp, after all
+  expected messages have been received.
+
+  NOTE: the value starts again from zero at local midnight,
+  so a difference taken across midnight comes out negative.
+---------------------------------------------------------*/
+static
+double
+get_time ( )
+{
+  struct timeval now;
+
+  gettimeofday ( & now, 0 );
+
+  // Break the whole seconds down into local hour / minute / second.
+  struct tm * local = localtime ( & now.tv_sec );
+
+  int whole_seconds = local->tm_hour * 3600 +
+                      local->tm_min  * 60   +
+                      local->tm_sec;
+
+  double fraction = (double)(now.tv_usec) / 1000000.0;
+
+  return (double) whole_seconds + fraction;
+}
+
+
+
+
+
+/*----------------------------------------------------
+  Write one line of the form
+      timestamp YYYY-mon-DD HH:MM:SS
+  to 'fp', using the current local time.
+
+  These absolute timestamps are useful in soak tests,
+  where I want to align the program's output with
+  output from top to look at CPU and memory use.
+----------------------------------------------------*/
+void
+print_timestamp_like_a_normal_person ( FILE * fp )
+{
+  // Lower-case month names, indexed by tm_mon (0 == january).
+  static char const * const month_names[] =
+  {
+    "jan", "feb", "mar", "apr", "may", "jun",
+    "jul", "aug", "sep", "oct", "nov", "dec"
+  };
+
+  time_t now;
+  time ( & now );
+
+  struct tm * local = localtime ( & now );
+
+  // tm_year counts from 1900.
+  int year = 1900 + local->tm_year;
+
+  char formatted[100];
+  sprintf ( formatted,
+            "%d-%s-%02d %02d:%02d:%02d",
+            year,
+            month_names[local->tm_mon],
+            local->tm_mday,
+            local->tm_hour,
+            local->tm_min,
+            local->tm_sec
+          );
+
+  fprintf ( fp, "timestamp %s\n", formatted );
+}
+
+
+
+
+
+/*-----------------------------------------------------------
+  Return the word that follows the option at argv[*i], and
+  advance *i so that it indexes that word.
+  Prints a message on stderr and exits if the option is the
+  last word on the command line.
+-----------------------------------------------------------*/
+static
+char const *
+option_value ( int argc, char ** argv, int * i )
+{
+  if ( *i + 1 >= argc )
+  {
+    fprintf ( stderr, "option |%s| requires a value.\n", argv[*i] );
+    exit ( 1 );
+  }
+
+  ++ (*i);
+  return argv[*i];
+}
+
+
+
+
+
+/*-----------------------------------------------------------
+  Exit with a message on stderr unless exactly one field
+  was converted by the sscanf() call whose result is passed
+  in as 'fields_parsed'.
+-----------------------------------------------------------*/
+static
+void
+check_parsed ( int fields_parsed, char const * option, char const * value )
+{
+  if ( fields_parsed != 1 )
+  {
+    fprintf ( stderr, "bad value |%s| for option |%s|\n", value, option );
+    exit ( 1 );
+  }
+}
+
+
+
+
+
+/*-----------------------------------------------------------
+  precv: listen on host:port, accept one connector, and
+  settle incoming deliveries until 'messages' of them have
+  been counted.
+
+  Options (each takes one value):
+    --host, --port, --messages, --output (stderr | stdout |
+    file name), --report_frequency, --initial_flow,
+    --flow_increment.
+
+  Returns 0 after the expected number of deliveries; exits
+  with status 1 on a bad command line, if the output file
+  can't be opened, or if the listener can't be created.
+-----------------------------------------------------------*/
+int
+main ( int argc, char ** argv )
+{
+  char host [1000]             = "0.0.0.0";
+  char port [1000]             = "5672";
+  char output_file_name[1000]  = "stdout";
+
+  int initial_flow   = 400;
+  int flow_increment = 200;
+
+  int     report_frequency = 200000;
+  int64_t messages         = 2000000,
+          delivery_count   = 0;
+
+  pn_driver_t     * driver;
+  pn_listener_t   * listener;
+  pn_connector_t  * connector;
+  pn_connection_t * connection;
+  pn_collector_t  * collector;
+  pn_transport_t  * transport;
+  pn_sasl_t       * sasl;
+  pn_session_t    * session = 0;
+  pn_event_t      * event;
+  pn_link_t       * link;
+  pn_delivery_t   * delivery;
+
+  double last_time,
+         this_time,
+         time_diff;
+
+  // Default to stdout, so that output_fp is always a valid
+  // stream even when no --output option is given.
+  FILE * output_fp = stdout;
+
+
+  /*-----------------------------------------------------------
+    Read the command lines args and override initialization.
+  -----------------------------------------------------------*/
+  for ( int i = 1; i < argc; ++ i )
+  {
+    char const * option = argv[i];
+
+    if ( ! strcmp ( "--host", option ) )
+    {
+      // snprintf truncates rather than overrunning the buffer.
+      snprintf ( host, sizeof host, "%s", option_value ( argc, argv, & i ) );
+    }
+    else
+    if ( ! strcmp ( "--port", option ) )
+    {
+      snprintf ( port, sizeof port, "%s", option_value ( argc, argv, & i ) );
+    }
+    else
+    if ( ! strcmp ( "--messages", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%" SCNd64, & messages ), option, value );
+    }
+    else
+    if ( ! strcmp ( "--output", option ) )
+    {
+      char const * target = option_value ( argc, argv, & i );
+
+      if ( ! strcmp ( "stderr", target ) )
+      {
+        output_fp = stderr;
+      }
+      else
+      if ( ! strcmp ( "stdout", target ) )
+      {
+        output_fp = stdout;
+      }
+      else
+      {
+        output_fp = fopen ( target, "w" );
+        if ( ! output_fp )
+        {
+          fprintf ( stderr, "Can't open |%s| for writing.\n", target );
+          exit ( 1 );
+        }
+      }
+
+      snprintf ( output_file_name, sizeof output_file_name, "%s", target );
+    }
+    else
+    if ( ! strcmp ( "--report_frequency", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%d", & report_frequency ), option, value );
+    }
+    else
+    if ( ! strcmp ( "--initial_flow", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%d", & initial_flow ), option, value );
+    }
+    else
+    if ( ! strcmp ( "--flow_increment", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%d", & flow_increment ), option, value );
+    }
+    else
+    {
+      fprintf ( output_fp, "unknown arg |%s|\n", option );
+      exit ( 1 );
+    }
+  }
+
+  /*-----------------------------------------------
+    Show what we ended up with for all the args.
+  -----------------------------------------------*/
+  fprintf ( output_fp, "host                %s\n",          host );
+  fprintf ( output_fp, "port                %s\n",          port );
+  fprintf ( output_fp, "messages            %" PRId64 "\n", messages );
+  fprintf ( output_fp, "report_frequency    %d\n",          report_frequency );
+  fprintf ( output_fp, "initial_flow        %d\n",          initial_flow );
+  fprintf ( output_fp, "flow_increment      %d\n",          flow_increment );
+  fprintf ( output_fp, "output              %s\n",          output_file_name );
+
+
+  /*--------------------------------------------
+    Create a standard driver and listen for the
+    initial connector.
+  --------------------------------------------*/
+  driver = pn_driver ( );
+
+  if ( ! pn_listener(driver, host, port, 0) )
+  {
+    fprintf ( output_fp, "precv listener creation failed.\n" );
+    exit ( 1 );
+  }
+
+  fprintf ( output_fp, "\nprecv ready...\n\n" );
+
+  while ( 1 )
+  {
+    pn_driver_wait ( driver, -1 );
+
+    listener = pn_driver_listener ( driver );
+    if ( listener )
+    {
+      connector = pn_listener_accept ( listener );
+      if ( connector )
+        break;
+    }
+  }
+
+  /*--------------------------------------------------------
+    Now make all the other structure around the connector,
+    and tell it that skipping sasl is OK.
+  --------------------------------------------------------*/
+  connection = pn_connection ( );
+  collector  = pn_collector  ( );
+  pn_connection_collect ( connection, collector );
+  pn_connector_set_connection ( connector, connection );
+
+  transport = pn_connector_transport ( connector );
+  sasl = pn_sasl ( transport );
+  pn_sasl_mechanisms ( sasl, "ANONYMOUS" );
+  pn_sasl_server ( sasl );
+  pn_sasl_allow_skip ( sasl, true );
+  pn_sasl_done ( sasl, PN_SASL_OK );
+
+  /*----------------------------------------------------------
+    If report_frequency is not set to zero, we will
+    produce a timing report every report_frequency messages.
+    The timing reported will be the delta from the last_time
+    to the current time.
+    This is useful in soak testing, where you basically
+    never stop, but still need to see how the system is doing
+    every so often.
+  ----------------------------------------------------------*/
+  last_time = get_time();
+
+
+  /*------------------------------------------------------------
+    A triply-nested loop.
+    In the outermost one, we just wait for activity to come in
+    from the driver.
+  ------------------------------------------------------------*/
+  while ( 1 )
+  {
+    pn_driver_wait ( driver, -1 );
+
+    /*---------------------------------------------------------------
+      In the next loop, we keep going as long as we processed
+      some events.  This is because our own processing of events
+      may have caused more to be generated that we need to handle.
+      If we go back to the outermost loop and its pn_driver_wait()
+      without handling these events, we will end up with the sender
+      and receiver programs just staring at each other with blank
+      expressions on their faces.
+    ---------------------------------------------------------------*/
+    int event_count = 1;
+    while ( event_count > 0 )
+    {
+      event_count = 0;
+      pn_connector_process ( connector );
+
+      /*-------------------------------------------------------
+        After we process the connector, it may have generated
+        a batch of events for us to handle.  As we go through
+        this batch of events, our handling may generate other
+        events which we must handle before going back to
+        pn_driver_wait().
+      --------------------------------------------------------*/
+      event = pn_collector_peek(collector);
+      while ( event )
+      {
+        ++ event_count;
+        pn_event_type_t event_type = pn_event_type ( event );
+
+        switch ( event_type )
+        {
+          case PN_CONNECTION_REMOTE_OPEN:
+          break;
+
+
+          case PN_SESSION_REMOTE_OPEN:
+            session = pn_event_session(event);
+            if ( pn_session_state(session) & PN_LOCAL_UNINIT )
+            {
+              // big number because it's in bytes.
+              pn_session_set_incoming_capacity ( session, 1000000 );
+              pn_session_open ( session );
+            }
+          break;
+
+
+          case PN_LINK_REMOTE_OPEN:
+            /*----------------------------------------------------
+              When we first open the link, we give it an initial
+              amount of credit in units of messages.
+              We will later increment its credit whenever credit
+              falls below some threshold.
+            ----------------------------------------------------*/
+            link = pn_event_link(event);
+            if (pn_link_state(link) & PN_LOCAL_UNINIT )
+            {
+              pn_link_open ( link );
+              pn_link_flow ( link, initial_flow );
+            }
+          break;
+
+
+          case PN_CONNECTION_BOUND:
+            if ( pn_connection_state(connection) & PN_LOCAL_UNINIT )
+            {
+              pn_connection_open ( connection );
+            }
+          break;
+
+
+          // And now the event that you've all been waiting for.....
+
+          case PN_DELIVERY:
+            link = pn_event_link ( event );
+            delivery = pn_event_delivery ( event );
+
+            /*------------------------------------------------
+              Since I want this program to be a receiver,
+              I am not interested in delivery-related events
+              unless they are incoming, 'readable' events.
+            ------------------------------------------------*/
+            if ( pn_delivery_readable ( delivery ) )
+            {
+              // If the delivery is partial I am just going to ignore
+              // it until it becomes complete.
+              if ( ! pn_delivery_partial ( delivery ) )
+              {
+                ++ delivery_count;
+
+                // don't bother updating.  they're pre-settled.
+                // pn_delivery_update ( delivery, PN_ACCEPTED );
+                pn_delivery_settle ( delivery );
+
+                int credit = pn_link_credit ( link );
+
+                if ( delivery_count >= messages )
+                {
+                  fprintf ( output_fp, "precv_stop %.3lf\n", get_time() );
+                  goto all_done;
+                }
+
+                // Make a report frequency of zero shut down interim reporting.
+                if ( report_frequency
+                     &&
+                     (! (delivery_count % report_frequency))
+                   )
+                {
+                  this_time = get_time();
+                  time_diff = this_time - last_time;
+
+                  print_timestamp_like_a_normal_person ( output_fp );
+                  // delivery_count is signed, so print it with PRId64.
+                  fprintf ( output_fp,
+                            "deliveries: %" PRId64 "  time: %.3lf\n",
+                            delivery_count,
+                            time_diff
+                          );
+                  fflush ( output_fp );
+                  last_time = this_time;
+                }
+
+                /*----------------------------------------------------------
+                  I replenish the credit whenever it falls below this limit
+                  because I want this psend / precv pair of programs to run
+                  as fast as possible.  A real application might want to do
+                  something fancier here.
+                ----------------------------------------------------------*/
+                if ( credit <= flow_increment )
+                {
+                  pn_link_flow ( link, flow_increment );
+                }
+              }
+            }
+          break;
+
+
+          case PN_TRANSPORT:
+            // not sure why I would care...
+          break;
+
+
+          default:
+            // Events not listed above are ignored.
+          break;
+        }
+
+        pn_collector_pop ( collector );
+        event = pn_collector_peek(collector);
+      }
+    }
+  }
+
+
+all_done:
+  // We only get here from the PN_DELIVERY case, but guard anyway:
+  // session stays null until a PN_SESSION_REMOTE_OPEN event is seen.
+  if ( session )
+  {
+    pn_session_close ( session );
+  }
+  pn_connection_close ( connection );
+  fclose ( output_fp );
+  return 0;
+}
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cefbf983/examples/engine/c/psend.c
----------------------------------------------------------------------
diff --git a/examples/engine/c/psend.c b/examples/engine/c/psend.c
new file mode 100644
index 0000000..2ad0edb
--- /dev/null
+++ b/examples/engine/c/psend.c
@@ -0,0 +1,373 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+/*################################################################
+ This program is half of a pair. Precv and Psend are meant to
+ be simple-as-possible examples of how to use the proton-c
+ engine interface to send and receive messages over a single
+ connection and a single session.
+
+ In addition to being examples, these programs or their
+ descendants will be used in performance regression testing
+ for both throughput and latency, and long-term soak testing.
+
+ This program, psend, is highly similar to its peer precv.
+ I put all the good comments in precv.
+*################################################################*/
+
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <ctype.h>
+#include <time.h>
+#include <sys/time.h>
+#define __STDC_FORMAT_MACROS
+#include <inttypes.h>
+
+
+#include <proton/connection.h>
+#include <proton/delivery.h>
+#include <proton/driver.h>
+#include <proton/event.h>
+#include <proton/terminus.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/session.h>
+
+
+#define MY_BUF_SIZE 1000
+
+
+
+/*-----------------------------------------------------------
+  Write one line of the form
+      time : <seconds since local midnight>.<microseconds> : <label>
+  to 'fp', using the current local time.
+-----------------------------------------------------------*/
+void
+print_timestamp ( FILE * fp, char const * label )
+{
+  struct timeval tv;
+  struct tm    * timeinfo;
+
+  gettimeofday ( & tv, 0 );
+  timeinfo = localtime ( & tv.tv_sec );
+
+  int seconds_today = 3600 * timeinfo->tm_hour +
+                        60 * timeinfo->tm_min  +
+                             timeinfo->tm_sec;
+
+  // tv_usec has type suseconds_t, which is not guaranteed to be long,
+  // so cast it to match the %ld conversion.
+  fprintf ( fp, "time : %d.%.6ld : %s\n", seconds_today, (long) tv.tv_usec, label );
+}
+
+
+
+
+
+/*---------------------------------------------------------
+  Return the local time of day as seconds since local
+  midnight, with the microseconds from gettimeofday()
+  as the fractional part.
+
+  NOTE: the value starts again from zero at local midnight.
+---------------------------------------------------------*/
+static
+double
+get_time ( )
+{
+  struct timeval now;
+
+  gettimeofday ( & now, 0 );
+
+  struct tm * local = localtime ( & now.tv_sec );
+
+  int whole_seconds = local->tm_hour * 3600 +
+                      local->tm_min  * 60   +
+                      local->tm_sec;
+
+  double fraction = (double)(now.tv_usec) / 1000000.0;
+
+  return (double) whole_seconds + fraction;
+}
+
+
+
+
+
+/*-----------------------------------------------------------
+  Return the word that follows the option at argv[*i], and
+  advance *i so that it indexes that word.
+  Prints a message on stderr and exits if the option is the
+  last word on the command line.
+-----------------------------------------------------------*/
+static
+char const *
+option_value ( int argc, char ** argv, int * i )
+{
+  if ( *i + 1 >= argc )
+  {
+    fprintf ( stderr, "option |%s| requires a value.\n", argv[*i] );
+    exit ( 1 );
+  }
+
+  ++ (*i);
+  return argv[*i];
+}
+
+
+
+
+
+/*-----------------------------------------------------------
+  Exit with a message on stderr unless exactly one field
+  was converted by the sscanf() call whose result is passed
+  in as 'fields_parsed'.
+-----------------------------------------------------------*/
+static
+void
+check_parsed ( int fields_parsed, char const * option, char const * value )
+{
+  if ( fields_parsed != 1 )
+  {
+    fprintf ( stderr, "bad value |%s| for option |%s|\n", value, option );
+    exit ( 1 );
+  }
+}
+
+
+
+
+
+/*-----------------------------------------------------------
+  psend: connect to host:port, open 'n_links' sender links
+  on one session, and send 'messages' pre-settled messages
+  of 'message_length' bytes as the receiver grants credit.
+
+  Options (each takes one value):
+    --host, --port, --messages, --message_length, --n_links,
+    --output (stderr | stdout | file name).
+
+  The event loop ends when a PN_TRANSPORT_TAIL_CLOSED event
+  is seen.  Returns 0 in that case; exits with status 1 on a
+  bad command line or a setup failure.
+-----------------------------------------------------------*/
+int
+main ( int argc, char ** argv )
+{
+  char addr [ 1000 ]            = "queue";
+  char host [ 1000 ]            = "0.0.0.0";
+  char port [ 1000 ]            = "5672";
+  char output_file_name[1000]   = "stdout";
+
+  uint64_t messages       = 2000000,
+           delivery_count = 0;
+
+  int message_length = 100;
+  int n_links        = 5;
+
+  // A true compile-time constant, so that links[] below is an
+  // ordinary array rather than a variable-length one.
+  enum { max_links = 100 };
+
+  // Default to stdout, so that output_fp is always a valid
+  // stream even when no --output option is given.
+  FILE * output_fp = stdout;
+
+
+  for ( int i = 1; i < argc; ++ i )
+  {
+    char const * option = argv[i];
+
+    if ( ! strcmp ( "--host", option ) )
+    {
+      // snprintf truncates rather than overrunning the buffer.
+      snprintf ( host, sizeof host, "%s", option_value ( argc, argv, & i ) );
+    }
+    else
+    if ( ! strcmp ( "--port", option ) )
+    {
+      snprintf ( port, sizeof port, "%s", option_value ( argc, argv, & i ) );
+    }
+    else
+    if ( ! strcmp ( "--messages", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%" SCNu64, & messages ), option, value );
+    }
+    else
+    if ( ! strcmp ( "--message_length", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%d", & message_length ), option, value );
+    }
+    else
+    if ( ! strcmp ( "--n_links", option ) )
+    {
+      char const * value = option_value ( argc, argv, & i );
+      check_parsed ( sscanf ( value, "%d", & n_links ), option, value );
+    }
+    else
+    if ( ! strcmp ( "--output", option ) )
+    {
+      char const * target = option_value ( argc, argv, & i );
+
+      if ( ! strcmp ( "stderr", target ) )
+      {
+        output_fp = stderr;
+      }
+      else
+      if ( ! strcmp ( "stdout", target ) )
+      {
+        output_fp = stdout;
+      }
+      else
+      {
+        output_fp = fopen ( target, "w" );
+        if ( ! output_fp )
+        {
+          fprintf ( stderr, "Can't open |%s| for writing.\n", target );
+          exit ( 1 );
+        }
+      }
+
+      snprintf ( output_file_name, sizeof output_file_name, "%s", target );
+    }
+    else
+    {
+      // Same handling as precv: report the argument and give up.
+      fprintf ( output_fp, "unknown arg |%s|\n", option );
+      exit ( 1 );
+    }
+  }
+
+
+  fprintf ( output_fp, "host            %s\n",          host );
+  fprintf ( output_fp, "port            %s\n",          port );
+  fprintf ( output_fp, "messages        %" PRIu64 "\n", messages );
+  fprintf ( output_fp, "message_length  %d\n",          message_length );
+  fprintf ( output_fp, "n_links         %d\n",          n_links );
+  fprintf ( output_fp, "output          %s\n",          output_file_name );
+
+
+  if ( n_links > max_links )
+  {
+    fprintf ( output_fp, "You can't have more than %d links.\n", max_links );
+    exit ( 1 );
+  }
+
+  // With no links there is nothing to receive credit on,
+  // so nothing would ever be sent.
+  if ( n_links < 1 )
+  {
+    fprintf ( stderr, "You need at least one link.\n" );
+    exit ( 1 );
+  }
+
+  if ( message_length < 0 )
+  {
+    fprintf ( stderr, "message_length can't be negative.\n" );
+    exit ( 1 );
+  }
+
+  pn_driver_t     * driver;
+  pn_connector_t  * connector;
+  pn_connection_t * connection;
+  pn_collector_t  * collector;
+  pn_link_t       * links [ max_links ];
+  pn_session_t    * session;
+  pn_event_t      * event;
+  pn_delivery_t   * delivery;
+
+
+  // Allocate at least one byte, so that a null result always
+  // means failure, even when message_length is zero.
+  char * message = malloc ( message_length > 0 ? (size_t) message_length : 1 );
+  if ( ! message )
+  {
+    fprintf ( stderr, "Can't allocate a %d byte message.\n", message_length );
+    exit ( 1 );
+  }
+  memset ( message, 13, (size_t) message_length );
+
+  /*----------------------------------------------------
+    Get everything set up.
+    We will have a single connector, a single
+    connection, a single session, and n_links links.
+  ----------------------------------------------------*/
+  driver = pn_driver ( );
+  connector = pn_connector ( driver, host, port, 0 );
+  if ( ! connector )
+  {
+    fprintf ( stderr, "psend connector creation failed.\n" );
+    exit ( 1 );
+  }
+
+  connection = pn_connection();
+  collector  = pn_collector  ( );
+  pn_connection_collect ( connection, collector );
+  pn_connector_set_connection ( connector, connection );
+
+  session = pn_session ( connection );
+  pn_connection_open ( connection );
+  pn_session_open ( session );
+
+  for ( int i = 0; i < n_links; ++ i )
+  {
+    char name[100];
+    snprintf ( name, sizeof name, "tvc_15_%d", i );
+    links[i] = pn_sender ( session, name );
+    pn_terminus_set_address ( pn_link_target(links[i]), addr );
+    pn_link_open ( links[i] );
+  }
+
+  /*-----------------------------------------------------------
+    For my speed tests, I do not want to count setup time.
+    Start timing here.  The receiver will print out a similar
+    timestamp when he receives the final message.
+  -----------------------------------------------------------*/
+  fprintf ( output_fp, "psend: sending %" PRIu64 " messages.\n", messages );
+
+  // Just before we start sending, print the start timestamp.
+  fprintf ( output_fp, "psend_start %.3lf\n", get_time() );
+
+  while ( 1 )
+  {
+    pn_driver_wait ( driver, -1 );
+
+    // Keep processing for as long as the last pass handled any events.
+    int event_count = 1;
+    while ( event_count > 0 )
+    {
+      event_count = 0;
+      pn_connector_process ( connector );
+
+      event = pn_collector_peek(collector);
+      while ( event )
+      {
+        ++ event_count;
+        pn_event_type_t event_type = pn_event_type ( event );
+
+        switch ( event_type )
+        {
+          case PN_LINK_FLOW:
+          {
+            if ( delivery_count < messages )
+            {
+              /*---------------------------------------------------
+                We may have opened multiple links.
+                The event will tell us which one this flow-event
+                happened on.  If the flow event gave us some
+                credit, we will greedily send messages until it
+                is all used up, or until we have sent the number
+                of messages that was asked for.
+              ---------------------------------------------------*/
+              pn_link_t * link = pn_event_link ( event );
+              int credit = pn_link_credit ( link );
+
+              while ( credit > 0 && delivery_count < messages )
+              {
+                // Every delivery we create needs a unique tag.
+                char str [ 100 ];
+                snprintf ( str, sizeof str, "%" PRIx64, delivery_count ++ );
+                delivery = pn_delivery ( link, pn_dtag(str, strlen(str)) );
+
+                // If you settle the delivery before sending it,
+                // you will spend some time wondering why your
+                // messages don't have any content when they arrive
+                // at the receiver.
+                pn_link_send ( link, message, message_length );
+                pn_delivery_settle ( delivery );
+                pn_link_advance ( link );
+                credit = pn_link_credit ( link );
+              }
+
+              if ( delivery_count >= messages )
+              {
+                fprintf ( output_fp,
+                          "I have sent all %" PRIu64 " messages.\n" ,
+                          delivery_count
+                        );
+                /*
+                  I'm still kind of hazy on how to shut down the
+                  psend / precv system properly ....
+                  I can't go to all_done here, or precv will never
+                  get all its messages and terminate.
+                  So I let precv terminate properly ... which means
+                  that this program, psend, dies with an error.
+                  Hmm.
+                */
+              }
+            }
+          }
+          break;
+
+
+          case PN_TRANSPORT:
+            // I don't know what this means here, either.
+          break;
+
+
+          case PN_TRANSPORT_TAIL_CLOSED:
+            goto all_done;
+          break;
+
+
+          default:
+            // Events not listed above are ignored.
+          break;
+
+        }
+
+        pn_collector_pop ( collector );
+        event = pn_collector_peek(collector);
+      }
+    }
+  }
+
+  all_done:
+
+  for ( int i = 0; i < n_links; ++ i )
+  {
+    pn_link_close ( links[i] );
+  }
+
+  pn_session_close ( session );
+  pn_connection_close ( connection );
+
+  free ( message );
+  fclose ( output_fp );
+
+  return 0;
+}
+
+
+
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org