You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2015/06/18 22:15:47 UTC
[42/50] [abbrv] qpid-proton git commit: PROTON-842: enforce
channel_max limit on session creation
PROTON-842: enforce channel_max limit on session creation
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e38957ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e38957ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e38957ae
Branch: refs/heads/kgiusti-python3
Commit: e38957ae5115ec023993672ca5b7d5e3df414f7e
Parents: 175a15a
Author: Mick Goulish <mi...@redhat.com>
Authored: Thu Jun 18 08:45:47 2015 -0400
Committer: Mick Goulish <mi...@redhat.com>
Committed: Thu Jun 18 08:45:47 2015 -0400
----------------------------------------------------------------------
proton-c/bindings/python/proton/__init__.py | 7 +-
proton-c/include/proton/cproton.i | 2 -
proton-c/include/proton/transport.h | 16 +++-
proton-c/src/engine/engine-internal.h | 14 ++-
proton-c/src/engine/engine.c | 16 +++-
proton-c/src/transport/transport.c | 111 ++++++++++++++++++++---
tests/python/proton_tests/engine.py | 44 ++++++++-
7 files changed, 191 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 9432bd8..5860764 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2484,7 +2484,11 @@ class Connection(Wrapper, Endpoint):
"""
Returns a new session on this connection.
"""
- return Session(pn_session(self._impl))
+ ssn = pn_session(self._impl)
+ if ssn is None:
+ raise(SessionException("Session allocation failed."))
+ else:
+ return Session(ssn)
def session_head(self, mask):
return Session.wrap(pn_session_head(self._impl, mask))
@@ -3987,6 +3991,7 @@ __all__ = [
"SASL",
"Sender",
"Session",
+ "SessionException",
"SSL",
"SSLDomain",
"SSLSessionDetails",
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index ac2b121..b55211f 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -210,8 +210,6 @@ typedef unsigned long int uintptr_t;
{
require:
connection != NULL;
- ensure:
- pn_session != NULL;
}
%contract pn_transport(pn_connection_t *connection)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index a3ca667..483f5a9 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -320,6 +320,10 @@ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...
/**
* Get the maximum allowed channel for a transport.
+ * This will be the minimum of
+ * 1. limit imposed by this proton implementation
+ * 2. limit imposed by remote peer
+ * 3. limit imposed by this application, using pn_transport_set_channel_max()
*
* @param[in] transport a transport object
* @return the maximum allowed channel
@@ -327,7 +331,17 @@ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...
PN_EXTERN uint16_t pn_transport_get_channel_max(pn_transport_t *transport);
/**
- * Set the maximum allowed channel for a transport.
+ * Set the maximum allowed channel number for a transport.
+ * Note that this is the maximum channel number allowed, giving a
+ * valid channel number range of [0..channel_max]. Therefore the
+ * maximum number of simultaineously active channels will be
+ * channel_max plus 1.
+ * You can call this function more than once to raise and lower
+ * the limit your application imposes on max channels for this
+ * transport. However, smaller limits may be imposed by this
+ * library, or by the remote peer.
+ * After the OPEN frame has been sent to the remote peer,
+ * further calls to this function will have no effect.
*
* @param[in] transport a transport object
* @param[in] channel_max the maximum allowed channel
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 4c72310..c03a0a3 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -178,8 +178,20 @@ struct pn_transport_t {
pn_trace_t trace;
- uint16_t channel_max;
+ /*
+ * The maximum channel number can be constrained in several ways:
+ * 1. an unchangeable limit imposed by this library code
+ * 2. a limit imposed by the remote peer when the connection is opened,
+ * which this app must honor
+ * 3. a limit imposed by this app, which may be raised and lowered
+ * until the OPEN frame is sent.
+ * These constraints are all summed up in channel_max, below.
+ */
+ #define PN_IMPL_CHANNEL_MAX 32767
+ uint16_t local_channel_max;
uint16_t remote_channel_max;
+ uint16_t channel_max;
+
bool freed;
bool open_sent;
bool open_rcvd;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index c5228a5..936cf60 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -953,12 +953,26 @@ static void pn_session_finalize(void *object)
pn_session_t *pn_session(pn_connection_t *conn)
{
assert(conn);
+
+
+ pn_transport_t * transport = pn_connection_transport(conn);
+
+ if(transport) {
+ // channel_max is an index, not a count.
+ if(pn_hash_size(transport->local_channels) > (size_t)transport->channel_max) {
+ pn_transport_logf(transport,
+ "pn_session: too many sessions: %d channel_max is %d",
+ pn_hash_size(transport->local_channels),
+ transport->channel_max);
+ return (pn_session_t *) 0;
+ }
+ }
+
#define pn_session_free pn_object_free
static const pn_class_t clazz = PN_METACLASS(pn_session);
#undef pn_session_free
pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t));
if (!ssn) return NULL;
-
pn_endpoint_init(&ssn->endpoint, SESSION, conn);
pni_add_session(conn, ssn);
ssn->links = pn_list(PN_WEAKREF, 0);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 733f695..ff80e21 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -43,6 +43,33 @@ static ssize_t transport_consume(pn_transport_t *transport);
// delivery buffers
+/*
+ * Call this any time anything happens that may affect channel_max:
+ * i.e. when the app indicates a preference, or when we receive the
+ * OPEN frame from the remote peer. And call it to do the final
+ * calculation just before we communicate our limit to the remote
+ * peer by sending our OPEN frame.
+ */
+static void pni_calculate_channel_max(pn_transport_t *transport) {
+ /*
+ * The application cannot make the limit larger than
+ * what this library will allow.
+ */
+ transport->channel_max = (PN_IMPL_CHANNEL_MAX < transport->local_channel_max)
+ ? PN_IMPL_CHANNEL_MAX
+ : transport->local_channel_max;
+
+ /*
+ * The remote peer's constraint is not valid until the
+ * peer's open frame has been received.
+ */
+ if(transport->open_rcvd) {
+ transport->channel_max = (transport->channel_max < transport->remote_channel_max)
+ ? transport->channel_max
+ : transport->remote_channel_max;
+ }
+}
+
void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next)
{
db->deliveries = pn_hash(PN_WEAKREF, 0, 0.75);
@@ -370,7 +397,23 @@ static void pn_transport_initialize(void *object)
transport->remote_hostname = NULL;
transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
transport->remote_max_frame = 0;
- transport->channel_max = 0;
+
+ /*
+ * We set the local limit on channels to 2^15, because
+ * parts of the code use the topmost bit (of a short)
+ * as a flag.
+ * The peer that this transport connects to may also
+ * place its own limit on max channel number, and the
+ * application may also set a limit.
+ * The maximum that we use will be the minimum of all
+ * these constraints.
+ */
+ // There is no constraint yet from remote peer,
+ // so set to max possible.
+ transport->remote_channel_max = 65535;
+ transport->local_channel_max = PN_IMPL_CHANNEL_MAX;
+ transport->channel_max = transport->local_channel_max;
+
transport->remote_channel_max = 0;
transport->local_idle_timeout = 0;
transport->dead_remote_deadline = 0;
@@ -1098,6 +1141,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
transport->halt = true;
}
transport->open_rcvd = true;
+ pni_calculate_channel_max(transport);
return 0;
}
@@ -1109,6 +1153,18 @@ int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next);
if (err) return err;
+ // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max --
+ // express our displeasure by closing the connection with a framing error.
+ if (remote_channel > transport->channel_max) {
+ pn_do_error(transport,
+ "amqp:connection:framing-error",
+ "remote channel %d is above negotiated channel_max %d.",
+ remote_channel,
+ transport->channel_max
+ );
+ return PN_TRANSPORT_ERROR;
+ }
+
pn_session_t *ssn;
if (reply) {
// XXX: what if session is NULL?
@@ -1774,6 +1830,7 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp
: 0;
pn_connection_t *connection = (pn_connection_t *) endpoint;
const char *cid = pn_string_get(connection->container);
+ pni_calculate_channel_max(transport);
int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[SS?I?H?InnCCC]", OPEN,
cid ? cid : "",
pn_string_get(connection->hostname),
@@ -1792,15 +1849,16 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp
return 0;
}
-static uint16_t allocate_alias(pn_hash_t *aliases)
+static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * valid)
{
- for (uint32_t i = 0; i < 65536; i++) {
+ for (uint32_t i = 0; i <= max_index; i++) {
if (!pn_hash_get(aliases, i)) {
+ * valid = 1;
return i;
}
}
- assert(false);
+ * valid = 0;
return 0;
}
@@ -1828,14 +1886,19 @@ static size_t pni_session_incoming_window(pn_session_t *ssn)
}
}
-static void pni_map_local_channel(pn_session_t *ssn)
+static int pni_map_local_channel(pn_session_t *ssn)
{
pn_transport_t *transport = ssn->connection->transport;
pn_session_state_t *state = &ssn->state;
- uint16_t channel = allocate_alias(transport->local_channels);
+ int valid;
+ uint16_t channel = allocate_alias(transport->local_channels, transport->channel_max, & valid);
+ if (!valid) {
+ return 0;
+ }
state->local_channel = channel;
pn_hash_put(transport->local_channels, channel, ssn);
pn_ep_incref(&ssn->endpoint);
+ return 1;
}
static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1846,7 +1909,10 @@ static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpo
pn_session_state_t *state = &ssn->state;
if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
{
- pni_map_local_channel(ssn);
+ if (! pni_map_local_channel(ssn)) {
+ pn_transport_logf(transport, "unable to find an open available channel within limit of %d", transport->channel_max );
+ return PN_ERR;
+ }
state->incoming_window = pni_session_incoming_window(ssn);
state->outgoing_window = pni_session_outgoing_window(ssn);
pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?HIII]", BEGIN,
@@ -1876,12 +1942,17 @@ static const char *expiry_symbol(pn_expiry_policy_t policy)
return NULL;
}
-static void pni_map_local_handle(pn_link_t *link) {
+static int pni_map_local_handle(pn_link_t *link) {
pn_link_state_t *state = &link->state;
pn_session_state_t *ssn_state = &link->session->state;
- state->local_handle = allocate_alias(ssn_state->local_handles);
+ int valid;
+ // XXX TODO MICK: once changes are made to handle_max, change this hardcoded value to something reasonable.
+ state->local_handle = allocate_alias(ssn_state->local_handles, 65536, & valid);
+ if ( ! valid )
+ return 0;
pn_hash_put(ssn_state->local_handles, state->local_handle, link);
pn_ep_incref(&link->endpoint);
+ return 1;
}
static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -2574,9 +2645,27 @@ uint16_t pn_transport_get_channel_max(pn_transport_t *transport)
return transport->channel_max;
}
-void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t channel_max)
+void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t requested_channel_max)
{
- transport->channel_max = channel_max;
+ /*
+ * Once the OPEN frame has been sent, we have communicated our
+ * wishes to the remote client and there is no way to renegotiate.
+ * After that point, we do not allow the application to make changes.
+ * Before that point, however, the app is free to either raise or
+ * lower our local limit. (But the app cannot raise it above the
+ * limit imposed by this library.)
+ * The channel-max value will be finalized just before the OPEN frame
+ * is sent.
+ */
+ if(transport->open_sent) {
+ pn_transport_logf(transport, "Cannot change local channel-max after OPEN frame sent.");
+ }
+ else {
+ transport->local_channel_max = (requested_channel_max < PN_IMPL_CHANNEL_MAX)
+ ? requested_channel_max
+ : PN_IMPL_CHANNEL_MAX;
+ pni_calculate_channel_max(transport);
+ }
}
uint16_t pn_transport_remote_channel_max(pn_transport_t *transport)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 924b3bc..1563889 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -211,11 +211,51 @@ class ConnectionTest(Test):
assert self.c2.remote_properties == p1, (self.c2.remote_properties, p1)
assert self.c1.remote_properties == p2, (self.c2.remote_properties, p2)
- def test_channel_max(self, value=1234):
+ # The proton implementation limits channel_max to 32767.
+ # If I set the application's limit lower than that, I should
+ # get my wish. If I set it higher -- not.
+ def test_channel_max_low(self, value=1234):
self.c1.transport.channel_max = value
self.c1.open()
self.pump()
- assert self.c2.transport.remote_channel_max == value, (self.c2.transport.remote_channel_max, value)
+ assert self.c1.transport.channel_max == value, (self.c1.transport.channel_max, value)
+
+ def test_channel_max_high(self, value=33333):
+ self.c1.transport.channel_max = value
+ self.c1.open()
+ self.pump()
+ assert self.c1.transport.channel_max == 32767, (self.c1.transport.channel_max, value)
+
+ def test_channel_max_raise_and_lower(self):
+ # It's OK to lower the max below 32767.
+ self.c1.transport.channel_max = 12345
+ assert self.c1.transport.channel_max == 12345
+ # But it won't let us raise the limit above 32767.
+ self.c1.transport.channel_max = 33333
+ assert self.c1.transport.channel_max == 32767
+ self.c1.open()
+ self.pump()
+ # Now it's too late to make any change, because
+ # we have already sent the OPEN frame.
+ self.c1.transport.channel_max = 666
+ assert self.c1.transport.channel_max == 32767
+
+
+ def test_channel_max_limits_sessions(self):
+ return
+ # This is an index -- so max number of channels should be 1.
+ self.c1.transport.channel_max = 0
+ self.c1.open()
+ self.c2.open()
+ ssn_0 = self.c2.session()
+ assert ssn_0 != None
+ ssn_0.open()
+ self.pump()
+ try:
+ ssn_1 = self.c2.session()
+ assert False, "expected session exception"
+ except SessionException:
+ pass
def test_cleanup(self):
self.c1.open()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org