You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/07/10 00:12:25 UTC
[45/50] qpid-proton git commit: PROTON-936: make the session outgoing
window a fixed value, defaulted very large but configurable if needed
PROTON-936: make the session outgoing window a fixed value, defaulted very large but configurable if needed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a02ad90c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a02ad90c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a02ad90c
Branch: refs/heads/cjansen-cpp-client
Commit: a02ad90cab9af446e8251157a1525e3413776934
Parents: 32b00ae
Author: Robert Gemmell <ro...@apache.org>
Authored: Thu Jul 9 12:20:24 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Thu Jul 9 12:26:04 2015 +0100
----------------------------------------------------------------------
proton-c/bindings/python/proton/__init__.py | 8 ++++++++
proton-c/include/proton/session.h | 16 +++++++++++++++
proton-c/src/engine/engine-internal.h | 1 +
proton-c/src/engine/engine.c | 13 ++++++++++++
proton-c/src/transport/transport.c | 11 +---------
.../org/apache/qpid/proton/engine/Session.java | 8 ++++++++
.../qpid/proton/engine/impl/SessionImpl.java | 18 +++++++++++++++++
.../qpid/proton/engine/impl/TransportImpl.java | 2 +-
.../proton/engine/impl/TransportSession.java | 21 ++------------------
proton-j/src/main/resources/cengine.py | 6 ++++++
tests/python/proton_tests/engine.py | 6 ++++++
11 files changed, 80 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 9c75800..d5dcceb 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2571,6 +2571,14 @@ class Session(Wrapper, Endpoint):
incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity)
+ def _get_outgoing_window(self):
+ return pn_session_get_outgoing_window(self._impl)
+
+ def _set_outgoing_window(self, window):
+ pn_session_set_outgoing_window(self._impl, window)
+
+ outgoing_window = property(_get_outgoing_window, _set_outgoing_window)
+
@property
def outgoing_bytes(self):
return pn_session_outgoing_bytes(self._impl)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/include/proton/session.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/session.h b/proton-c/include/proton/session.h
index 5dedb99..94d2869 100644
--- a/proton-c/include/proton/session.h
+++ b/proton-c/include/proton/session.h
@@ -216,6 +216,22 @@ PN_EXTERN size_t pn_session_get_incoming_capacity(pn_session_t *session);
PN_EXTERN void pn_session_set_incoming_capacity(pn_session_t *session, size_t capacity);
/**
+ * Get the outgoing window for a session object.
+ *
+ * @param[in] session the session object
+ * @return the outgoing window for the session
+ */
+PN_EXTERN size_t pn_session_get_outgoing_window(pn_session_t *session);
+
+/**
+ * Set the outgoing window for a session object.
+ *
+ * @param[in] session the session object
+ * @param[in] window the outgoing window for the session
+ */
+PN_EXTERN void pn_session_set_outgoing_window(pn_session_t *session, size_t window);
+
+/**
* Get the number of outgoing bytes currently buffered by a session.
*
* @param[in] session a session object
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index c03a0a3..727f50d 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -246,6 +246,7 @@ struct pn_session_t {
pn_sequence_t outgoing_bytes;
pn_sequence_t incoming_deliveries;
pn_sequence_t outgoing_deliveries;
+ pn_sequence_t outgoing_window;
pn_session_state_t state;
};
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index fda719a..ffbdf95 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -991,6 +991,7 @@ pn_session_t *pn_session(pn_connection_t *conn)
ssn->outgoing_bytes = 0;
ssn->incoming_deliveries = 0;
ssn->outgoing_deliveries = 0;
+ ssn->outgoing_window = 2147483647;
// begin transport state
memset(&ssn->state, 0, sizeof(ssn->state));
@@ -1043,6 +1044,18 @@ void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity)
ssn->incoming_capacity = capacity;
}
+size_t pn_session_get_outgoing_window(pn_session_t *ssn)
+{
+ assert(ssn);
+ return ssn->outgoing_window;
+}
+
+void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window)
+{
+ assert(ssn);
+ ssn->outgoing_window = window;
+}
+
size_t pn_session_outgoing_bytes(pn_session_t *ssn)
{
assert(ssn);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index d2c3509..9ce01bd 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -1909,16 +1909,7 @@ static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * val
static size_t pni_session_outgoing_window(pn_session_t *ssn)
{
- uint32_t size = ssn->connection->transport->remote_max_frame;
- if (!size) {
- return ssn->outgoing_deliveries;
- } else {
- pn_sequence_t frames = ssn->outgoing_bytes/size;
- if (ssn->outgoing_bytes % size) {
- frames++;
- }
- return pn_max(frames, ssn->outgoing_deliveries);
- }
+ return ssn->outgoing_window;
}
static size_t pni_session_incoming_window(pn_session_t *ssn)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
index f2f048a..2179dda 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java
@@ -52,4 +52,12 @@ public interface Session extends Endpoint
public int getOutgoingBytes();
+ public long getOutgoingWindow();
+
+ /**
+ * Sets the outgoing window size.
+ *
+ * @param outgoingWindowSize the outgoing window size
+ */
+ public void setOutgoingWindow(long outgoingWindowSize);
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
index 3af1820..9e108e4 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
@@ -44,6 +44,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
private int _outgoingBytes = 0;
private int _incomingDeliveries = 0;
private int _outgoingDeliveries = 0;
+ private long _outgoingWindow = Integer.MAX_VALUE;
private LinkNode<SessionImpl> _node;
@@ -270,4 +271,21 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession
{
getConnectionImpl().put(Event.Type.SESSION_LOCAL_CLOSE, this);
}
+
+ @Override
+ public void setOutgoingWindow(long outgoingWindow) {
+ if(outgoingWindow < 0 || outgoingWindow > 0xFFFFFFFFL)
+ {
+ throw new IllegalArgumentException("Value '" + outgoingWindow + "' must be in the"
+ + " range [0 - 2^32-1]");
+ }
+
+ _outgoingWindow = outgoingWindow;
+ }
+
+ @Override
+ public long getOutgoingWindow()
+ {
+ return _outgoingWindow;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 595afd6..7d285b1 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -448,7 +448,7 @@ public class TransportImpl extends EndpointImpl
Flow flow = new Flow();
flow.setNextIncomingId(ssn.getNextIncomingId());
flow.setNextOutgoingId(ssn.getNextOutgoingId());
- ssn.updateWindows();
+ ssn.updateIncomingWindow();
flow.setIncomingWindow(ssn.getIncomingWindowSize());
flow.setOutgoingWindow(ssn.getOutgoingWindowSize());
if (link != null) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 1f4a9f8..33c6cd0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -70,6 +70,7 @@ class TransportSession
{
_transport = transport;
_session = session;
+ _outgoingWindowSize = UnsignedInteger.valueOf(session.getOutgoingWindow());
}
void unbind()
@@ -175,32 +176,14 @@ class TransportSession
return _incomingWindowSize;
}
- public void updateWindows()
+ void updateIncomingWindow()
{
- // incoming window
int size = _transport.getMaxFrameSize();
if (size <= 0) {
_incomingWindowSize = UnsignedInteger.valueOf(2147483647); // biggest legal value
} else {
_incomingWindowSize = UnsignedInteger.valueOf((_session.getIncomingCapacity() - _session.getIncomingBytes())/size);
}
-
- // outgoing window
- int outgoingDeliveries = _session.getOutgoingDeliveries();
- if (size <= 0) {
- _outgoingWindowSize = UnsignedInteger.valueOf(outgoingDeliveries);
- } else {
- int outgoingBytes = _session.getOutgoingBytes();
- int frames = outgoingBytes/size;
- if (outgoingBytes % size > 0) {
- frames++;
- }
- if (frames > outgoingDeliveries) {
- _outgoingWindowSize = UnsignedInteger.valueOf(frames);
- } else {
- _outgoingWindowSize = UnsignedInteger.valueOf(outgoingDeliveries);
- }
- }
}
public UnsignedInteger getOutgoingDeliveryId()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/proton-j/src/main/resources/cengine.py
----------------------------------------------------------------------
diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py
index c94d023..ee2226d 100644
--- a/proton-j/src/main/resources/cengine.py
+++ b/proton-j/src/main/resources/cengine.py
@@ -280,6 +280,12 @@ def pn_session_incoming_bytes(ssn):
def pn_session_outgoing_bytes(ssn):
return ssn.impl.getOutgoingBytes()
+def pn_session_get_outgoing_window(ssn):
+ return ssn.impl.getOutgoingWindow()
+
+def pn_session_set_outgoing_window(ssn, window):
+ ssn.impl.setOutgoingWindow(window)
+
def pn_session_condition(ssn):
return ssn.condition
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a02ad90c/tests/python/proton_tests/engine.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py
index 258665d..c18683f 100644
--- a/tests/python/proton_tests/engine.py
+++ b/tests/python/proton_tests/engine.py
@@ -506,6 +506,12 @@ class SessionTest(Test):
assert snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
assert rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE
+ def test_set_get_outgoing_window(self):
+ assert self.ssn.outgoing_window == 2147483647
+
+ self.ssn.outgoing_window = 1024
+ assert self.ssn.outgoing_window == 1024
+
class LinkTest(Test):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org