You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2007/09/12 02:37:19 UTC

svn commit: r574735 - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/RecoveryManagerImpl.cpp cpp/src/qpid/framing/AMQFrame.cpp cpp/src/qpid/framing/AMQFrame.h python/qpid/codec.py python/qpid/connection.py

Author: astitcher
Date: Tue Sep 11 17:37:17 2007
New Revision: 574735

URL: http://svn.apache.org/viewvc?rev=574735&view=rev
Log:
*    python/qpid/codec.py
  Comment typo
*    cpp/src/qpid/broker/RecoveryManagerImpl.cpp
  Cruft removal
*    python/qpid/codec.py
*    python/qpid/connection.py
*    cpp/src/qpid/framing/AMQFrame.h
*    cpp/src/qpid/framing/AMQFrame.cpp
  Initial implementation of 0-10 framing -
  This uses the new 12 byte frame header, but doesn't
  support splitting segments/framesets over multiple
  frames yet.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
    incubator/qpid/trunk/qpid/python/qpid/codec.py
    incubator/qpid/trunk/qpid/python/qpid/connection.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=574735&r1=574734&r2=574735&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue Sep 11 17:37:17 2007
@@ -107,8 +107,6 @@
 
 RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
 {
-    buffer.record();
-    //peek at type:
     Message::shared_ptr message(new Message());
     message->decodeHeader(buffer);
     return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold));    

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp?rev=574735&r1=574734&r2=574735&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.cpp Tue Sep 11 17:37:17 2007
@@ -70,40 +70,70 @@
     return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
 }
 
+// This is now misleadingly named as it is not the frame size as defined in the spec 
+// (as it also includes the end marker)
+uint32_t AMQFrame::size() const{
+    return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
+}
+
+uint32_t AMQFrame::frameOverhead() {
+    return 12 /*frame header*/ + 1/*0xCE*/;
+}
+
 void AMQFrame::encode(Buffer& buffer) const
 {
+    uint8_t flags = (bof ? 0x08 : 0) | (eof ? 0x04 : 0) | (bos ? 0x02 : 0) | (eos ? 0x01 : 0);
+    buffer.putOctet(flags);
     buffer.putOctet(getBody()->type());
+    buffer.putShort(size() - 1); // Don't include end marker (it's not part of the frame itself)
+    buffer.putOctet(0);
+    buffer.putOctet(0x0f & subchannel);
     buffer.putShort(channel);    
-    buffer.putLong(boost::apply_visitor(SizeVisitor(), body));
+    buffer.putLong(0);
     boost::apply_visitor(EncodeVisitor(buffer), body);
     buffer.putOctet(0xCE);
 }
 
-uint32_t AMQFrame::size() const{
-    return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
-}
-
-uint32_t AMQFrame::frameOverhead() {
-    return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/;
-}
-
 bool AMQFrame::decode(Buffer& buffer)
 {    
-    if(buffer.available() < 7)
+    if(buffer.available() < frameOverhead() - 1)
         return false;
     buffer.record();
 
-    uint8_t type = buffer.getOctet();
+    uint8_t  flags = buffer.getOctet();
+    uint8_t framing_version = (flags & 0xc0) >> 6;
+    if (framing_version != 0)
+        THROW_QPID_ERROR(FRAMING_ERROR, "Framing version unsupported");
+    bof = flags & 0x08;
+    eof = flags & 0x04;
+    bos = flags & 0x02;
+    bos = flags & 0x01;
+    uint8_t  type = buffer.getOctet();
+    uint16_t frame_size =  buffer.getShort();
+    if (frame_size < frameOverhead()-1)
+        THROW_QPID_ERROR(FRAMING_ERROR, "Frame size too small");    
+    uint8_t  reserved1 = buffer.getOctet();
+    uint8_t  field1 = buffer.getOctet();
+    subchannel = field1 & 0x0f;
     channel = buffer.getShort();
-    uint32_t size =  buffer.getLong();
+    (void) buffer.getLong(); // reserved2
+    
+    // Verify that the protocol header meets current spec
+    // TODO: should we check reserved2 against zero as well? - the spec isn't clear
+    if ((flags & 0x30) != 0 || reserved1 != 0 || (field1 & 0xf0) != 0)
+        THROW_QPID_ERROR(FRAMING_ERROR, "Reserved bits not zero");
 
-    if(buffer.available() < size+1){
+    // TODO: should no longer care about body size and only pass up B,E,b,e flags
+    uint16_t body_size = frame_size + 1 - frameOverhead(); 
+    if (buffer.available() < body_size+1u){
         buffer.restore();
         return false;
     }
-    decodeBody(buffer, size, type);
+    decodeBody(buffer, body_size, type);
+
     uint8_t end = buffer.getOctet();
-    if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
+    if (end != 0xCE)
+        THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found");
     return true;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h?rev=574735&r1=574734&r2=574735&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/AMQFrame.h Tue Sep 11 17:37:17 2007
@@ -37,14 +37,14 @@
 class AMQFrame : public AMQDataBlock
 {
   public:
-    AMQFrame() : channel(0) {}
+    AMQFrame() : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(0) {}
 
     /** Construct a frame with a copy of b */
-    AMQFrame(ChannelId c, const AMQBody* b) : channel(c) {
+    AMQFrame(ChannelId c, const AMQBody* b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) {
         setBody(*b);
     }
     
-    AMQFrame(ChannelId c, const AMQBody& b) : channel(c) {
+    AMQFrame(ChannelId c, const AMQBody& b) : bof(true), eof(true), bos(true), eos(true), subchannel(0), channel(c) {
         setBody(b);
     }
     
@@ -97,6 +97,11 @@
 
     void decodeBody(Buffer& buffer, uint32_t size, uint8_t type);
 
+    bool bof;
+    bool eof;
+    bool bos;
+    bool eos;
+    uint8_t subchannel;
     uint16_t channel;
     Variant body;
 };

Modified: incubator/qpid/trunk/qpid/python/qpid/codec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/codec.py?rev=574735&r1=574734&r2=574735&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/codec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/codec.py Tue Sep 11 17:37:17 2007
@@ -53,7 +53,7 @@
 
   def read(self, n):
     """
-    reads in 'n' bytes from the stream. Can raise EFO exception
+    reads in 'n' bytes from the stream. Can raise EOF exception
     """
     data = self.stream.read(n)
     if n > 0 and len(data) == 0:

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=574735&r1=574734&r2=574735&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Tue Sep 11 17:37:17 2007
@@ -92,23 +92,44 @@
 
   def write(self, frame):
     c = self.codec
+    c.encode_octet(0x0f) # TODO: currently fixed at ver=0, B=E=b=e=1
     c.encode_octet(self.spec.constants.byname[frame.type].id)
-    c.encode_short(frame.channel)
     body = StringIO()
     enc = codec.Codec(body, self.spec)
     frame.encode(enc)
     enc.flush()
-    c.encode_longstr(body.getvalue())
+    frame_size = len(body.getvalue()) + 12 # TODO: Magic number (frame header size)
+    c.encode_short(frame_size)
+    c.encode_octet(0) # Reserved
+    c.encode_octet(frame.subchannel & 0x0f)
+    c.encode_short(frame.channel)
+    c.encode_long(0) # Reserved
+    c.write(body.getvalue())
     c.encode_octet(self.FRAME_END)
 
   def read(self):
     c = self.codec
+    flags = c.decode_octet() # TODO: currently ignoring flags
+    framing_version = (flags & 0xc0) >> 6
+    if framing_version != 0:
+      raise "frame error: unknown framing version"
     type = self.spec.constants.byid[c.decode_octet()].name
+    frame_size = c.decode_short()
+    if frame_size < 12: # TODO: Magic number (frame header size)
+      raise "frame error: frame size too small"
+    reserved1 = c.decode_octet()
+    field = c.decode_octet()
+    subchannel = field & 0x0f
     channel = c.decode_short()
-    body = c.decode_longstr()
+    reserved2 = c.decode_long() # TODO: reserved maybe need to ensure 0
+    if (flags & 0x30) != 0 or reserved1 != 0 or (field & 0xf0) != 0:
+      raise "frame error: reserved bits not all zero"
+    body_size = frame_size - 12 # TODO: Magic number (frame header size)
+    body = c.read(body_size)
     dec = codec.Codec(StringIO(body), self.spec)
     frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
     frame.channel = channel
+    frame.subchannel = subchannel
     end = c.decode_octet()
     if end != self.FRAME_END:
       garbage = ""
@@ -145,6 +166,7 @@
 
   def init(self, args, kwargs):
     self.channel = kwargs.pop("channel", 0)
+    self.subchannel = kwargs.pop("subchannel", 0)
 
   def encode(self, enc): abstract