You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/07/09 13:48:42 UTC

qpid-proton git commit: PROTON-936: make the session outgoing window a fixed value, defaulted very large but configurable if needed

Repository: qpid-proton
Updated Branches:
  refs/heads/master 32b00aec6 -> a02ad90ca


PROTON-936: make the session outgoing window a fixed value, defaulted very large but configurable if needed


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a02ad90c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a02ad90c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a02ad90c

Branch: refs/heads/master
Commit: a02ad90cab9af446e8251157a1525e3413776934
Parents: 32b00ae
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jul 9 12:20:24 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jul 9 12:26:04 2015 +0100

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py     |  8 ++++++++
 proton-c/include/proton/session.h               | 16 +++++++++++++++
 proton-c/src/engine/engine-internal.h           |  1 +
 proton-c/src/engine/engine.c                    | 13 ++++++++++++
 proton-c/src/transport/transport.c              | 11 +---------
 .../org/apache/qpid/proton/engine/Session.java  |  8 ++++++++
 .../qpid/proton/engine/impl/SessionImpl.java    | 18 +++++++++++++++++
 .../qpid/proton/engine/impl/TransportImpl.java  |  2 +-
 .../proton/engine/impl/TransportSession.java    | 21 ++------------------
 proton-j/src/main/resources/cengine.py          |  6 ++++++
 tests/python/proton_tests/engine.py             |  6 ++++++
 11 files changed, 80 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 9c75800..d5dcceb 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2571,6 +2571,14 @@ class Session(Wrapper, Endpoint):
 
   incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
 
+  def _get_outgoing_window(self):
+    return pn_session_get_outgoing_window(self._impl)
+
+  def _set_outgoing_window(self, window):
+    pn_session_set_outgoing_window(self._impl, window)
+
+  outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
+
   @property
   def outgoing_bytes(self):
     return pn_session_outgoing_bytes(self._impl)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/include/proton/session.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/session.h b/proton-c/include/proton/session.h
index 5dedb99..94d2869 100644
--- a/proton-c/include/proton/session.h
+++ b/proton-c/include/proton/session.h
@@ -216,6 +216,22 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
 PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity);
 
 /**
+ * Get the outgoing window for a session object.
+ *
+ * @param[in] session the session object
+ * @return  the outgoing window for the session
+ */
+PN_EXTERN size_t pn_session_get_outgoing_window(pn_session_t *session);
+
+/**
+ * Set the outgoing window for a session object.
+ *
+ * @param[in] session the session object
+ * @param[in] window the outgoing window for the session
+ */
+PN_EXTERN void pn_session_set_outgoing_window(pn_session_t *session, size_t window);
+
+/**
  * Get the number of outgoing bytes currently buffered by a session.
  *
  * @param[in] session a session object

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index c03a0a3..727f50d 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -246,6 +246,7 @@ struct pn_session_t {
   pn_sequence_t outgoing_bytes;
   pn_sequence_t incoming_deliveries;
   pn_sequence_t outgoing_deliveries;
+  pn_sequence_t outgoing_window;
   pn_session_state_t state;
 };
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index fda719a..ffbdf95 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -991,6 +991,7 @@ pn_session_t *pn_session(pn_connection_t *conn)
   ssn->outgoing_bytes = 0;
   ssn->incoming_deliveries = 0;
   ssn->outgoing_deliveries = 0;
+  ssn->outgoing_window = 2147483647;
 
   // begin transport state
   memset(&ssn->state, 0, sizeof(ssn->state));
@@ -1043,6 +1044,18 @@ void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
   ssn->incoming_capacity = capacity;
 }
 
+size_t pn_session_get_outgoing_window(pn_session_t *ssn)
+{
+  assert(ssn);
+  return ssn->outgoing_window;
+}
+
+void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window)
+{
+  assert(ssn);
+  ssn->outgoing_window = window;
+}
+
 size_t pn_session_outgoing_bytes(pn_session_t *ssn)
 {
   assert(ssn);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index d2c3509..9ce01bd 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -1909,16 +1909,7 @@ static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * val
 
 static size_t pni_session_outgoing_window(pn_session_t *ssn)
 {
-  uint32_t size = ssn->connection->transport->remote_max_frame;
-  if (!size) {
-    return ssn->outgoing_deliveries;
-  } else {
-    pn_sequence_t frames = ssn->outgoing_bytes/size;
-    if (ssn->outgoing_bytes % size) {
-      frames++;
-    }
-    return pn_max(frames, ssn->outgoing_deliveries);
-  }
+  return ssn->outgoing_window;
 }
 
 static size_t pni_session_incoming_window(pn_session_t *ssn)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
index f2f048a..2179dda 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
@@ -52,4 +52,12 @@ public interface Session extends Endpoint
 
     public int getOutgoingBytes();
 
+    public long getOutgoingWindow();
+
+    /**
+     * Sets the outgoing window size.
+     *
+     * @param outgoingWindowSize the outgoing window size
+     */
+    public void setOutgoingWindow(long outgoingWindowSize);
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
index 3af1820..9e108e4 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
@@ -44,6 +44,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
     private int _outgoingBytes = 0;
     private int _incomingDeliveries = 0;
     private int _outgoingDeliveries = 0;
+    private long _outgoingWindow = Integer.MAX_VALUE;
 
     private LinkNode<SessionImpl> _node;
 
@@ -270,4 +271,21 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
     {
         getConnectionImpl().put(Event.Type.SESSION_LOCAL_CLOSE, this);
     }
+
+    @Override
+    public void setOutgoingWindow(long outgoingWindow) {
+        if(outgoingWindow < 0 || outgoingWindow > 0xFFFFFFFFL)
+        {
+            throw new IllegalArgumentException("Value '" + outgoingWindow + "' must be in the"
+                    + " range [0 - 2^32-1]");
+        }
+
+        _outgoingWindow = outgoingWindow;
+    }
+
+    @Override
+    public long getOutgoingWindow()
+    {
+        return _outgoingWindow;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 595afd6..7d285b1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -448,7 +448,7 @@ public class TransportImpl extends EndpointImpl
         Flow flow = new Flow();
         flow.setNextIncomingId(ssn.getNextIncomingId());
         flow.setNextOutgoingId(ssn.getNextOutgoingId());
-        ssn.updateWindows();
+        ssn.updateIncomingWindow();
         flow.setIncomingWindow(ssn.getIncomingWindowSize());
         flow.setOutgoingWindow(ssn.getOutgoingWindowSize());
         if (link != null) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 1f4a9f8..33c6cd0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -70,6 +70,7 @@ class TransportSession
     {
         _transport = transport;
         _session = session;
+        _outgoingWindowSize = UnsignedInteger.valueOf(session.getOutgoingWindow());
     }
 
     void unbind()
@@ -175,32 +176,14 @@ class TransportSession
         return _incomingWindowSize;
     }
 
-    public void updateWindows()
+    void updateIncomingWindow()
     {
-        // incoming window
         int size = _transport.getMaxFrameSize();
         if (size <= 0) {
             _incomingWindowSize = UnsignedInteger.valueOf(2147483647); // biggest legal value
         } else {
             _incomingWindowSize = UnsignedInteger.valueOf((_session.getIncomingCapacity() - _session.getIncomingBytes())/size);
         }
-
-        // outgoing window
-        int outgoingDeliveries = _session.getOutgoingDeliveries();
-        if (size <= 0) {
-            _outgoingWindowSize = UnsignedInteger.valueOf(outgoingDeliveries);
-        } else {
-            int outgoingBytes = _session.getOutgoingBytes();
-            int frames = outgoingBytes/size;
-            if (outgoingBytes % size > 0) {
-                frames++;
-            }
-            if (frames > outgoingDeliveries) {
-                _outgoingWindowSize = UnsignedInteger.valueOf(frames);
-            } else {
-                _outgoingWindowSize = UnsignedInteger.valueOf(outgoingDeliveries);
-            }
-        }
     }
 
     public UnsignedInteger getOutgoingDeliveryId()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/resources/cengine.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py
index c94d023..ee2226d 100644
--- a/proton-j/src/main/resources/cengine.py
+++ b/proton-j/src/main/resources/cengine.py
@@ -280,6 +280,12 @@ def pn_session_incoming_bytes(ssn):
 def pn_session_outgoing_bytes(ssn):
   return ssn.impl.getOutgoingBytes()
 
+def pn_session_get_outgoing_window(ssn):
+  return ssn.impl.getOutgoingWindow()
+
+def pn_session_set_outgoing_window(ssn, window):
+  ssn.impl.setOutgoingWindow(window)
+
 def pn_session_condition(ssn):
   return ssn.condition
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 258665d..c18683f 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -506,6 +506,12 @@ class SessionTest(Test):
     assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
     assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
 
+  def test_set_get_outgoing_window(self):
+    assert self.ssn.outgoing_window == 2147483647
+
+    self.ssn.outgoing_window = 1024
+    assert self.ssn.outgoing_window == 1024
+
 
 class LinkTest(Test):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org