You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/18 22:31:09 UTC

svn commit: r1626080 - in /qpid/proton/trunk/proton-c: include/proton/messenger.h src/messenger/messenger.c src/messenger/transform.c src/messenger/transform.h

Author: rhs
Date: Thu Sep 18 20:31:08 2014
New Revision: 1626080

URL: http://svn.apache.org/r1626080
Log:
PROTON-669: applied patch from dominic for fail fast checking of messenger routes

Modified:
    qpid/proton/trunk/proton-c/include/proton/messenger.h
    qpid/proton/trunk/proton-c/src/messenger/messenger.c
    qpid/proton/trunk/proton-c/src/messenger/transform.c
    qpid/proton/trunk/proton-c/src/messenger/transform.h

Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Sep 18 20:31:08 2014
@@ -900,6 +900,28 @@ PN_EXTERN pn_timestamp_t pn_messenger_de
  * @}
  */
 
+#define PN_FLAGS_CHECK_ROUTES                                                  \
+  (0x1) /** Messenger flag to indicate that a call                             \
+            to pn_messenger_start should check that                            \
+            any defined routes are valid */
+
+/** Sets control flags to enable additional function for the Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] flags 0 or PN_FLAGS_CHECK_ROUTES
+ *
+ * @return an error code of zero if there is no error
+ */
+PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger,
+                                     const int flags);
+
+/** Gets the flags for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return The flags set for the messenger
+ */
+PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger);
+
 #ifdef __cplusplus
 }
 #endif

Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Sep 18 20:31:08 2014
@@ -100,6 +100,7 @@ struct pn_messenger_t {
   int receivers;     // # receiver links
   int draining;      // # links in drain state
   int connection_error;
+  int flags;
   bool blocking;
   bool passive;
   bool interrupted;
@@ -372,6 +373,10 @@ static pn_listener_ctx_t *pn_listener_ct
   pn_socket_t socket = pn_listen(messenger->io, host, port ? port : default_port(scheme));
   if (socket == PN_INVALID_SOCKET) {
     pn_error_copy(messenger->error, pn_io_error(messenger->io));
+    pn_error_format(messenger->error, PN_ERR, "CONNECTION ERROR (%s:%s): %s\n",
+                    messenger->address.host, messenger->address.port,
+                    pn_error_text(messenger->error));
+
     return NULL;
   }
 
@@ -639,6 +644,7 @@ pn_messenger_t *pn_messenger(const char 
     m->rewritten = pn_string(NULL);
     m->domain = pn_string(NULL);
     m->connection_error = 0;
+    m->flags = 0;
   }
 
   return m;
@@ -1430,11 +1436,85 @@ int pn_messenger_sync(pn_messenger_t *me
   }
 }
 
+static void pni_parse(pn_address_t *address);
+pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger,
+                                      const char *address, char **name);
+int pn_messenger_work(pn_messenger_t *messenger, int timeout);
+
 int pn_messenger_start(pn_messenger_t *messenger)
 {
   if (!messenger) return PN_ARG_ERR;
-  // right now this is a noop
-  return 0;
+
+  int error = 0;
+
+  // When checking of routes is required we attempt to resolve each route
+  // with a substitution that has a defined scheme, address and port. If
+  // any of theses routes is invalid an appropriate error code will be
+  // returned. Currently no attempt is made to check the name part of the
+  // address, as the intent here is to fail fast if the addressed host
+  // is invalid or unavailable.
+  if (messenger->flags | PN_FLAGS_CHECK_ROUTES) {
+    pn_list_t *substitutions = pn_list(PN_WEAKREF, 0);
+    pn_transform_get_substitutions(messenger->routes, substitutions);
+    for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) {
+      pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i);
+      if (substitution) {
+        pn_address_t addr;
+        addr.text = pn_string(NULL);
+        error = pn_string_copy(addr.text, substitution);
+        if (!error) {
+          pni_parse(&addr);
+          if (addr.scheme && strlen(addr.scheme) > 0 &&
+              !strstr(addr.scheme, "$") && addr.host && strlen(addr.host) > 0 &&
+              !strstr(addr.host, "$") && addr.port && strlen(addr.port) > 0 &&
+              !strstr(addr.port, "$")) {
+            pn_string_t *check_addr = pn_string(NULL);
+            // ipv6 hosts need to be wrapped in [] within a URI
+            if (strstr(addr.host, ":")) {
+              pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme,
+                               addr.host, addr.port);
+            } else {
+              pn_string_format(check_addr, "%s://%s:%s/", addr.scheme,
+                               addr.host, addr.port);
+            }
+            char *name = NULL;
+            pn_connection_t *connection = pn_messenger_resolve(
+                messenger, pn_string_get(check_addr), &name);
+            pn_free(check_addr);
+            if (!connection) {
+              if (pn_error_code(messenger->error) == 0)
+                pn_error_copy(messenger->error, pn_io_error(messenger->io));
+              pn_error_format(messenger->error, PN_ERR,
+                              "CONNECTION ERROR (%s:%s): %s\n",
+                              messenger->address.host, messenger->address.port,
+                              pn_error_text(messenger->error));
+              error = pn_error_code(messenger->error);
+            } else {
+              // Send and receive outstanding messages until connection
+              // completes or an error occurs
+              int work = pn_messenger_work(messenger, -1);
+              pn_connection_ctx_t *cctx =
+                  (pn_connection_ctx_t *)pn_connection_get_context(connection);
+              while ((work > 0 ||
+                      (pn_connection_state(connection) & PN_REMOTE_UNINIT) ||
+                      pni_connection_pending(cctx->selectable) != (ssize_t)0) &&
+                     pn_error_code(messenger->error) == 0)
+                work = pn_messenger_work(messenger, 0);
+              if (work < 0 && work != PN_TIMEOUT) {
+                error = work;
+              } else {
+                error = pn_error_code(messenger->error);
+              }
+            }
+          }
+          pn_free(addr.text);
+        }
+      }
+    }
+    pn_free(substitutions);
+  }
+
+  return error;
 }
 
 bool pn_messenger_stopped(pn_messenger_t *messenger)
@@ -2154,3 +2234,18 @@ int pn_messenger_rewrite(pn_messenger_t 
   pn_transform_rule(messenger->rewrites, pattern, address);
   return 0;
 }
+
+PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger, const int flags)
+{
+  if (!messenger)
+    return PN_ARG_ERR;
+  if (flags != 0 && (flags ^ PN_FLAGS_CHECK_ROUTES) != 0)
+    return PN_ARG_ERR;
+  messenger->flags = flags;
+  return 0;
+}
+
+PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger)
+{
+  return messenger ? messenger->flags : 0;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger/transform.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/transform.c?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/transform.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/transform.c Thu Sep 18 20:31:08 2014
@@ -19,7 +19,6 @@
  *
  */
 
-#include <proton/object.h>
 #include <string.h>
 #include <assert.h>
 #include <ctype.h>
@@ -241,3 +240,15 @@ bool pn_transform_matched(pn_transform_t
 {
   return transform->matched;
 }
+
+int pn_transform_get_substitutions(pn_transform_t *transform,
+                                   pn_list_t *substitutions)
+{
+  int size = pn_list_size(transform->rules);
+  for (size_t i = 0; i < (size_t)size; i++) {
+    pn_rule_t *rule = (pn_rule_t *)pn_list_get(transform->rules, i);
+    pn_list_add(substitutions, rule->substitution);
+  }
+
+  return size;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger/transform.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/transform.h?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/transform.h (original)
+++ qpid/proton/trunk/proton-c/src/messenger/transform.h Thu Sep 18 20:31:08 2014
@@ -22,6 +22,7 @@
  *
  */
 
+#include <proton/object.h>
 #include <proton/buffer.h>
 
 typedef struct pn_transform_t pn_transform_t;
@@ -32,6 +33,7 @@ void pn_transform_rule(pn_transform_t *t
 int pn_transform_apply(pn_transform_t *transform, const char *src,
                        pn_string_t *dest);
 bool pn_transform_matched(pn_transform_t *transform);
-
+int pn_transform_get_substitutions(pn_transform_t *transform,
+                                   pn_list_t *substitutions);
 
 #endif /* transform.h */



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org