You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/01/26 15:19:01 UTC
[2/2] qpid-proton git commit: PROTON-1110: add toggle for suppressing
the synthetic flow events generated on sending
PROTON-1110: add toggle for suppressing the synthetic flow events generated on sending
(cherry picked from commit 5ec901323eb188b840a519df3e7aeea15523c218)
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/271b3636
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/271b3636
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/271b3636
Branch: refs/heads/0.12.x
Commit: 271b363638daa78a15964b1d8d1d46c187d4b7e7
Parents: 193bceb
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Jan 26 13:49:51 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Jan 26 14:17:39 2016 +0000
----------------------------------------------------------------------
.../apache/qpid/proton/engine/Transport.java | 12 ++
.../qpid/proton/engine/impl/TransportImpl.java | 15 ++-
.../proton/engine/impl/TransportImplTest.java | 124 +++++++++++++++++++
3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/271b3636/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
index 2e322d6..5d8b79d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
@@ -270,4 +270,16 @@ public interface Transport extends Endpoint
long getFramesInput();
long getFramesOutput();
+
+ /**
+ * Configure whether a synthetic Flow event should be emitted when messages are sent,
+ * reflecting a change in the credit level on the link that may prompt other action.
+ *
+ * Defaults to true.
+ *
+ * @param emitFlowEventOnSend true if a flow event should be emitted, false otherwise
+ */
+ void setEmitFlowEventOnSend(boolean emitFlowEventOnSend);
+
+ boolean isEmitFlowEventOnSend();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/271b3636/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index d85794f..d132508 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -116,6 +116,7 @@ public class TransportImpl extends EndpointImpl
private boolean _init;
private boolean _processingStarted;
+ private boolean _emitFlowEventOnSend = true;
private FrameHandler _frameHandler = this;
private boolean _head_closed = false;
@@ -611,7 +612,7 @@ public class TransportImpl extends EndpointImpl
tpLink.setInProgressDelivery(delivery);
}
- if (snd.getLocalState() != EndpointState.CLOSED) {
+ if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) {
getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
}
}
@@ -1657,4 +1658,16 @@ public class TransportImpl extends EndpointImpl
public Reactor getReactor() {
return _reactor;
}
+
+ @Override
+ public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend)
+ {
+ _emitFlowEventOnSend = emitFlowEventOnSend;
+ }
+
+ @Override
+ public boolean isEmitFlowEventOnSend()
+ {
+ return _emitFlowEventOnSend;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/271b3636/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 888f4af..a0e6766 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -24,12 +24,14 @@ import static org.apache.qpid.proton.engine.impl.TransportTestHelper.stringOfLen
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.LinkedList;
import org.apache.qpid.proton.Proton;
@@ -47,6 +49,7 @@ import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
@@ -567,6 +570,127 @@ public class TransportImplTest
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
}
+ @Test
+ public void testEmitFlowEventOnSend()
+ {
+ doEmitFlowOnSendTestImpl(true);
+ }
+
+ public void testSupressFlowEventOnSend()
+ {
+ doEmitFlowOnSendTestImpl(false);
+ }
+
+ void doEmitFlowOnSendTestImpl(boolean emitFlowEventOnSend)
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(emitFlowEventOnSend);
+
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ Collector collector = Collector.Factory.create();
+ connection.collect(collector);
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "mySender";
+ Sender sender = session.sender(linkName);
+ sender.open();
+
+ sendMessage(sender, "tag1", "content1");
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
+
+ assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN,
+ Event.Type.TRANSPORT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ // Now open the connection, expect the Open and Begin frames but
+ // nothing else as we haven't opened the receiver itself yet.
+ connection.open();
+
+ pumpMockTransport(transport);
+
+ assertEvents(collector, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach = new Attach();
+ attach.setHandle(UnsignedInteger.ZERO);
+ attach.setRole(Role.RECEIVER);
+ attach.setName(linkName);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach, null));
+
+ Flow flow = new Flow();
+ flow.setHandle(UnsignedInteger.ZERO);
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ONE);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow.setLinkCredit(UnsignedInteger.valueOf(10));
+
+ transport.handleFrame(new TransportFrame(0, flow, null));
+
+ assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
+ Event.Type.LINK_REMOTE_OPEN, Event.Type.LINK_FLOW);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+ if(emitFlowEventOnSend)
+ {
+ assertEvents(collector, Event.Type.LINK_FLOW);
+ }
+ else
+ {
+ assertNoEvents(collector);
+ }
+ }
+
+ private void assertNoEvents(Collector collector)
+ {
+ assertEvents(collector);
+ }
+
+ private void assertEvents(Collector collector, Event.Type... expectedEventTypes)
+ {
+
+ if(expectedEventTypes.length == 0)
+ {
+ assertNull("Expected no events, but at least one was present: " + collector.peek(), collector.peek());
+ }
+ else
+ {
+ ArrayList<Event.Type> eventTypesList = new ArrayList<Event.Type>();
+ Event event = null;
+ while ((event = collector.peek()) != null) {
+ eventTypesList.add(event.getType());
+ collector.pop();
+ }
+
+ assertArrayEquals("Unexpected event types: " + eventTypesList, expectedEventTypes, eventTypesList.toArray(new Event.Type[0]));
+ }
+ }
+
private void pumpMockTransport(MockTransportImpl transport)
{
while(transport.pending() > 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org