You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mg...@apache.org on 2015/06/18 14:46:35 UTC

qpid-proton git commit: PROTON-842: enforce channel_max limit on session creation

Repository: qpid-proton
Updated Branches:
  refs/heads/master 175a15a87 -> e38957ae5


PROTON-842: enforce channel_max limit on session creation


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e38957ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e38957ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e38957ae

Branch: refs/heads/master
Commit: e38957ae5115ec023993672ca5b7d5e3df414f7e
Parents: 175a15a
Author: Mick Goulish <mi...@redhat.com>
Authored: Thu Jun 18 08:45:47 2015 -0400
Committer: Mick Goulish <mi...@redhat.com>
Committed: Thu Jun 18 08:45:47 2015 -0400

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py |   7 +-
 proton-c/include/proton/cproton.i           |   2 -
 proton-c/include/proton/transport.h         |  16 +++-
 proton-c/src/engine/engine-internal.h       |  14 ++-
 proton-c/src/engine/engine.c                |  16 +++-
 proton-c/src/transport/transport.c          | 111 ++++++++++++++++++++---
 tests/python/proton_tests/engine.py         |  44 ++++++++-
 7 files changed, 191 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 9432bd8..5860764 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2484,7 +2484,11 @@ class Connection(Wrapper, Endpoint):
     """
     Returns a new session on this connection.
     """
-    return Session(pn_session(self._impl))
+    ssn = pn_session(self._impl)
+    if ssn is None:
+      raise(SessionException("Session allocation failed."))
+    else:
+      return Session(ssn)
 
   def session_head(self, mask):
     return Session.wrap(pn_session_head(self._impl, mask))
@@ -3987,6 +3991,7 @@ __all__ = [
            "SASL",
            "Sender",
            "Session",
+           "SessionException",
            "SSL",
            "SSLDomain",
            "SSLSessionDetails",

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/include/proton/cproton.i
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i
index ac2b121..b55211f 100644
--- a/proton-c/include/proton/cproton.i
+++ b/proton-c/include/proton/cproton.i
@@ -210,8 +210,6 @@ typedef unsigned long int uintptr_t;
 {
  require:
   connection != NULL;
- ensure:
-  pn_session != NULL;
 }
 
 %contract pn_transport(pn_connection_t *connection)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/include/proton/transport.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h
index a3ca667..483f5a9 100644
--- a/proton-c/include/proton/transport.h
+++ b/proton-c/include/proton/transport.h
@@ -320,6 +320,10 @@ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...
 
 /**
  * Get the maximum allowed channel for a transport.
+ * This will be the minimum of 
+ *   1. limit imposed by this proton implementation
+ *   2. limit imposed by remote peer
+ *   3. limit imposed by this application, using pn_transport_set_channel_max()
  *
  * @param[in] transport a transport object
  * @return the maximum allowed channel
@@ -327,7 +331,17 @@ PN_EXTERN void pn_transport_logf(pn_transport_t *transport, const char *fmt, ...
 PN_EXTERN uint16_t pn_transport_get_channel_max(pn_transport_t *transport);
 
 /**
- * Set the maximum allowed channel for a transport.
+ * Set the maximum allowed channel number for a transport.
+ * Note that this is the maximum channel number allowed, giving a 
+ * valid channel number range of [0..channel_max]. Therefore the 
+ * maximum number of simultaineously active channels will be 
+ * channel_max plus 1.
+ * You can call this function more than once to raise and lower
+ * the limit your application imposes on max channels for this 
+ * transport.  However, smaller limits may be imposed by this
+ * library, or by the remote peer.
+ * After the OPEN frame has been sent to the remote peer,
+ * further calls to this function will have no effect.
  *
  * @param[in] transport a transport object
  * @param[in] channel_max the maximum allowed channel

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 4c72310..c03a0a3 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -178,8 +178,20 @@ struct pn_transport_t {
 
   pn_trace_t trace;
 
-  uint16_t channel_max;
+  /*
+   * The maximum channel number can be constrained in several ways:
+   *   1. an unchangeable limit imposed by this library code
+   *   2. a limit imposed by the remote peer when the connection is opened,
+   *      which this app must honor
+   *   3. a limit imposed by this app, which may be raised and lowered
+   *      until the OPEN frame is sent.
+   * These constraints are all summed up in channel_max, below.
+   */
+  #define PN_IMPL_CHANNEL_MAX  32767
+  uint16_t local_channel_max;
   uint16_t remote_channel_max;
+  uint16_t channel_max;
+
   bool freed;
   bool open_sent;
   bool open_rcvd;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index c5228a5..936cf60 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -953,12 +953,26 @@ static void pn_session_finalize(void *object)
 pn_session_t *pn_session(pn_connection_t *conn)
 {
   assert(conn);
+
+
+  pn_transport_t * transport = pn_connection_transport(conn);
+
+  if(transport) {
+    // channel_max is an index, not a count.  
+    if(pn_hash_size(transport->local_channels) > (size_t)transport->channel_max) {
+      pn_transport_logf(transport, 
+                        "pn_session: too many sessions: %d  channel_max is %d",
+                        pn_hash_size(transport->local_channels),
+                        transport->channel_max);
+      return (pn_session_t *) 0;
+    }
+  }
+
 #define pn_session_free pn_object_free
   static const pn_class_t clazz = PN_METACLASS(pn_session);
 #undef pn_session_free
   pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t));
   if (!ssn) return NULL;
-
   pn_endpoint_init(&ssn->endpoint, SESSION, conn);
   pni_add_session(conn, ssn);
   ssn->links = pn_list(PN_WEAKREF, 0);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 733f695..ff80e21 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -43,6 +43,33 @@ static ssize_t transport_consume(pn_transport_t *transport);
 
 // delivery buffers
 
+/*
+ * Call this any time anything happens that may affect channel_max:
+ * i.e. when the app indicates a preference, or when we receive the
+ * OPEN frame from the remote peer.  And call it to do the final 
+ * calculation just before we communicate our limit to the remote 
+ * peer by sending our OPEN frame.
+ */
+static void pni_calculate_channel_max(pn_transport_t *transport) {
+  /*
+   * The application cannot make the limit larger than 
+   * what this library will allow.
+   */
+  transport->channel_max = (PN_IMPL_CHANNEL_MAX < transport->local_channel_max)
+                           ? PN_IMPL_CHANNEL_MAX
+                           : transport->local_channel_max;
+
+  /*
+   * The remote peer's constraint is not valid until the 
+   * peer's open frame has been received.
+   */
+  if(transport->open_rcvd) {
+    transport->channel_max = (transport->channel_max < transport->remote_channel_max)
+                             ? transport->channel_max
+                             : transport->remote_channel_max;
+  }
+}
+
 void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next)
 {
   db->deliveries = pn_hash(PN_WEAKREF, 0, 0.75);
@@ -370,7 +397,23 @@ static void pn_transport_initialize(void *object)
   transport->remote_hostname = NULL;
   transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE;
   transport->remote_max_frame = 0;
-  transport->channel_max = 0;
+
+  /*
+   * We set the local limit on channels to 2^15, because 
+   * parts of the code use the topmost bit (of a short)
+   * as a flag.
+   * The peer that this transport connects to may also 
+   * place its own limit on max channel number, and the
+   * application may also set a limit.
+   * The maximum that we use will be the minimum of all 
+   * these constraints.
+   */
+  // There is no constraint yet from remote peer, 
+  // so set to max possible.
+  transport->remote_channel_max = 65535;  
+  transport->local_channel_max  = PN_IMPL_CHANNEL_MAX;
+  transport->channel_max        = transport->local_channel_max;
+
   transport->remote_channel_max = 0;
   transport->local_idle_timeout = 0;
   transport->dead_remote_deadline = 0;
@@ -1098,6 +1141,7 @@ int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
     transport->halt = true;
   }
   transport->open_rcvd = true;
+  pni_calculate_channel_max(transport);
   return 0;
 }
 
@@ -1109,6 +1153,18 @@ int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel,
   int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next);
   if (err) return err;
 
+  // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max -- 
+  // express our displeasure by closing the connection with a framing error.
+  if (remote_channel > transport->channel_max) {
+    pn_do_error(transport,
+                "amqp:connection:framing-error",
+                "remote channel %d is above negotiated channel_max %d.",
+                remote_channel,
+                transport->channel_max
+               );
+    return PN_TRANSPORT_ERROR;
+  }
+
   pn_session_t *ssn;
   if (reply) {
     // XXX: what if session is NULL?
@@ -1774,6 +1830,7 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp
           : 0;
       pn_connection_t *connection = (pn_connection_t *) endpoint;
       const char *cid = pn_string_get(connection->container);
+      pni_calculate_channel_max(transport);
       int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[SS?I?H?InnCCC]", OPEN,
                               cid ? cid : "",
                               pn_string_get(connection->hostname),
@@ -1792,15 +1849,16 @@ static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endp
   return 0;
 }
 
-static uint16_t allocate_alias(pn_hash_t *aliases)
+static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * valid)
 {
-  for (uint32_t i = 0; i < 65536; i++) {
+  for (uint32_t i = 0; i <= max_index; i++) {
     if (!pn_hash_get(aliases, i)) {
+      * valid = 1;
       return i;
     }
   }
 
-  assert(false);
+  * valid = 0;
   return 0;
 }
 
@@ -1828,14 +1886,19 @@ static size_t pni_session_incoming_window(pn_session_t *ssn)
   }
 }
 
-static void pni_map_local_channel(pn_session_t *ssn)
+static int pni_map_local_channel(pn_session_t *ssn)
 {
   pn_transport_t *transport = ssn->connection->transport;
   pn_session_state_t *state = &ssn->state;
-  uint16_t channel = allocate_alias(transport->local_channels);
+  int valid;
+  uint16_t channel = allocate_alias(transport->local_channels, transport->channel_max, & valid);
+  if (!valid) {
+    return 0;
+  }
   state->local_channel = channel;
   pn_hash_put(transport->local_channels, channel, ssn);
   pn_ep_incref(&ssn->endpoint);
+  return 1;
 }
 
 static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1846,7 +1909,10 @@ static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpo
     pn_session_state_t *state = &ssn->state;
     if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1)
     {
-      pni_map_local_channel(ssn);
+      if (! pni_map_local_channel(ssn)) {
+        pn_transport_logf(transport, "unable to find an open available channel within limit of %d", transport->channel_max );
+        return PN_ERR;
+      }
       state->incoming_window = pni_session_incoming_window(ssn);
       state->outgoing_window = pni_session_outgoing_window(ssn);
       pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?HIII]", BEGIN,
@@ -1876,12 +1942,17 @@ static const char *expiry_symbol(pn_expiry_policy_t policy)
   return NULL;
 }
 
-static void pni_map_local_handle(pn_link_t *link) {
+static int pni_map_local_handle(pn_link_t *link) {
   pn_link_state_t *state = &link->state;
   pn_session_state_t *ssn_state = &link->session->state;
-  state->local_handle = allocate_alias(ssn_state->local_handles);
+  int valid;
+  // XXX TODO MICK: once changes are made to handle_max, change this hardcoded value to something reasonable.
+  state->local_handle = allocate_alias(ssn_state->local_handles, 65536, & valid);
+  if ( ! valid )
+    return 0;
   pn_hash_put(ssn_state->local_handles, state->local_handle, link);
   pn_ep_incref(&link->endpoint);
+  return 1;
 }
 
 static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -2574,9 +2645,27 @@ uint16_t pn_transport_get_channel_max(pn_transport_t *transport)
   return transport->channel_max;
 }
 
-void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t channel_max)
+void pn_transport_set_channel_max(pn_transport_t *transport, uint16_t requested_channel_max)
 {
-  transport->channel_max = channel_max;
+  /*
+   * Once the OPEN frame has been sent, we have communicated our 
+   * wishes to the remote client and there is no way to renegotiate.
+   * After that point, we do not allow the application to make changes.
+   * Before that point, however, the app is free to either raise or 
+   * lower our local limit.  (But the app cannot raise it above the 
+   * limit imposed by this library.)
+   * The channel-max value will be finalized just before the OPEN frame
+   * is sent.
+   */
+  if(transport->open_sent) {
+    pn_transport_logf(transport, "Cannot change local channel-max after OPEN frame sent.");
+  }
+  else {
+    transport->local_channel_max = (requested_channel_max < PN_IMPL_CHANNEL_MAX)
+                                   ? requested_channel_max
+                                   : PN_IMPL_CHANNEL_MAX;
+    pni_calculate_channel_max(transport);
+  }
 }
 
 uint16_t pn_transport_remote_channel_max(pn_transport_t *transport)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e38957ae/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 924b3bc..1563889 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -211,11 +211,51 @@ class ConnectionTest(Test):
     assert self.c2.remote_properties == p1, (self.c2.remote_properties, p1)
     assert self.c1.remote_properties == p2, (self.c2.remote_properties, p2)
 
-  def test_channel_max(self, value=1234):
+  # The proton implementation limits channel_max to 32767.
+  # If I set the application's limit lower than that, I should 
+  # get my wish.  If I set it higher -- not.
+  def test_channel_max_low(self, value=1234):
     self.c1.transport.channel_max = value
     self.c1.open()
     self.pump()
-    assert self.c2.transport.remote_channel_max == value, (self.c2.transport.remote_channel_max, value)
+    assert self.c1.transport.channel_max == value, (self.c1.transport.channel_max, value)
+
+  def test_channel_max_high(self, value=33333):
+    self.c1.transport.channel_max = value
+    self.c1.open()
+    self.pump()
+    assert self.c1.transport.channel_max == 32767, (self.c1.transport.channel_max, value)
+
+  def test_channel_max_raise_and_lower(self):
+    # It's OK to lower the max below 32767.
+    self.c1.transport.channel_max = 12345
+    assert self.c1.transport.channel_max == 12345
+    # But it won't let us raise the limit above 32767.
+    self.c1.transport.channel_max = 33333
+    assert self.c1.transport.channel_max == 32767
+    self.c1.open()
+    self.pump()
+    # Now it's too late to make any change, because
+    # we have already sent the OPEN frame.
+    self.c1.transport.channel_max = 666
+    assert self.c1.transport.channel_max == 32767
+
+
+  def test_channel_max_limits_sessions(self):
+    return
+    # This is an index -- so max number of channels should be 1.
+    self.c1.transport.channel_max = 0
+    self.c1.open()
+    self.c2.open()
+    ssn_0 = self.c2.session()
+    assert ssn_0 != None
+    ssn_0.open()
+    self.pump()
+    try:
+      ssn_1 = self.c2.session()
+      assert False, "expected session exception"
+    except SessionException:
+      pass
 
   def test_cleanup(self):
     self.c1.open()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org