You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2015/06/18 00:39:31 UTC
[2/2] qpid-proton git commit: PROTON-915: Send correct AMQP header
upon protocol mismatch - Split apart the transport tests into client and
server tests
PROTON-915: Send correct AMQP header upon protocol mismatch
- Split apart the transport tests into client and server tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3aab9a07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3aab9a07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3aab9a07
Branch: refs/heads/master
Commit: 3aab9a07bca507aa9160e00eb54179b3df441ebb
Parents: 638c18b
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed Jun 17 15:44:29 2015 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Wed Jun 17 18:39:03 2015 -0400
----------------------------------------------------------------------
proton-c/src/transport/transport.c | 13 +-
tests/python/proton_tests/transport.py | 190 ++++++++++++++++++++++++++--
2 files changed, 191 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3aab9a07/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 0e23975..733f695 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -168,6 +168,13 @@ const pn_io_layer_t pni_passthru_layer = {
NULL
};
+const pn_io_layer_t pni_header_error_layer = {
+ pn_io_layer_input_error,
+ pn_output_write_amqp_header,
+ NULL,
+ NULL
+};
+
const pn_io_layer_t pni_error_layer = {
pn_io_layer_input_error,
pn_io_layer_output_error,
@@ -286,7 +293,7 @@ ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int lay
pn_do_error(transport, "amqp:connection:framing-error",
"%s: '%s'%s", error, quoted,
!eos ? "" : " (connection aborted)");
- pn_set_error_layer(transport);
+ transport->io_layers[layer] = &pni_header_error_layer;
return 0;
}
@@ -2397,7 +2404,9 @@ static ssize_t pn_output_write_amqp_header(pn_transport_t* transport, unsigned i
pn_transport_logf(transport, " -> %s", "AMQP");
assert(available >= 8);
memmove(bytes, AMQP_HEADER, 8);
- if (transport->io_layers[layer] == &amqp_write_header_layer) {
+ if (transport->io_layers[layer] == &pni_header_error_layer) {
+ transport->io_layers[layer] = &pni_error_layer;
+ }else if (transport->io_layers[layer] == &amqp_write_header_layer) {
transport->io_layers[layer] = &amqp_layer;
} else {
transport->io_layers[layer] = &amqp_read_header_layer;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3aab9a07/tests/python/proton_tests/transport.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/transport.py b/tests/python/proton_tests/transport.py
index e56b122..958abf7 100644
--- a/tests/python/proton_tests/transport.py
+++ b/tests/python/proton_tests/transport.py
@@ -24,7 +24,7 @@ from proton import *
class Test(common.Test):
pass
-class TransportTest(Test):
+class ClientTransportTest(Test):
def setup(self):
self.transport = Transport()
@@ -87,6 +87,16 @@ class TransportTest(Test):
self.transport.close_tail()
self.assert_error(u'amqp:connection:framing-error')
+ def testProtocolNotSupported(self):
+ self.transport.push("AMQP\x01\x01\x0a\x00")
+ p = self.transport.pending()
+ assert p >= 8, p
+ bytes = self.transport.peek(p)
+ assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+ self.transport.pop(p)
+ self.drain()
+ assert self.transport.closed
+
def testPeek(self):
out = self.transport.peek(1024)
assert out is not None
@@ -163,30 +173,190 @@ class TransportTest(Test):
self.peer.push(dat2[len(dat1):])
self.peer.push(dat3)
+class ServerTransportTest(Test):
+
+ def setup(self):
+ self.transport = Transport(Transport.SERVER)
+ self.peer = Transport()
+ self.conn = Connection()
+ self.peer.bind(self.conn)
+
+ def teardown(self):
+ self.transport = None
+ self.peer = None
+ self.conn = None
+
+ def drain(self):
+ while True:
+ p = self.transport.pending()
+ if p < 0:
+ return
+ elif p > 0:
+ bytes = self.transport.peek(p)
+ self.peer.push(bytes)
+ self.transport.pop(len(bytes))
+ else:
+ assert False
+
+ def assert_error(self, name):
+ assert self.conn.remote_container is None, self.conn.remote_container
+ self.drain()
+ # verify that we received an open frame
+ assert self.conn.remote_container is not None, self.conn.remote_container
+ # verify that we received a close frame
+ assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, self.conn.state
+ # verify that a framing error was reported
+ assert self.conn.remote_condition.name == name, self.conn.remote_condition
+
+ # TODO: This may no longer be testing anything
+ def testEOS(self):
+ self.transport.push("") # should be a noop
+ self.transport.close_tail()
+ p = self.transport.pending()
+ self.drain()
+ assert self.transport.closed
+
+ def testPartial(self):
+ self.transport.push("AMQ") # partial header
+ self.transport.close_tail()
+ p = self.transport.pending()
+ assert p >= 8, p
+ bytes = self.transport.peek(p)
+ assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+ self.transport.pop(p)
+ self.drain()
+ assert self.transport.closed
+
+ def testGarbage(self, garbage="GARBAGE_"):
+ self.transport.push(garbage)
+ p = self.transport.pending()
+ assert p >= 8, p
+ bytes = self.transport.peek(p)
+ assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+ self.transport.pop(p)
+ self.drain()
+ assert self.transport.closed
+
+ def testSmallGarbage(self):
+ self.testGarbage("XXX")
+
+ def testBigGarbage(self):
+ self.testGarbage("GARBAGE_XXX")
+
+ def testHeader(self):
+ self.transport.push("AMQP\x00\x01\x00\x00")
+ self.transport.close_tail()
+ self.assert_error(u'amqp:connection:framing-error')
+
+ def testProtocolNotSupported(self):
+ self.transport.push("AMQP\x01\x01\x0a\x00")
+ p = self.transport.pending()
+ assert p >= 8, p
+ bytes = self.transport.peek(p)
+ assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+ self.transport.pop(p)
+ self.drain()
+ assert self.transport.closed
+
+ def testPeek(self):
+ out = self.transport.peek(1024)
+ assert out is not None
+
+ def testBindAfterOpen(self):
+ conn = Connection()
+ ssn = conn.session()
+ conn.open()
+ ssn.open()
+ conn.container = "test-container"
+ conn.hostname = "test-hostname"
+ trn = Transport()
+ trn.bind(conn)
+ out = trn.peek(1024)
+ assert "test-container" in out, repr(out)
+ assert "test-hostname" in out, repr(out)
+ self.transport.push(out)
+
+ c = Connection()
+ assert c.remote_container == None
+ assert c.remote_hostname == None
+ assert c.session_head(0) == None
+ self.transport.bind(c)
+ assert c.remote_container == "test-container"
+ assert c.remote_hostname == "test-hostname"
+ assert c.session_head(0) != None
+
+ def testCloseHead(self):
+ n = self.transport.pending()
+ assert n >= 0, n
+ try:
+ self.transport.close_head()
+ except TransportException, e:
+ assert "aborted" in str(e), str(e)
+ n = self.transport.pending()
+ assert n < 0, n
+
+ def testCloseTail(self):
+ n = self.transport.capacity()
+ assert n > 0, n
+ try:
+ self.transport.close_tail()
+ except TransportException, e:
+ assert "aborted" in str(e), str(e)
+ n = self.transport.capacity()
+ assert n < 0, n
+
+ def testUnpairedPop(self):
+ conn = Connection()
+ self.transport.bind(conn)
+
+ conn.hostname = "hostname"
+ conn.open()
+
+ dat1 = self.transport.peek(1024)
+
+ ssn = conn.session()
+ ssn.open()
+
+ dat2 = self.transport.peek(1024)
+
+ assert dat2[:len(dat1)] == dat1
+
+ snd = ssn.sender("sender")
+ snd.open()
+
+ self.transport.pop(len(dat1))
+ self.transport.pop(len(dat2) - len(dat1))
+ dat3 = self.transport.peek(1024)
+ self.transport.pop(len(dat3))
+ assert self.transport.peek(1024) == ""
+
+ self.peer.push(dat1)
+ self.peer.push(dat2[len(dat1):])
+ self.peer.push(dat3)
+
def testEOSAfterSASL(self):
- srv = Transport(mode=Transport.SERVER)
- srv.sasl().allowed_mechs('ANONYMOUS')
+ self.transport.sasl().allowed_mechs('ANONYMOUS')
self.peer.sasl().allowed_mechs('ANONYMOUS')
# this should send over the sasl header plus a sasl-init set up
# for anonymous
p = self.peer.pending()
- srv.push(self.peer.peek(p))
+ self.transport.push(self.peer.peek(p))
self.peer.pop(p)
# now we send EOS
- srv.close_tail()
+ self.transport.close_tail()
# the server may send an error back
- p = srv.pending()
+ p = self.transport.pending()
while p>0:
- self.peer.push(srv.peek(p))
- srv.pop(p)
- p = srv.pending()
+ self.peer.push(self.transport.peek(p))
+ self.transport.pop(p)
+ p = self.transport.pending()
# server closed
- assert srv.pending() < 0
+ assert self.transport.pending() < 0
class LogTest(Test):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org