You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/04/10 17:52:57 UTC
[1/2] qpid-proton git commit: PROTON-1823: [c] pn_message_send()
calls pn_link_advance()
Repository: qpid-proton
Updated Branches:
refs/heads/master e91457c7e -> e828055b2
PROTON-1823: [c] pn_message_send() calls pn_link_advance()
Call pn_link_advance() at the end of pn_message_send()
Since pn_message_send() can only send complete messages, it makes no sense to
call it without calling pn_link_advance().
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c7717a47
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c7717a47
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c7717a47
Branch: refs/heads/master
Commit: c7717a47d675bae12fcfbf31244009ddb247787c
Parents: e91457c
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Apr 10 11:54:15 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Apr 10 13:28:09 2018 -0400
----------------------------------------------------------------------
c/examples/direct.c | 1 -
c/examples/send.c | 1 -
c/include/proton/message.h | 6 ++++++
c/src/core/message.c | 7 +++----
c/tests/connection_driver.c | 1 -
5 files changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c7717a47/c/examples/direct.c
----------------------------------------------------------------------
diff --git a/c/examples/direct.c b/c/examples/direct.c
index d2a4ae3..fd0c3a4 100644
--- a/c/examples/direct.c
+++ b/c/examples/direct.c
@@ -181,7 +181,6 @@ static void handle_send(app_data_t* app, pn_event_t* event) {
/* Use sent counter as unique delivery tag. */
pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
send_message(app, sender);
- pn_link_advance(sender);
}
break;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c7717a47/c/examples/send.c
----------------------------------------------------------------------
diff --git a/c/examples/send.c b/c/examples/send.c
index 8d979e6..265b66c 100644
--- a/c/examples/send.c
+++ b/c/examples/send.c
@@ -99,7 +99,6 @@ static bool handle(app_data_t* app, pn_event_t* event) {
/* Use sent counter as unique delivery tag. */
pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent)));
send_message(app, sender);
- pn_link_advance(sender);
}
break;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c7717a47/c/include/proton/message.h
----------------------------------------------------------------------
diff --git a/c/include/proton/message.h b/c/include/proton/message.h
index fd69688..0f094be 100644
--- a/c/include/proton/message.h
+++ b/c/include/proton/message.h
@@ -742,6 +742,12 @@ struct pn_link_t;
/**
* Encode and send a message on a sender link.
*
+ * Performs the following steps:
+ * - create or expand the buffer @buf as required
+ * - call pn_message_encode() to encode the message to a buffer
+ * - call pn_link_send() to send the encoded message bytes
+ * - call pn_link_advance() to indicate the message is complete
+ *
* @param[in] msg A message object.
* @param[in] sender A sending link.
* The message will be encoded and sent with pn_link_send()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c7717a47/c/src/core/message.c
----------------------------------------------------------------------
diff --git a/c/src/core/message.c b/c/src/core/message.c
index 3312f8c..dbbdacf 100644
--- a/c/src/core/message.c
+++ b/c/src/core/message.c
@@ -916,10 +916,9 @@ PN_EXTERN ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, pn_rwbyt
if (buffer->start == NULL) return PN_OUT_OF_MEMORY;
size = buffer->size;
}
- if (err == 0) {
- err = pn_link_send(sender, buffer->start, size);
- if (err < 0) pn_error_copy(pn_message_error(msg), pn_link_error(sender));
- }
+ if (err >= 0) err = pn_link_send(sender, buffer->start, size);
+ if (err >= 0) err = pn_link_advance(sender);
+ if (err < 0) pn_error_copy(pn_message_error(msg), pn_link_error(sender));
free(local_buf.start);
return err;
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c7717a47/c/tests/connection_driver.c
----------------------------------------------------------------------
diff --git a/c/tests/connection_driver.c b/c/tests/connection_driver.c
index 7e4489a..62655a7 100644
--- a/c/tests/connection_driver.c
+++ b/c/tests/connection_driver.c
@@ -106,7 +106,6 @@ static void test_message_transfer(test_t *t) {
pn_message_send(m, snd, NULL);
pn_message_free(m);
- TEST_CHECK(t, pn_link_advance(snd));
test_connection_drivers_run(&client, &server);
TEST_HANDLER_EXPECT(&server.handler, PN_TRANSPORT, PN_DELIVERY, 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-proton git commit: PROTON-1809: PROTON-636: Unable to
receive messages when max-frame-size > 2^20
Posted by ac...@apache.org.
PROTON-1809: PROTON-636: Unable to receive messages when max-frame-size > 2^20
Caused when the frame size was greater than the default session-capacity so the
incoming windows is always 0.
Fixes:
1. No default session-capacity. Session flow control is enabled only if both
session-capacity and max-frame-size are set. Neither value is deduced
automatically.
2. Transport error if both are set and session-capacity is less than
max-frame-size. In this case the incoming window is always 0 so
communication is impossible.
3. Update API doc for pn_session_set_capacity
4. Add tests to verify this behavior
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e828055b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e828055b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e828055b
Branch: refs/heads/master
Commit: e828055b2a0fade8835cd597e91d0ae3c8bb3f5a
Parents: c7717a4
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Apr 10 10:31:56 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Apr 10 13:28:24 2018 -0400
----------------------------------------------------------------------
c/examples/receive.c | 2 +-
c/include/proton/message.h | 3 ++
c/include/proton/session.h | 11 ++++---
c/src/core/engine.c | 5 ++--
c/src/core/framing.h | 1 +
c/src/core/transport.c | 20 +++++++++----
c/tests/connection_driver.c | 64 +++++++++++++++++++++++++++++++++++++++-
c/tests/test_handler.h | 12 ++++++++
c/tests/test_tools.h | 8 ++++-
9 files changed, 110 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/examples/receive.c
----------------------------------------------------------------------
diff --git a/c/examples/receive.c b/c/examples/receive.c
index 0d0c988..8280345 100644
--- a/c/examples/receive.c
+++ b/c/examples/receive.c
@@ -93,7 +93,7 @@ static bool handle(app_data_t* app, pn_event_t* event) {
} break;
case PN_DELIVERY: {
- /* A message has been received */
+ /* A message (or part of a message) has been received */
pn_delivery_t *d = pn_event_delivery(event);
if (pn_delivery_readable(d)) {
pn_link_t *l = pn_delivery_link(d);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/include/proton/message.h
----------------------------------------------------------------------
diff --git a/c/include/proton/message.h b/c/include/proton/message.h
index 0f094be..d7b9663 100644
--- a/c/include/proton/message.h
+++ b/c/include/proton/message.h
@@ -748,6 +748,9 @@ struct pn_link_t;
* - call pn_link_send() to send the encoded message bytes
* - call pn_link_advance() to indicate the message is complete
*
+ * Note: you must create a delivery for the message before calling
+ * pn_message_send() see pn_delivery()
+ *
* @param[in] msg A message object.
* @param[in] sender A sending link.
* The message will be encoded and sent with pn_link_send()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/include/proton/session.h
----------------------------------------------------------------------
diff --git a/c/include/proton/session.h b/c/include/proton/session.h
index cabb1f2..512e004 100644
--- a/c/include/proton/session.h
+++ b/c/include/proton/session.h
@@ -198,9 +198,7 @@ PN_EXTERN void pn_session_close(pn_session_t *session);
* Get the incoming capacity of the session measured in bytes.
*
* The incoming capacity of a session determines how much incoming
- * message data the session will buffer. Note that if this value is
- * less than the negotiated frame size of the transport, it will be
- * rounded up to one full frame.
+ * message data the session will buffer.
*
* @param[in] session the session object
* @return the incoming capacity of the session in bytes
@@ -211,9 +209,10 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
* Set the incoming capacity for a session object.
*
* The incoming capacity of a session determines how much incoming
- * message data the session will buffer. Note that if this value is
- * less than the negotiated frame size of the transport, it will be
- * rounded up to one full frame.
+ * message data the session will buffer.
+ *
+ * NOTE: If set, this value must be greater than or equal to the negotiated
+ * frame size of the transport.
*
* @param[in] session the session object
* @param[in] capacity the incoming capacity for the session
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/engine.c
----------------------------------------------------------------------
diff --git a/c/src/core/engine.c b/c/src/core/engine.c
index f49886d..070c751 100644
--- a/c/src/core/engine.c
+++ b/c/src/core/engine.c
@@ -20,6 +20,7 @@
*/
#include "engine-internal.h"
+#include "framing.h"
#include <stdlib.h>
#include <string.h>
#include "protocol.h"
@@ -987,12 +988,12 @@ pn_session_t *pn_session(pn_connection_t *conn)
ssn->links = pn_list(PN_WEAKREF, 0);
ssn->freed = pn_list(PN_WEAKREF, 0);
ssn->context = pn_record();
- ssn->incoming_capacity = 1024*1024;
+ ssn->incoming_capacity = 0;
ssn->incoming_bytes = 0;
ssn->outgoing_bytes = 0;
ssn->incoming_deliveries = 0;
ssn->outgoing_deliveries = 0;
- ssn->outgoing_window = 2147483647;
+ ssn->outgoing_window = AMQP_MAX_WINDOW_SIZE;
// begin transport state
memset(&ssn->state, 0, sizeof(ssn->state));
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/framing.h
----------------------------------------------------------------------
diff --git a/c/src/core/framing.h b/c/src/core/framing.h
index 792d664..92c1f7d 100644
--- a/c/src/core/framing.h
+++ b/c/src/core/framing.h
@@ -30,6 +30,7 @@
#define AMQP_HEADER_SIZE (8)
#define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
+#define AMQP_MAX_WINDOW_SIZE (2147483647)
typedef struct {
uint8_t type;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/src/core/transport.c
----------------------------------------------------------------------
diff --git a/c/src/core/transport.c b/c/src/core/transport.c
index 96b54f2..1a05261 100644
--- a/c/src/core/transport.c
+++ b/c/src/core/transport.c
@@ -1893,11 +1893,21 @@ static size_t pni_session_outgoing_window(pn_session_t *ssn)
static size_t pni_session_incoming_window(pn_session_t *ssn)
{
- uint32_t size = ssn->connection->transport->local_max_frame;
- if (!size) {
- return 2147483647; // biggest legal value
- } else {
- return (ssn->incoming_capacity - ssn->incoming_bytes)/size;
+ pn_transport_t *t = ssn->connection->transport;
+ uint32_t size = t->local_max_frame;
+ size_t capacity = ssn->incoming_capacity;
+ if (!size || !capacity) { /* session flow control is not enabled */
+ return AMQP_MAX_WINDOW_SIZE;
+ } else if (capacity >= size) { /* precondition */
+ return (capacity - ssn->incoming_bytes) / size;
+ } else { /* error: we will never have a non-zero window */
+ pn_condition_format(
+ pn_transport_condition(t),
+ "amqp:internal-error",
+ "session capacity %"PN_ZU" is less than frame size %"PN_ZU,
+ capacity, size);
+ pn_transport_close_tail(t);
+ return 0;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/connection_driver.c
----------------------------------------------------------------------
diff --git a/c/tests/connection_driver.c b/c/tests/connection_driver.c
index 62655a7..5997f52 100644
--- a/c/tests/connection_driver.c
+++ b/c/tests/connection_driver.c
@@ -379,6 +379,68 @@ static void test_message_abort_mixed(test_t *t) {
test_connection_driver_destroy(&server);
}
+/* Set capacity and max frame, send a single message */
+static void set_capacity_and_max_frame(
+ size_t capacity, size_t max_frame,
+ test_connection_driver_t *client, test_connection_driver_t *server,
+ const char* data)
+{
+ pn_transport_set_max_frame(client->driver.transport, max_frame);
+ pn_connection_open(client->driver.connection);
+ pn_session_t *ssn = pn_session(client->driver.connection);
+ pn_session_set_incoming_capacity(ssn, capacity);
+ pn_session_open(ssn);
+ pn_link_t *snd = pn_sender(ssn, "x");
+ pn_link_open(snd);
+ test_connection_drivers_run(client, server);
+ pn_link_flow(server->handler.link, 1);
+ test_connection_drivers_run(client, server);
+ if (pn_transport_closed(client->driver.transport) ||
+ pn_transport_closed(server->driver.transport))
+ return;
+ /* Send a message */
+ pn_message_t *m = pn_message();
+ pn_message_set_address(m, data);
+ pn_delivery(snd, PN_BYTES_LITERAL(x));
+ pn_message_send(m, snd, NULL);
+ pn_message_free(m);
+ test_connection_drivers_run(client, server);
+}
+
+#define MAX_WINDOW (2147483647)
+#define MAX_FRAME (4294967295)
+/* Test different settings for max-frame, outgoing-window, incoming-capacity */
+static void test_session_flow_control(test_t *t) {
+ test_connection_driver_t client, server;
+ test_connection_drivers_init(t, &client, open_handler, &server, delivery_handler);
+ pn_message_t *m = pn_message();
+ pn_rwbytes_t buf= {0};
+
+ /* Capacity equal to frame size OK */
+ set_capacity_and_max_frame(1234, 1234, &client, &server, "foo");
+ pn_delivery_t *dlv = server.handler.delivery;
+ if (TEST_CHECK(t, dlv)) {
+ message_decode(m, dlv, &buf);
+ TEST_STR_EQUAL(t, "foo", pn_message_get_address(m));
+ }
+
+ /* Capacity bigger than frame size OK */
+ set_capacity_and_max_frame(12345, 1234, &client, &server, "foo");
+ dlv = server.handler.delivery;
+ if (TEST_CHECK(t, dlv)) {
+ message_decode(m, dlv, &buf);
+ TEST_STR_EQUAL(t, "foo", pn_message_get_address(m));
+ }
+
+ /* Capacity smaller than frame size is an error */
+ set_capacity_and_max_frame(1234, 12345, &client, &server, "foo");
+ TEST_COND_NAME(t, "amqp:internal-error", pn_transport_condition(client.driver.transport));
+ TEST_COND_DESC(t, "session capacity 1234 is less than frame size 12345", pn_transport_condition(client.driver.transport));
+
+ pn_message_free(m);
+ free(buf.start);
+ test_connection_drivers_destroy(&client, &server);
+}
int main(int argc, char **argv) {
int failed = 0;
@@ -386,6 +448,6 @@ int main(int argc, char **argv) {
RUN_ARGV_TEST(failed, t, test_message_stream(&t));
RUN_ARGV_TEST(failed, t, test_message_abort(&t));
RUN_ARGV_TEST(failed, t, test_message_abort_mixed(&t));
+ RUN_ARGV_TEST(failed, t, test_session_flow_control(&t));
return failed;
-
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/test_handler.h
----------------------------------------------------------------------
diff --git a/c/tests/test_handler.h b/c/tests/test_handler.h
index 108d0d9..7ffd4d3 100644
--- a/c/tests/test_handler.h
+++ b/c/tests/test_handler.h
@@ -46,6 +46,7 @@ typedef struct test_handler_t {
pn_link_t *sender;
pn_link_t *receiver;
pn_delivery_t *delivery;
+ pn_message_t *message;
pn_ssl_domain_t *ssl_domain;
} test_handler_t;
@@ -167,4 +168,15 @@ test_connection_driver_t* test_connection_drivers_run(test_connection_driver_t *
return NULL;
}
+/* Initialize a client-server driver pair */
+void test_connection_drivers_init(test_t *t, test_connection_driver_t *a, test_handler_fn fa, test_connection_driver_t *b, test_handler_fn fb) {
+ test_connection_driver_init(a, t, fa, NULL);
+ test_connection_driver_init(b, t, fb, NULL);
+ pn_transport_set_server(b->driver.transport);
+}
+
+void test_connection_drivers_destroy(test_connection_driver_t *a, test_connection_driver_t *b) {
+ test_connection_driver_destroy(a);
+ test_connection_driver_destroy(b);
+}
#endif // TESTS_TEST_DRIVER_H
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e828055b/c/tests/test_tools.h
----------------------------------------------------------------------
diff --git a/c/tests/test_tools.h b/c/tests/test_tools.h
index d046a43..7596d60 100644
--- a/c/tests/test_tools.h
+++ b/c/tests/test_tools.h
@@ -148,6 +148,11 @@ bool test_int_equal_(test_t *t, int want, int got, const char *file, int line) {
}
#define TEST_INT_EQUAL(TEST, WANT, GOT) test_int_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
+bool test_size_equal_(test_t *t, size_t want, size_t got, const char *file, int line) {
+ return test_check_(t, want == got, NULL, file, line, "want %zd, got %zd", want, got);
+}
+#define TEST_SIZE_EQUAL(TEST, WANT, GOT) test_size_equal_((TEST), (WANT), (GOT), __FILE__, __LINE__)
+
bool test_str_equal_(test_t *t, const char* want, const char* got, const char *file, int line) {
return test_check_(t, !strcmp(want, got), NULL, file, line, "want '%s', got '%s'", want, got);
}
@@ -231,7 +236,8 @@ void message_decode(pn_message_t *m, pn_delivery_t *d, pn_rwbytes_t *buf) {
pn_link_t *l = pn_delivery_link(d);
ssize_t size = pn_delivery_pending(d);
rwbytes_ensure(buf, size);
- TEST_ASSERT(size == pn_link_recv(l, buf->start, size));
+ ssize_t result = pn_link_recv(l, buf->start, size);
+ TEST_ASSERTF(size == result, "%ld != %ld", (long)size, (long)result);
pn_message_clear(m);
TEST_ASSERTF(!pn_message_decode(m, buf->start, size), "decode: %s", pn_error_text(pn_message_error(m)));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org