You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2012/07/03 22:50:53 UTC
svn commit: r1356935 - in /qpid/proton/trunk: proton-c/include/proton/
proton-c/src/ proton-c/src/dispatcher/ tests/proton_tests/
Author: rhs
Date: Tue Jul 3 20:50:51 2012
New Revision: 1356935
URL: http://svn.apache.org/viewvc?rev=1356935&view=rev
Log:
added timeouts to messenger API; added messenger test suite; tweaked logging to identify connection; added PN_TRACE_DRV log flag
Added:
qpid/proton/trunk/tests/proton_tests/messenger.py
Modified:
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/include/proton/error.h
qpid/proton/trunk/proton-c/include/proton/messenger.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/driver.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/proton_tests/__init__.py
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Jul 3 20:50:51 2012
@@ -76,6 +76,7 @@ typedef int pn_trace_t;
#define PN_TRACE_OFF (0)
#define PN_TRACE_RAW (1)
#define PN_TRACE_FRM (2)
+#define PN_TRACE_DRV (4)
#define PN_SESSION_WINDOW (1024)
Modified: qpid/proton/trunk/proton-c/include/proton/error.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/error.h?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/error.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/error.h Tue Jul 3 20:50:51 2012
@@ -32,6 +32,7 @@ typedef struct pn_error_t pn_error_t;
#define PN_UNDERFLOW (-4)
#define PN_STATE_ERR (-5)
#define PN_ARG_ERR (-6)
+#define PN_TIMEOUT (-7)
const char *pn_code(int code);
Modified: qpid/proton/trunk/proton-c/include/proton/messenger.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/messenger.h?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/messenger.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/messenger.h Tue Jul 3 20:50:51 2012
@@ -48,6 +48,24 @@ pn_messenger_t *pn_messenger(const char
*/
const char *pn_messenger_name(pn_messenger_t *messenger);
+/** Sets the timeout for a Messenger. A negative timeout means
+ * the messenger waits indefinitely (no timeout).
+ *
+ * @param[in] messenger the messenger
+ * @param[in] timeout the new timeout for the messenger, in milliseconds
+ *
+ * @return an error code or zero if there is no error
+ */
+int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout);
+
+/** Retrieves the timeout for a Messenger.
+ *
+ * @param[in] messenger the messenger
+ *
+ * @return the timeout for the messenger, in milliseconds
+ */
+int pn_messenger_get_timeout(pn_messenger_t *messenger);
+
/** Frees a Messenger.
*
* @param[in] messenger the messenger to free, no longer valid on
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Tue Jul 3 20:50:51 2012
@@ -82,8 +82,8 @@ static void pn_do_trace(pn_dispatcher_t
uint8_t code = scanned ? code64 : 0;
size_t n = SCRATCH;
pn_data_format(args, disp->scratch, &n);
- fprintf(stderr, "[%u] %s %s %s", ch, dir == OUT ? "->" : "<-",
- disp->names[code], disp->scratch);
+ fprintf(stderr, "[%p:%u] %s %s %s", (void *) disp, ch,
+ dir == OUT ? "->" : "<-", disp->names[code], disp->scratch);
if (size) {
size_t capacity = 4*size + 1;
char buf[capacity];
Modified: qpid/proton/trunk/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/driver.c?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/driver.c (original)
+++ qpid/proton/trunk/proton-c/src/driver.c Tue Jul 3 20:50:51 2012
@@ -68,11 +68,13 @@ struct pn_listener_t {
};
#define IO_BUF_SIZE (4*1024)
+#define NAME_MAX (256)
struct pn_connector_t {
pn_driver_t *driver;
pn_connector_t *connector_next;
pn_connector_t *connector_prev;
+ char name[256];
int idx;
bool pending_tick;
bool pending_read;
@@ -165,7 +167,8 @@ pn_listener_t *pn_listener(pn_driver_t *
pn_listener_t *l = pn_listener_fd(driver, sock, context);
- printf("Listening on %s:%s\n", host, port);
+ if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+ printf("Listening on %s:%s\n", host, port);
return l;
}
@@ -226,9 +229,10 @@ pn_connector_t *pn_listener_accept(pn_li
return NULL;
} else {
pn_configure_sock(sock);
- if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
- printf("accepted from %s:%s\n", host, serv);
+ if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+ fprintf(stderr, "Accepted from %s:%s\n", host, serv);
pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
+ snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
c->listener = l;
return c;
}
@@ -303,7 +307,9 @@ pn_connector_t *pn_connector(pn_driver_t
freeaddrinfo(addr);
pn_connector_t *c = pn_connector_fd(driver, sock, context);
- printf("Connected to %s:%s\n", host, port);
+ snprintf(c->name, NAME_MAX, "%s:%s", host, port);
+ if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
+ fprintf(stderr, "Connected to %s\n", c->name);
return c;
}
@@ -332,6 +338,7 @@ pn_connector_t *pn_connector_fd(pn_drive
c->pending_tick = false;
c->pending_read = false;
c->pending_write = false;
+ c->name[0] = '\0';
c->idx = 0;
c->fd = fd;
c->status = PN_SEL_RD | PN_SEL_WR;
@@ -460,7 +467,7 @@ static void pn_connector_process_input(p
if (n == PN_EOS) {
pn_connector_consume(ctor, ctor->input_size);
} else {
- printf("error in process_input: %s\n", pn_code(n));
+ fprintf(stderr, "error in process_input: %s\n", pn_code(n));
}
ctor->input_done = true;
break;
@@ -658,8 +665,9 @@ void pn_connector_process(pn_connector_t
c->pending_write = false;
}
if (c->output_size == 0 && c->input_done && c->output_done) {
- if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW))
- fprintf(stderr, "closed\n");
+ if (c->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV)) {
+ fprintf(stderr, "Closed %s\n", c->name);
+ }
pn_connector_close(c);
}
}
@@ -686,7 +694,8 @@ pn_driver_t *pn_driver()
d->ctrl[0] = 0;
d->ctrl[1] = 0;
d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
- (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF));
+ (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
+ (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF));
// XXX
if (pipe(d->ctrl)) {
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Tue Jul 3 20:50:51 2012
@@ -21,6 +21,7 @@
#include <proton/messenger.h>
#include <proton/driver.h>
+#include <proton/util.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
@@ -29,6 +30,7 @@
struct pn_messenger_t {
char *name;
+ int timeout;
pn_driver_t *driver;
pn_connector_t *connectors[1024];
size_t size;
@@ -57,6 +59,7 @@ pn_messenger_t *pn_messenger(const char
if (m) {
m->name = build_name(name);
+ m->timeout = -1;
m->driver = pn_driver();
m->size = 0;
m->listeners = 0;
@@ -73,6 +76,18 @@ const char *pn_messenger_name(pn_messeng
return messenger->name;
}
+int pn_messenger_set_timeout(pn_messenger_t *messenger, int timeout)
+{
+ if (!messenger) return PN_ARG_ERR;
+ messenger->timeout = timeout;
+ return 0;
+}
+
+int pn_messenger_get_timeout(pn_messenger_t *messenger)
+{
+ return messenger ? messenger->timeout : 0;
+}
+
void pn_messenger_free(pn_messenger_t *messenger)
{
if (messenger) {
@@ -171,14 +186,28 @@ void pn_messenger_reclaim(pn_messenger_t
}
}
-int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
+long int millis(struct timeval tv)
+{
+ return tv.tv_sec * 1000 + tv.tv_usec/1000;
+}
+
+int pn_messenger_tsync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *), int timeout)
{
for (int i = 0; i < messenger->size; i++) {
pn_connector_process(messenger->connectors[i]);
}
- while (!predicate(messenger)) {
- pn_driver_wait(messenger->driver, -1);
+ struct timeval now;
+ if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+ long int deadline = millis(now) + timeout;
+ bool pred;
+
+ while (true) {
+ pred = predicate(messenger);
+ int remaining = deadline - millis(now);
+ if (pred || (timeout >= 0 && remaining < 0)) break;
+
+ pn_driver_wait(messenger->driver, remaining);
pn_listener_t *l;
while ((l = pn_driver_listener(messenger->driver))) {
@@ -214,47 +243,30 @@ int pn_messenger_sync(pn_messenger_t *me
pn_connector_process(c);
}
}
+
+ if (timeout >= 0) {
+ if (gettimeofday(&now, NULL)) pn_fatal("gettimeofday failed\n");
+ }
}
- return 0;
+ return pred ? 0 : PN_TIMEOUT;
}
-bool pn_messenger_linked(pn_messenger_t *messenger)
+int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_t *))
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
- pn_connection_t *conn = pn_connector_connection(ctor);
- pn_state_t state = pn_connection_state(conn);
- if ((state == (PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT)) ||
- (state == (PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE))) {
- return false;
- }
-
- if (pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT) ||
- pn_link_head(conn, PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE)) {
- return false;
- }
- }
-
- return true;
+ return pn_messenger_tsync(messenger, predicate, messenger->timeout);
}
int pn_messenger_start(pn_messenger_t *messenger)
{
if (!messenger) return PN_ARG_ERR;
- return pn_messenger_sync(messenger, pn_messenger_linked);
+ // right now this is a noop
+ return 0;
}
-bool pn_messenger_unlinked(pn_messenger_t *messenger)
+bool pn_messenger_stopped(pn_messenger_t *messenger)
{
- for (int i = 0; i < messenger->size; i++) {
- pn_connector_t *ctor = messenger->connectors[i];
- pn_connection_t *conn = pn_connector_connection(ctor);
- pn_state_t state = pn_connection_state(conn);
- if (state != (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED))
- return false;
- }
- return true;
+ return messenger->size == 0;
}
int pn_messenger_stop(pn_messenger_t *messenger)
@@ -272,7 +284,7 @@ int pn_messenger_stop(pn_messenger_t *me
pn_connection_close(conn);
}
- return pn_messenger_sync(messenger, pn_messenger_unlinked);
+ return pn_messenger_sync(messenger, pn_messenger_stopped);
}
static void parse_address(char *address, char **domain, char **name)
@@ -301,6 +313,18 @@ bool pn_streq(const char *a, const char
pn_connection_t *pn_messenger_domain(pn_messenger_t *messenger, const char *domain)
{
+ char buf[strlen(domain) + 1];
+ if (domain) {
+ strcpy(buf, domain);
+ } else {
+ buf[0] = '\0';
+ }
+ char *user = NULL;
+ char *pass = NULL;
+ char *host = "0.0.0.0";
+ char *port = "5672";
+ parse_url(buf, &user, &pass, &host, &port);
+
for (int i = 0; i < messenger->size; i++) {
pn_connection_t *connection = pn_connector_connection(messenger->connectors[i]);
const char *container = pn_connection_remote_container(connection);
@@ -309,12 +333,16 @@ pn_connection_t *pn_messenger_domain(pn_
return connection;
}
- pn_connector_t *connector = pn_connector(messenger->driver, domain, "5672", NULL);
+ pn_connector_t *connector = pn_connector(messenger->driver, host, port, NULL);
if (!connector) return NULL;
messenger->connectors[messenger->size++] = connector;
pn_sasl_t *sasl = pn_connector_sasl(connector);
- pn_sasl_mechanisms(sasl, "ANONYMOUS");
- pn_sasl_client(sasl);
+ if (user) {
+ pn_sasl_plain(sasl, user, pass);
+ } else {
+ pn_sasl_mechanisms(sasl, "ANONYMOUS");
+ pn_sasl_client(sasl);
+ }
pn_connection_t *connection = pn_connection();
pn_connection_set_container(connection, messenger->name);
pn_connection_set_hostname(connection, domain);
@@ -378,11 +406,15 @@ pn_listener_t *pn_messenger_isource(pn_m
{
char buf[strlen(source) + 1];
strcpy(buf, source);
- char *domain;
- char *name;
+ char *domain, *name;
parse_address(buf, &domain, &name);
+ char *user = NULL;
+ char *pass = NULL;
+ char *host = "0.0.0.0";
+ char *port = "5672";
+ parse_url(domain + 1, &user, &pass, &host, &port);
- pn_listener_t *listener = pn_listener(messenger->driver, domain + 1, "5672", NULL);
+ pn_listener_t *listener = pn_listener(messenger->driver, host, port, NULL);
if (listener) {
messenger->listeners++;
}
@@ -428,6 +460,8 @@ static void outward_munge(pn_messenger_t
}
}
+bool false_pred(pn_messenger_t *messenger) { return false; }
+
int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg)
{
if (!messenger) return PN_ARG_ERR;
@@ -459,6 +493,7 @@ int pn_messenger_put(pn_messenger_t *mes
return n;
} else {
pn_advance(sender);
+ pn_messenger_tsync(messenger, false_pred, 0);
return 0;
}
}
@@ -469,8 +504,6 @@ int pn_messenger_put(pn_messenger_t *mes
bool pn_messenger_sent(pn_messenger_t *messenger)
{
- // if (!pn_messenger_linked(messenger)) return false;
-
for (int i = 0; i < messenger->size; i++) {
pn_connector_t *ctor = messenger->connectors[i];
pn_connection_t *conn = pn_connector_connection(ctor);
@@ -497,8 +530,6 @@ bool pn_messenger_sent(pn_messenger_t *m
bool pn_messenger_rcvd(pn_messenger_t *messenger)
{
- // if (!pn_messenger_linked(messenger)) return false;
-
for (int i = 0; i < messenger->size; i++) {
pn_connector_t *ctor = messenger->connectors[i];
pn_connection_t *conn = pn_connector_connection(ctor);
@@ -532,6 +563,8 @@ int pn_messenger_recv(pn_messenger_t *me
int pn_messenger_get(pn_messenger_t *messenger, pn_message_t *msg)
{
+ if (!messenger) return PN_ARG_ERR;
+
for (int i = 0; i < messenger->size; i++) {
pn_connector_t *ctor = messenger->connectors[i];
pn_connection_t *conn = pn_connector_connection(ctor);
@@ -545,10 +578,14 @@ int pn_messenger_get(pn_messenger_t *mes
ssize_t n = pn_recv(l, buf, 1024);
pn_settle(d);
if (n < 0) return n;
- int err = pn_message_decode(msg, buf, n);
- if (err) {
- return pn_error_format(messenger->error, err, "error decoding message: %s",
+ if (msg) {
+ int err = pn_message_decode(msg, buf, n);
+ if (err) {
+ return pn_error_format(messenger->error, err, "error decoding message: %s",
pn_message_error(msg));
+ } else {
+ return 0;
+ }
} else {
return 0;
}
@@ -564,6 +601,8 @@ int pn_messenger_get(pn_messenger_t *mes
int pn_messenger_queued(pn_messenger_t *messenger, bool sender)
{
+ if (!messenger) return 0;
+
int result = 0;
for (int i = 0; i < messenger->size; i++) {
Modified: qpid/proton/trunk/tests/proton_tests/__init__.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/__init__.py?rev=1356935&r1=1356934&r2=1356935&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/__init__.py (original)
+++ qpid/proton/trunk/tests/proton_tests/__init__.py Tue Jul 3 20:50:51 2012
@@ -19,3 +19,4 @@
import proton_tests.engine
import proton_tests.message
+import proton_tests.messenger
Added: qpid/proton/trunk/tests/proton_tests/messenger.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/messenger.py?rev=1356935&view=auto
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/messenger.py (added)
+++ qpid/proton/trunk/tests/proton_tests/messenger.py Tue Jul 3 20:50:51 2012
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, common, xproton
+from xproton import *
+from threading import Thread
+
+class Test(common.Test):
+
+ def setup(self):
+ self.server = pn_messenger("server")
+ pn_messenger_set_timeout(self.server, 10000)
+ pn_messenger_start(self.server)
+ pn_messenger_subscribe(self.server, "//~0.0.0.0:12345")
+ self.thread = Thread(target=self.run)
+ self.running = True
+ self.thread.start()
+
+ self.client = pn_messenger("client")
+ pn_messenger_set_timeout(self.client, 10000)
+ pn_messenger_start(self.client)
+
+ def teardown(self):
+ self.running = False
+ msg = pn_message()
+ pn_message_set_address(msg, "//0.0.0.0:12345")
+ pn_messenger_put(self.client, msg)
+ pn_messenger_send(self.client)
+ pn_messenger_stop(self.client)
+ self.thread.join()
+ pn_messenger_free(self.client)
+ pn_messenger_free(self.server)
+ self.client = None
+ self.server = None
+
+class MessengerTest(Test):
+
+ def run(self):
+ msg = pn_message()
+ while self.running:
+ pn_messenger_recv(self.server, 10)
+ while pn_messenger_incoming(self.server):
+ if pn_messenger_get(self.server, msg):
+ print pn_messenger_error(self.server)
+ else:
+ reply_to = pn_message_get_reply_to(msg)
+ if reply_to:
+ pn_message_set_address(msg, reply_to)
+ pn_messenger_put(self.server, msg)
+ pn_messenger_stop(self.server)
+
+ def testSendReceive(self):
+ msg = pn_message()
+ pn_message_set_address(msg, "//0.0.0.0:12345")
+ pn_message_set_subject(msg, "Hello World!")
+ body = "First the world, then the galaxy!"
+ pn_message_load(msg, body)
+ pn_messenger_put(self.client, msg)
+ pn_messenger_send(self.client)
+
+ reply = pn_message()
+ assert not pn_messenger_recv(self.client, 1)
+ assert pn_messenger_incoming(self.client) == 1
+ assert not pn_messenger_get(self.client, reply)
+
+ assert pn_message_get_subject(reply) == "Hello World!"
+ cd, rbod = pn_message_save(reply, 1024)
+ assert not cd
+ assert rbod == body
+
+ pn_message_free(msg)
+ pn_message_free(reply)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org