You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2014/09/18 22:31:09 UTC
svn commit: r1626080 - in /qpid/proton/trunk/proton-c:
include/proton/messenger.h src/messenger/messenger.c
src/messenger/transform.c src/messenger/transform.h
Author: rhs
Date: Thu Sep 18 20:31:08 2014
New Revision: 1626080
URL: http://svn.apache.org/r1626080
Log:
PROTON-669: applied patch from dominic for fail fast checking of messenger routes
Modified:
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/messenger/transform.c
qpid/proton/trunk/proton-c/src/messenger/transform.h
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Thu Sep 18 20:31:08 2014
@@ -900,6 +900,28 @@ PN_EXTERN pn_timestamp_t pn_messenger_de
* @}
*/
+/** Messenger flag to indicate that a call to pn_messenger_start should
+ * check that any defined routes are valid.
+ */
+#define PN_FLAGS_CHECK_ROUTES (0x1)
+
+/** Sets control flags that enable additional functionality for the Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @param[in] flags 0 or PN_FLAGS_CHECK_ROUTES
+ * @return zero on success; PN_ARG_ERR if messenger is NULL or flags is
+ * neither 0 nor PN_FLAGS_CHECK_ROUTES
+ */
+PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger,
+ const int flags);
+
+/** Gets the flags for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ * @return the flags set for the messenger, or 0 if messenger is NULL
+ */
+PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger);
+
#ifdef __cplusplus
}
#endif
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Thu Sep 18 20:31:08 2014
@@ -100,6 +100,7 @@ struct pn_messenger_t {
int receivers; // # receiver links
int draining; // # links in drain state
int connection_error;
+ int flags;
bool blocking;
bool passive;
bool interrupted;
@@ -372,6 +373,10 @@ static pn_listener_ctx_t *pn_listener_ct
pn_socket_t socket = pn_listen(messenger->io, host, port ? port : default_port(scheme));
if (socket == PN_INVALID_SOCKET) {
pn_error_copy(messenger->error, pn_io_error(messenger->io));
+ pn_error_format(messenger->error, PN_ERR, "CONNECTION ERROR (%s:%s): %s\n",
+ messenger->address.host, messenger->address.port,
+ pn_error_text(messenger->error));
+
return NULL;
}
@@ -639,6 +644,7 @@ pn_messenger_t *pn_messenger(const char
m->rewritten = pn_string(NULL);
m->domain = pn_string(NULL);
m->connection_error = 0;
+ m->flags = 0;
}
return m;
@@ -1430,11 +1436,85 @@ int pn_messenger_sync(pn_messenger_t *me
}
}
+static void pni_parse(pn_address_t *address);
+pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger,
+ const char *address, char **name);
+int pn_messenger_work(pn_messenger_t *messenger, int timeout);
+
int pn_messenger_start(pn_messenger_t *messenger)
{
if (!messenger) return PN_ARG_ERR;
- // right now this is a noop
- return 0;
+
+ int error = 0;
+
+ // When PN_FLAGS_CHECK_ROUTES is set we attempt to resolve each route
+ // substitution that has a concrete (no "$" placeholder) scheme, host and
+ // port. If any of these routes is invalid an appropriate error code is
+ // returned. Currently no attempt is made to check the name part of the
+ // address, as the intent here is to fail fast if the addressed host
+ // is invalid or unavailable.
+ if (messenger->flags & PN_FLAGS_CHECK_ROUTES) {
+ pn_list_t *substitutions = pn_list(PN_WEAKREF, 0);
+ pn_transform_get_substitutions(messenger->routes, substitutions);
+ for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) {
+ pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i);
+ if (substitution) {
+ pn_address_t addr;
+ addr.text = pn_string(NULL);
+ error = pn_string_copy(addr.text, substitution);
+ if (!error) {
+ pni_parse(&addr);
+ if (addr.scheme && strlen(addr.scheme) > 0 &&
+ !strstr(addr.scheme, "$") && addr.host && strlen(addr.host) > 0 &&
+ !strstr(addr.host, "$") && addr.port && strlen(addr.port) > 0 &&
+ !strstr(addr.port, "$")) {
+ pn_string_t *check_addr = pn_string(NULL);
+ // ipv6 hosts need to be wrapped in [] within a URI
+ if (strstr(addr.host, ":")) {
+ pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme,
+ addr.host, addr.port);
+ } else {
+ pn_string_format(check_addr, "%s://%s:%s/", addr.scheme,
+ addr.host, addr.port);
+ }
+ char *name = NULL;
+ pn_connection_t *connection = pn_messenger_resolve(
+ messenger, pn_string_get(check_addr), &name);
+ pn_free(check_addr);
+ if (!connection) {
+ if (pn_error_code(messenger->error) == 0)
+ pn_error_copy(messenger->error, pn_io_error(messenger->io));
+ pn_error_format(messenger->error, PN_ERR,
+ "CONNECTION ERROR (%s:%s): %s\n",
+ messenger->address.host, messenger->address.port,
+ pn_error_text(messenger->error));
+ error = pn_error_code(messenger->error);
+ } else {
+ // Send and receive outstanding messages until connection
+ // completes or an error occurs
+ int work = pn_messenger_work(messenger, -1);
+ pn_connection_ctx_t *cctx =
+ (pn_connection_ctx_t *)pn_connection_get_context(connection);
+ while ((work > 0 ||
+ (pn_connection_state(connection) & PN_REMOTE_UNINIT) ||
+ pni_connection_pending(cctx->selectable) != (ssize_t)0) &&
+ pn_error_code(messenger->error) == 0)
+ work = pn_messenger_work(messenger, 0);
+ if (work < 0 && work != PN_TIMEOUT) {
+ error = work;
+ } else {
+ error = pn_error_code(messenger->error);
+ }
+ }
+ }
+ }
+ pn_free(addr.text); // released on every path, including a failed copy
+ }
+ }
+ pn_free(substitutions);
+ }
+
+ return error;
}
bool pn_messenger_stopped(pn_messenger_t *messenger)
@@ -2154,3 +2234,18 @@ int pn_messenger_rewrite(pn_messenger_t
pn_transform_rule(messenger->rewrites, pattern, address);
return 0;
}
+
+PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger, const int flags)
+{
+  // Only 0 and PN_FLAGS_CHECK_ROUTES are accepted; any other bit is rejected.
+  const int unknown_bits = flags & ~PN_FLAGS_CHECK_ROUTES;
+  if (!messenger || unknown_bits != 0)
+    return PN_ARG_ERR;
+  messenger->flags = flags;
+  return 0;
+}
+
+PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger)
+{
+  return !messenger ? 0 : messenger->flags;  /* NULL messenger reports 0 */
+}
Modified: qpid/proton/trunk/proton-c/src/messenger/transform.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/transform.c?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/transform.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/transform.c Thu Sep 18 20:31:08 2014
@@ -19,7 +19,6 @@
*
*/
-#include <proton/object.h>
#include <string.h>
#include <assert.h>
#include <ctype.h>
@@ -241,3 +240,15 @@ bool pn_transform_matched(pn_transform_t
{
return transform->matched;
}
+
+int pn_transform_get_substitutions(pn_transform_t *transform,
+                                   pn_list_t *substitutions)
+{
+  // Adds the substitution of every rule, in rule order, to the given list.
+  const int count = pn_list_size(transform->rules);
+  for (size_t idx = 0; idx < (size_t)count; idx++) {
+    pn_rule_t *entry = (pn_rule_t *)pn_list_get(transform->rules, idx);
+    pn_list_add(substitutions, entry->substitution);
+  }
+  return count;  // number of rules visited
+}
Modified: qpid/proton/trunk/proton-c/src/messenger/transform.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/transform.h?rev=1626080&r1=1626079&r2=1626080&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/transform.h (original)
+++ qpid/proton/trunk/proton-c/src/messenger/transform.h Thu Sep 18 20:31:08 2014
@@ -22,6 +22,7 @@
*
*/
+#include <proton/object.h>
#include <proton/buffer.h>
typedef struct pn_transform_t pn_transform_t;
@@ -32,6 +33,7 @@ void pn_transform_rule(pn_transform_t *t
int pn_transform_apply(pn_transform_t *transform, const char *src,
pn_string_t *dest);
bool pn_transform_matched(pn_transform_t *transform);
-
+int pn_transform_get_substitutions(pn_transform_t *transform,
+ pn_list_t *substitutions); /* adds each rule's substitution to the list; returns the rule count */
#endif /* transform.h */
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org