You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/01/26 15:19:01 UTC

[2/2] qpid-proton git commit: PROTON-1110: add toggle for suppressing the synthetic flow events generated on sending

PROTON-1110: add toggle for suppressing the synthetic flow events generated on sending

(cherry picked from commit 5ec901323eb188b840a519df3e7aeea15523c218)


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/271b3636
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/271b3636
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/271b3636

Branch: refs/heads/0.12.x
Commit: 271b363638daa78a15964b1d8d1d46c187d4b7e7
Parents: 193bceb
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue Jan 26 13:49:51 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue Jan 26 14:17:39 2016 +0000

----------------------------------------------------------------------
 .../apache/qpid/proton/engine/Transport.java    |  12 ++
 .../qpid/proton/engine/impl/TransportImpl.java  |  15 ++-
 .../proton/engine/impl/TransportImplTest.java   | 124 +++++++++++++++++++
 3 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/271b3636/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
index 2e322d6..5d8b79d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java
@@ -270,4 +270,16 @@ public interface Transport extends Endpoint
     long getFramesInput();
 
     long getFramesOutput();
+
+    /**
+     * Configure whether a synthetic Flow event should be emitted when messages are sent,
+     * reflecting a change in the credit level on the link that may prompt other action.
+     *
+     * Defaults to true.
+     *
+     * @param emitFlowEventOnSend true if a flow event should be emitted, false otherwise
+     */
+    void setEmitFlowEventOnSend(boolean emitFlowEventOnSend);
+
+    boolean isEmitFlowEventOnSend();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/271b3636/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index d85794f..d132508 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -116,6 +116,7 @@ public class TransportImpl extends EndpointImpl
 
     private boolean _init;
     private boolean _processingStarted;
+    private boolean _emitFlowEventOnSend = true;
 
     private FrameHandler _frameHandler = this;
     private boolean _head_closed = false;
@@ -611,7 +612,7 @@ public class TransportImpl extends EndpointImpl
                 tpLink.setInProgressDelivery(delivery);
             }
 
-            if (snd.getLocalState() != EndpointState.CLOSED) {
+            if (_emitFlowEventOnSend && snd.getLocalState() != EndpointState.CLOSED) {
                 getConnectionImpl().put(Event.Type.LINK_FLOW, snd);
             }
         }
@@ -1657,4 +1658,16 @@ public class TransportImpl extends EndpointImpl
     public Reactor getReactor() {
         return _reactor;
     }
+
+    @Override
+    public void setEmitFlowEventOnSend(boolean emitFlowEventOnSend)
+    {
+        _emitFlowEventOnSend = emitFlowEventOnSend;
+    }
+
+    @Override
+    public boolean isEmitFlowEventOnSend()
+    {
+        return _emitFlowEventOnSend;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/271b3636/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 888f4af..a0e6766 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -24,12 +24,14 @@ import static org.apache.qpid.proton.engine.impl.TransportTestHelper.stringOfLen
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.LinkedList;
 
 import org.apache.qpid.proton.Proton;
@@ -47,6 +49,7 @@ import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Event;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sender;
@@ -567,6 +570,127 @@ public class TransportImplTest
         assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
     }
 
+    @Test
+    public void testEmitFlowEventOnSend()
+    {
+        doEmitFlowOnSendTestImpl(true);
+    }
+
+    @Test public void testSupressFlowEventOnSend()
+    {
+        doEmitFlowOnSendTestImpl(false); // toggle off: helper asserts no event follows the Transfer
+    }
+
+    void doEmitFlowOnSendTestImpl(boolean emitFlowEventOnSend)
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(emitFlowEventOnSend);
+
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        Collector collector = Collector.Factory.create();
+        connection.collect(collector);
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName = "mySender";
+        Sender sender = session.sender(linkName);
+        sender.open();
+
+        sendMessage(sender, "tag1", "content1");
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
+
+        assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN,
+                                Event.Type.TRANSPORT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        // Now open the connection, expect the Open, Begin and Attach frames but
+        // nothing else, as the Transfer is only expected after the peer's Flow is handled below.
+        connection.open();
+
+        pumpMockTransport(transport);
+
+        assertEvents(collector, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach = new Attach();
+        attach.setHandle(UnsignedInteger.ZERO);
+        attach.setRole(Role.RECEIVER);
+        attach.setName(linkName);
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach, null));
+
+        Flow flow = new Flow();
+        flow.setHandle(UnsignedInteger.ZERO);
+        flow.setDeliveryCount(UnsignedInteger.ZERO);
+        flow.setNextIncomingId(UnsignedInteger.ONE);
+        flow.setNextOutgoingId(UnsignedInteger.ZERO);
+        flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        flow.setLinkCredit(UnsignedInteger.valueOf(10));
+
+        transport.handleFrame(new TransportFrame(0, flow, null));
+
+        assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
+                                Event.Type.LINK_REMOTE_OPEN, Event.Type.LINK_FLOW);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
+
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
+
+        if(emitFlowEventOnSend)
+        {
+            assertEvents(collector, Event.Type.LINK_FLOW);
+        }
+        else
+        {
+            assertNoEvents(collector);
+        }
+    }
+
+    private void assertNoEvents(Collector collector)
+    {
+        assertEvents(collector);
+    }
+
+    private void assertEvents(Collector collector, Event.Type... expectedEventTypes)
+    {
+
+        if(expectedEventTypes.length == 0)
+        {
+            assertNull("Expected no events, but at least one was present: " + collector.peek(), collector.peek());
+        }
+        else
+        {
+            ArrayList<Event.Type> eventTypesList = new ArrayList<Event.Type>();
+            Event event = null;
+            while ((event = collector.peek()) != null) {
+                eventTypesList.add(event.getType());
+                collector.pop();
+            }
+
+            assertArrayEquals("Unexpected event types: " + eventTypesList, expectedEventTypes, eventTypesList.toArray(new Event.Type[0]));
+        }
+    }
+
     private void pumpMockTransport(MockTransportImpl transport)
     {
         while(transport.pending() > 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org