You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2015/06/18 16:03:22 UTC

[15/50] qpid-proton git commit: PROTON-915: Send correct AMQP header upon protocol mismatch - Split apart the transport tests into client and server tests

PROTON-915: Send correct AMQP header upon protocol mismatch
- Split apart the transport tests into client and server tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3aab9a07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3aab9a07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3aab9a07

Branch: refs/heads/PROTON-781-ruby-reactor-apis
Commit: 3aab9a07bca507aa9160e00eb54179b3df441ebb
Parents: 638c18b
Author: Andrew Stitcher <as...@apache.org>
Authored: Wed Jun 17 15:44:29 2015 -0400
Committer: Andrew Stitcher <as...@apache.org>
Committed: Wed Jun 17 18:39:03 2015 -0400

----------------------------------------------------------------------
 proton-c/src/transport/transport.c     |  13 +-
 tests/python/proton_tests/transport.py | 190 ++++++++++++++++++++++++++--
 2 files changed, 191 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3aab9a07/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 0e23975..733f695 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -168,6 +168,13 @@ const pn_io_layer_t pni_passthru_layer = {
     NULL
 };
 
+const pn_io_layer_t pni_header_error_layer = {
+    pn_io_layer_input_error,
+    pn_output_write_amqp_header,
+    NULL,
+    NULL
+};
+
 const pn_io_layer_t pni_error_layer = {
     pn_io_layer_input_error,
     pn_io_layer_output_error,
@@ -286,7 +293,7 @@ ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int lay
   pn_do_error(transport, "amqp:connection:framing-error",
               "%s: '%s'%s", error, quoted,
               !eos ? "" : " (connection aborted)");
-  pn_set_error_layer(transport);
+  transport->io_layers[layer] = &pni_header_error_layer;
   return 0;
 }
 
@@ -2397,7 +2404,9 @@ static ssize_t pn_output_write_amqp_header(pn_transport_t* transport, unsigned i
     pn_transport_logf(transport, "  -> %s", "AMQP");
   assert(available >= 8);
   memmove(bytes, AMQP_HEADER, 8);
-  if (transport->io_layers[layer] == &amqp_write_header_layer) {
+  if (transport->io_layers[layer] == &pni_header_error_layer) {
+    transport->io_layers[layer] = &pni_error_layer;
+  }else if (transport->io_layers[layer] == &amqp_write_header_layer) {
     transport->io_layers[layer] = &amqp_layer;
   } else {
     transport->io_layers[layer] = &amqp_read_header_layer;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3aab9a07/tests/python/proton_tests/transport.py
----------------------------------------------------------------------
diff --git a/tests/python/proton_tests/transport.py b/tests/python/proton_tests/transport.py
index e56b122..958abf7 100644
--- a/tests/python/proton_tests/transport.py
+++ b/tests/python/proton_tests/transport.py
@@ -24,7 +24,7 @@ from proton import *
 class Test(common.Test):
   pass
 
-class TransportTest(Test):
+class ClientTransportTest(Test):
 
   def setup(self):
     self.transport = Transport()
@@ -87,6 +87,16 @@ class TransportTest(Test):
     self.transport.close_tail()
     self.assert_error(u'amqp:connection:framing-error')
 
+  def testProtocolNotSupported(self):
+    self.transport.push("AMQP\x01\x01\x0a\x00")
+    p = self.transport.pending()
+    assert p >= 8, p
+    bytes = self.transport.peek(p)
+    assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+    self.transport.pop(p)
+    self.drain()
+    assert self.transport.closed
+
   def testPeek(self):
     out = self.transport.peek(1024)
     assert out is not None
@@ -163,30 +173,190 @@ class TransportTest(Test):
     self.peer.push(dat2[len(dat1):])
     self.peer.push(dat3)
 
+class ServerTransportTest(Test):
+
+  def setup(self):
+    self.transport = Transport(Transport.SERVER)
+    self.peer = Transport()
+    self.conn = Connection()
+    self.peer.bind(self.conn)
+
+  def teardown(self):
+    self.transport = None
+    self.peer = None
+    self.conn = None
+
+  def drain(self):
+    while True:
+      p = self.transport.pending()
+      if p < 0:
+        return
+      elif p > 0:
+        bytes = self.transport.peek(p)
+        self.peer.push(bytes)
+        self.transport.pop(len(bytes))
+      else:
+        assert False
+
+  def assert_error(self, name):
+    assert self.conn.remote_container is None, self.conn.remote_container
+    self.drain()
+    # verify that we received an open frame
+    assert self.conn.remote_container is not None, self.conn.remote_container
+    # verify that we received a close frame
+    assert self.conn.state == Endpoint.LOCAL_UNINIT | Endpoint.REMOTE_CLOSED, self.conn.state
+    # verify that a framing error was reported
+    assert self.conn.remote_condition.name == name, self.conn.remote_condition
+
+  # TODO: This may no longer be testing anything
+  def testEOS(self):
+    self.transport.push("") # should be a noop
+    self.transport.close_tail()
+    p = self.transport.pending()
+    self.drain()
+    assert self.transport.closed
+
+  def testPartial(self):
+    self.transport.push("AMQ") # partial header
+    self.transport.close_tail()
+    p = self.transport.pending()
+    assert p >= 8, p
+    bytes = self.transport.peek(p)
+    assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+    self.transport.pop(p)
+    self.drain()
+    assert self.transport.closed
+
+  def testGarbage(self, garbage="GARBAGE_"):
+    self.transport.push(garbage)
+    p = self.transport.pending()
+    assert p >= 8, p
+    bytes = self.transport.peek(p)
+    assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+    self.transport.pop(p)
+    self.drain()
+    assert self.transport.closed
+
+  def testSmallGarbage(self):
+    self.testGarbage("XXX")
+
+  def testBigGarbage(self):
+    self.testGarbage("GARBAGE_XXX")
+
+  def testHeader(self):
+    self.transport.push("AMQP\x00\x01\x00\x00")
+    self.transport.close_tail()
+    self.assert_error(u'amqp:connection:framing-error')
+
+  def testProtocolNotSupported(self):
+    self.transport.push("AMQP\x01\x01\x0a\x00")
+    p = self.transport.pending()
+    assert p >= 8, p
+    bytes = self.transport.peek(p)
+    assert bytes[:8] == "AMQP\x00\x01\x00\x00"
+    self.transport.pop(p)
+    self.drain()
+    assert self.transport.closed
+
+  def testPeek(self):
+    out = self.transport.peek(1024)
+    assert out is not None
+
+  def testBindAfterOpen(self):
+    conn = Connection()
+    ssn = conn.session()
+    conn.open()
+    ssn.open()
+    conn.container = "test-container"
+    conn.hostname = "test-hostname"
+    trn = Transport()
+    trn.bind(conn)
+    out = trn.peek(1024)
+    assert "test-container" in out, repr(out)
+    assert "test-hostname" in out, repr(out)
+    self.transport.push(out)
+
+    c = Connection()
+    assert c.remote_container == None
+    assert c.remote_hostname == None
+    assert c.session_head(0) == None
+    self.transport.bind(c)
+    assert c.remote_container == "test-container"
+    assert c.remote_hostname == "test-hostname"
+    assert c.session_head(0) != None
+
+  def testCloseHead(self):
+    n = self.transport.pending()
+    assert n >= 0, n
+    try:
+      self.transport.close_head()
+    except TransportException, e:
+      assert "aborted" in str(e), str(e)
+    n = self.transport.pending()
+    assert n < 0, n
+
+  def testCloseTail(self):
+    n = self.transport.capacity()
+    assert n > 0, n
+    try:
+      self.transport.close_tail()
+    except TransportException, e:
+      assert "aborted" in str(e), str(e)
+    n = self.transport.capacity()
+    assert n < 0, n
+
+  def testUnpairedPop(self):
+    conn = Connection()
+    self.transport.bind(conn)
+
+    conn.hostname = "hostname"
+    conn.open()
+
+    dat1 = self.transport.peek(1024)
+
+    ssn = conn.session()
+    ssn.open()
+
+    dat2 = self.transport.peek(1024)
+
+    assert dat2[:len(dat1)] == dat1
+
+    snd = ssn.sender("sender")
+    snd.open()
+
+    self.transport.pop(len(dat1))
+    self.transport.pop(len(dat2) - len(dat1))
+    dat3 = self.transport.peek(1024)
+    self.transport.pop(len(dat3))
+    assert self.transport.peek(1024) == ""
+
+    self.peer.push(dat1)
+    self.peer.push(dat2[len(dat1):])
+    self.peer.push(dat3)
+
   def testEOSAfterSASL(self):
-    srv = Transport(mode=Transport.SERVER)
-    srv.sasl().allowed_mechs('ANONYMOUS')
+    self.transport.sasl().allowed_mechs('ANONYMOUS')
 
     self.peer.sasl().allowed_mechs('ANONYMOUS')
 
     # this should send over the sasl header plus a sasl-init set up
     # for anonymous
     p = self.peer.pending()
-    srv.push(self.peer.peek(p))
+    self.transport.push(self.peer.peek(p))
     self.peer.pop(p)
 
     # now we send EOS
-    srv.close_tail()
+    self.transport.close_tail()
 
     # the server may send an error back
-    p = srv.pending()
+    p = self.transport.pending()
     while p>0:
-      self.peer.push(srv.peek(p))
-      srv.pop(p)
-      p = srv.pending()
+      self.peer.push(self.transport.peek(p))
+      self.transport.pop(p)
+      p = self.transport.pending()
 
     # server closed
-    assert srv.pending() < 0
+    assert self.transport.pending() < 0
 
 class LogTest(Test):
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org