You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:51:22 UTC

[15/38] qpid-proton git commit: PROTON-1403: c proactor direct example

PROTON-1403: c proactor direct example

direct server that can accept a connection from the send or receive examples and act
as a directly-connected receive or send as appropriate.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6291a75c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6291a75c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6291a75c

Branch: refs/heads/go1
Commit: 6291a75cbed5840578c0d2577424ed574b8ee56f
Parents: 8568737
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Feb 14 12:06:43 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Feb 14 12:51:31 2017 -0500

----------------------------------------------------------------------
 examples/c/proactor/CMakeLists.txt |   2 +-
 examples/c/proactor/README.dox     |  14 +-
 examples/c/proactor/broker.c       |   3 +-
 examples/c/proactor/direct.c       | 357 ++++++++++++++++++++++++++++++++
 examples/c/proactor/receive.c      |   2 +-
 examples/c/proactor/test.py        |  29 ++-
 examples/exampletest.py            |   1 +
 7 files changed, 398 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt
index 7fec1c6..4189cf5 100644
--- a/examples/c/proactor/CMakeLists.txt
+++ b/examples/c/proactor/CMakeLists.txt
@@ -41,7 +41,7 @@ else(WIN32)
   set(PLATFORM_LIBS pthread)
 endif(WIN32)
 
-foreach(name broker send receive)
+foreach(name broker send receive direct)
   add_executable(proactor-${name} ${name}.c)
   target_link_libraries(proactor-${name} ${Proton_LIBRARIES} ${PLATFORM_LIBS})
   set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/README.dox
----------------------------------------------------------------------
diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox
index 4b09cb7..19083e5 100644
--- a/examples/c/proactor/README.dox
+++ b/examples/c/proactor/README.dox
@@ -2,16 +2,22 @@
  * @example send.c
  *
  * Send a fixed number of messages to the "example" node.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
  *
  * @example receive.c
  *
- * Subscribes to the 'example' node and prints the message bodies
- * received.
+ * Subscribes to the 'example' node and prints the message bodies received.
+ * Can be used with @ref broker.c, @ref direct.c or an external AMQP broker.
+ *
+ * @example direct.c
+ *
+ * A server that can be used to demonstrate direct (no broker) peer-to-peer communication
+ * It can accept an incoming connection from either the @ref send.c or @ref receive.c examples
+ * and will act as the directly-connected counterpart (receive or send)
  *
  * @example broker.c
  *
- * A simple multithreaded broker that works with the send and receive
- * examples.
+ * A simple multithreaded broker that works with the @ref send.c and @ref receive.c examples.
  *
  * __Requires C++11__
  */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/broker.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c
index 5679290..ebf4068 100644
--- a/examples/c/proactor/broker.c
+++ b/examples/c/proactor/broker.c
@@ -307,7 +307,7 @@ static void handle(broker_t* b, pn_event_t* e) {
     pn_listener_accept(pn_event_listener(e), pn_connection());
     break;
 
-   case PN_CONNECTION_INIT: 
+   case PN_CONNECTION_INIT:
      pn_connection_set_container(c, b->container_id);
      break;
 
@@ -398,6 +398,7 @@ static void handle(broker_t* b, pn_event_t* e) {
 
    case PN_LISTENER_CLOSE:
     check_condition(e, pn_listener_condition(pn_event_listener(e)));
+    broker_stop(b);
     break;
 
    case PN_PROACTOR_INACTIVE: /* listener and all connections closed */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/direct.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c
new file mode 100644
index 0000000..26f1b33
--- /dev/null
+++ b/examples/c/proactor/direct.c
@@ -0,0 +1,357 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/connection.h>
+#include <proton/connection_driver.h>
+#include <proton/delivery.h>
+#include <proton/proactor.h>
+#include <proton/link.h>
+#include <proton/message.h>
+#include <proton/sasl.h>
+#include <proton/session.h>
+#include <proton/transport.h>
+#include <proton/url.h>
+#include "pncompat/misc_funcs.inc"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+typedef char str[1024];
+
+typedef struct app_data_t {
+  /* Common values */
+  pn_proactor_t *proactor;
+  bool finished;
+  str address;
+  str container_id;
+  pn_rwbytes_t message_buffer;
+  int message_count;
+
+  /* Sender values */
+  int sent;
+  int acknowledged;
+  pn_link_t *sender;
+  pn_millis_t delay;
+  bool delaying;
+
+  /* Receiver values */
+  int received;
+} app_data_t;
+
+static const int BATCH = 1000; /* Batch size for unlimited receive */
+
+int exit_code = 0;
+
+static void check_condition(pn_event_t *e, pn_condition_t *cond) {
+  if (pn_condition_is_set(cond)) {
+    exit_code = 1;
+    fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)),
+            pn_condition_get_name(cond), pn_condition_get_description(cond));
+  }
+}
+
+/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */
+static pn_bytes_t encode_message(app_data_t* app) {
+  /* Construct a message with the map { "sequence": app.sent } */
+  pn_message_t* message = pn_message();
+  pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */
+  pn_data_t* body = pn_message_body(message);
+  pn_data_put_map(body);
+  pn_data_enter(body);
+  pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence"));
+  pn_data_put_int(body, app->sent); /* The sequence number */
+  pn_data_exit(body);
+
+  /* encode the message, expanding the encode buffer as needed */
+  if (app->message_buffer.start == NULL) {
+    static const size_t initial_size = 128;
+    app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size));
+  }
+  /* app->message_buffer is the total buffer space available. */
+  /* mbuf wil point at just the portion used by the encoded message */
+  pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start);
+  int status = 0;
+  while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) {
+    app->message_buffer.size *= 2;
+    app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size);
+    mbuf.size = app->message_buffer.size;
+  }
+  if (status != 0) {
+    fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message)));
+    exit(1);
+  }
+  pn_message_free(message);
+  return pn_bytes(mbuf.size, mbuf.start);
+}
+
+static void send(app_data_t* app) {
+  while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) {
+    ++app->sent;
+    // Use sent counter bytes as unique delivery tag.
+    pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
+    pn_bytes_t msgbuf = encode_message(app);
+    pn_link_send(app->sender, msgbuf.start, msgbuf.size);
+    pn_link_advance(app->sender);
+    if (app->delay && app->sent < app->message_count) {
+      /* If delay is set, wait for TIMEOUT event to send more */
+      app->delaying = true;
+      pn_proactor_set_timeout(app->proactor, app->delay);
+      break;
+    }
+  }
+}
+
+#define MAX_SIZE 1024
+
+static void decode_message(pn_delivery_t *dlv) {
+  static char buffer[MAX_SIZE];
+  ssize_t len;
+  // try to decode the message body
+  if (pn_delivery_pending(dlv) < MAX_SIZE) {
+    // read in the raw data
+    len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE);
+    if (len > 0) {
+      // decode it into a proton message
+      pn_message_t *m = pn_message();
+      if (PN_OK == pn_message_decode(m, buffer, len)) {
+        pn_string_t *s = pn_string(NULL);
+        pn_inspect(pn_message_body(m), s);
+        printf("%s\n", pn_string_get(s));
+        pn_free(s);
+      }
+      pn_message_free(m);
+    }
+  }
+}
+
+/* This function handles events when we are acting as the receiver */
+static void handle_receive(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t *l = pn_event_link(event);
+     pn_link_open(l);
+     pn_link_flow(l, app->message_count ? app->message_count : BATCH);
+   } break;
+
+   case PN_DELIVERY: {
+     /* A message has been received */
+     pn_link_t *link = NULL;
+     pn_delivery_t *dlv = pn_event_delivery(event);
+     if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) {
+       link = pn_delivery_link(dlv);
+       decode_message(dlv);
+       /* Accept the delivery */
+       pn_delivery_update(dlv, PN_ACCEPTED);
+       /* done with the delivery, move to the next and free it */
+       pn_link_advance(link);
+       pn_delivery_settle(dlv);  /* dlv is now freed */
+
+       if (app->message_count == 0) {
+         /* receive forever - see if more credit is needed */
+         if (pn_link_credit(link) < BATCH/2) {
+           /* Grant enough credit to bring it up to BATCH: */
+           pn_link_flow(link, BATCH - pn_link_credit(link));
+         }
+       } else if (++app->received >= app->message_count) {
+         /* done receiving, close the endpoints */
+         printf("%d messages received\n", app->received);
+         pn_session_t *ssn = pn_link_session(link);
+         pn_link_close(link);
+         pn_session_close(ssn);
+         pn_connection_close(pn_session_connection(ssn));
+       }
+     }
+   } break;
+
+   default:
+    break;
+  }
+}
+
+/* This function handles events when we are acting as the sender */
+static void handle_send(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LINK_REMOTE_OPEN: {
+     pn_link_t* l = pn_event_link(event);
+     pn_terminus_set_address(pn_link_target(l), app->address);
+     pn_link_open(l);
+   } break;
+
+   case PN_LINK_FLOW:
+    /* The peer has given us some credit, now we can send messages */
+    if (!app->delaying) {
+      app->sender = pn_event_link(event);
+      send(app);
+    } break;
+
+   case PN_DELIVERY: {
+     /* We received acknowledgedment from the peer that a message was delivered. */
+     pn_delivery_t* d = pn_event_delivery(event);
+     if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
+       if (++app->acknowledged == app->message_count) {
+         printf("%d messages sent and acknowledged\n", app->acknowledged);
+         pn_connection_close(pn_event_connection(event));
+       }
+     }
+   } break;
+
+   default:
+    break;
+  }
+}
+
+/* Handle all events, delegate to handle_send or handle_receive depending on link mode */
+static void handle(app_data_t* app, pn_event_t* event) {
+  switch (pn_event_type(event)) {
+
+   case PN_LISTENER_ACCEPT:
+    pn_listener_accept(pn_event_listener(event), pn_connection());
+    break;
+
+   case PN_CONNECTION_INIT:
+    pn_connection_set_container(pn_event_connection(event), app->container_id);
+    break;
+
+   case PN_CONNECTION_BOUND: {
+     /* Turn off security */
+     pn_transport_t *t = pn_event_transport(event);
+     pn_transport_require_auth(t, false);
+     pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
+   }
+   case PN_CONNECTION_REMOTE_OPEN: {
+     pn_connection_open(pn_event_connection(event)); /* Complete the open */
+     break;
+   }
+
+   case PN_SESSION_REMOTE_OPEN: {
+     pn_session_open(pn_event_session(event));
+     break;
+   }
+
+   case PN_TRANSPORT_CLOSED:
+    check_condition(event, pn_transport_condition(pn_event_transport(event)));
+    app->finished = true;
+    break;
+
+   case PN_CONNECTION_REMOTE_CLOSE:
+    check_condition(event, pn_connection_remote_condition(pn_event_connection(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_SESSION_REMOTE_CLOSE:
+    check_condition(event, pn_session_remote_condition(pn_event_session(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_LINK_REMOTE_CLOSE:
+   case PN_LINK_REMOTE_DETACH:
+    check_condition(event, pn_link_remote_condition(pn_event_link(event)));
+    pn_connection_close(pn_event_connection(event));
+    break;
+
+   case PN_PROACTOR_TIMEOUT:
+    /* Wake the sender's connection */
+    pn_connection_wake(pn_session_connection(pn_link_session(app->sender)));
+    break;
+
+   case PN_CONNECTION_WAKE:
+    /* Timeout, we can send more. */
+    app->delaying = false;
+    send(app);
+    break;
+
+   case PN_PROACTOR_INACTIVE:
+    app->finished = true;
+    break;
+
+   case PN_LISTENER_CLOSE:
+    check_condition(event, pn_listener_condition(pn_event_listener(event)));
+    app->finished = true;
+    break;
+
+   default: {
+     pn_link_t *l = pn_event_link(event);
+     if (l) {                      /* Only delegate link-related events */
+       if (pn_link_is_sender(l)) {
+         handle_send(app, event);
+       } else {
+         handle_receive(app, event);
+       }
+     }
+   }
+  }
+}
+
+static void usage(const char *arg0) {
+  fprintf(stderr, "Usage: %s [-a URL] [-m message-count] [-d delay-ms]\n", arg0);
+  fprintf(stderr, "Demonstrates direct peer-to-peer AMQP communication without a broker. Accepts a connection from either the send.c or receive.c client and provides the complementary behavior (receive or send.");
+  exit(1);
+}
+
+int main(int argc, char **argv) {
+  /* Default values for application and connection. */
+  app_data_t app = {0};
+  app.message_count = 100;
+  const char* urlstr = NULL;
+
+  int opt;
+  while((opt = getopt(argc, argv, "a:m:d:")) != -1) {
+    switch(opt) {
+     case 'a': urlstr = optarg; break;
+     case 'm': app.message_count = atoi(optarg); break;
+     case 'd': app.delay = atoi(optarg); break;
+     default: usage(argv[0]); break;
+    }
+  }
+  if (optind < argc)
+    usage(argv[0]);
+  /* Note container-id should be unique */
+  snprintf(app.container_id, sizeof(app.container_id), "%s", argv[0]);
+
+  /* Parse the URL or use default values */
+  pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL;
+  /* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default,
+     this will also listen for mapped IPv4 on the same port.
+  */
+  const char *host = url ? pn_url_get_host(url) : "::";
+  const char *port = url ? pn_url_get_port(url) : "amqp";
+
+  app.proactor = pn_proactor();
+  pn_proactor_listen(app.proactor, pn_listener(), host, port, 16);
+  printf("listening on '%s:%s'\n", host, port);
+  if (url) pn_url_free(url);
+
+  do {
+    pn_event_batch_t *events = pn_proactor_wait(app.proactor);
+    pn_event_t *e;
+    while ((e = pn_event_batch_next(events))) {
+      handle(&app, e);
+    }
+    pn_proactor_done(app.proactor, events);
+  } while(!app.finished);
+
+  pn_proactor_free(app.proactor);
+  free(app.message_buffer.start);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/receive.c
----------------------------------------------------------------------
diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c
index 1eb54b6..1bc5509 100644
--- a/examples/c/proactor/receive.c
+++ b/examples/c/proactor/receive.c
@@ -46,7 +46,7 @@ typedef struct app_data_t {
   bool finished;
 } app_data_t;
 
-static const int BATCH = 100; /* Batch size for unlimited receive */
+static const int BATCH = 1000; /* Batch size for unlimited receive */
 
 static int exit_code = 0;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/c/proactor/test.py
----------------------------------------------------------------------
diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py
index 29aa327..45bb817 100644
--- a/examples/c/proactor/test.py
+++ b/examples/c/proactor/test.py
@@ -19,11 +19,9 @@
 
 # This is a test script to run the examples and verify that they behave as expected.
 
+import unittest, sys, time
 from exampletest import *
 
-import unittest
-import sys
-
 def python_cmd(name):
     dir = os.path.dirname(__file__)
     return [sys.executable, os.path.join(dir, "..", "..", "python", name)]
@@ -55,6 +53,31 @@ class CExampleTest(BrokerTestCase):
         r = self.proc(["receive", "-a", self.addr, "-m3"])
         self.assertEqual(receive_expect(3), r.wait_out())
 
+    def retry(self, args, max=10):
+        """Run until output does not contain "connection refused", up to max retries"""
+        while True:
+            try:
+                return self.proc(args).wait_out()
+            except ProcError, e:
+                if "connection refused" in e.args[0] and max > 0:
+                    max -= 1
+                    time.sleep(.01)
+                    continue
+                raise
+
+    def test_send_direct(self):
+        """Send first then receive"""
+        addr = "127.0.0.1:%s/examples" % (pick_port())
+        d = self.proc(["direct", "-a", addr])
+        self.assertEqual("100 messages sent and acknowledged\n", self.retry(["send", "-a", addr]))
+        self.assertIn(receive_expect(100), d.wait_out())
+
+    def test_receive_direct(self):
+        """Send first then receive"""
+        addr = "127.0.0.1:%s/examples" % (pick_port())
+        d = self.proc(["direct", "-a", addr])
+        self.assertEqual(receive_expect(100), self.retry(["receive", "-a", addr]))
+        self.assertIn("100 messages sent and acknowledged\n", d.wait_out())
 
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6291a75c/examples/exampletest.py
----------------------------------------------------------------------
diff --git a/examples/exampletest.py b/examples/exampletest.py
index d40b9cb..bf7ea9b 100644
--- a/examples/exampletest.py
+++ b/examples/exampletest.py
@@ -65,6 +65,7 @@ class Proc(Popen):
         """Start an example process"""
         args = list(args)
         self.args = args
+        self.kwargs = kwargs
         self._out = os.tmpfile()
         try:
             Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org