You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/01/14 19:30:26 UTC

svn commit: r496110 - in /incubator/qpid/branches/qpid.0-9/cpp: lib/broker/ lib/common/ lib/common/framing/ tests/

Author: aconway
Date: Sun Jan 14 10:30:25 2007
New Revision: 496110

URL: http://svn.apache.org/viewvc?view=rev&rev=496110
Log:

* Added Requester/Responder classes to manage request-ids, response-ids,
  and response-mark. Response batches not yet supported.

Added:
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp   (with props)
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
    incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
    incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp?view=diff&rev=496110&r1=496109&r2=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/broker/SessionHandlerImpl.cpp Sun Jan 14 10:30:25 2007
@@ -224,7 +224,6 @@
         parent->framemax, parent->queues->getStore(),
         parent->settings.stagingThreshold);
 
-    // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9
     parent->client->getChannel().openOk(channel, std::string()/* ID */);
 } 
         
@@ -279,7 +278,7 @@
     const string& /*routingKey*/,
     const qpid::framing::FieldTable& /*arguments*/ )
 {
-        assert(0);                // FIXME aconway 2007-01-04: 0-9 feature
+        assert(0);            // FIXME aconway 2007-01-04: 0-9 feature
 }
 
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am?view=diff&rev=496110&r1=496109&r2=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/Makefile.am Sun Jan 14 10:30:25 2007
@@ -72,6 +72,8 @@
   $(framing)/ProtocolInitiation.cpp		\
   $(framing)/ProtocolVersion.cpp		\
   $(framing)/ProtocolVersionException.cpp	\
+  $(framing)/Requester.cpp			\
+  $(framing)/Responder.cpp			\
   $(framing)/Value.cpp				\
   $(gen)/AMQP_ClientProxy.cpp			\
   $(gen)/AMQP_HighestVersion.h			\

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h?view=diff&rev=496110&r1=496109&r2=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQRequestBody.h Sun Jan 14 10:30:25 2007
@@ -31,7 +31,17 @@
 {
  public:
     typedef boost::shared_ptr<AMQRequestBody> shared_ptr;
-    
+
+    struct Data {
+        Data(RequestId id=0, ResponseId mark=0)
+            : requestId(id), responseMark(mark) {}
+        void encode(Buffer&) const;
+        void decode(Buffer&);
+
+        RequestId requestId;
+        ResponseId responseMark;
+    };
+
     static shared_ptr create(
         AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
         Buffer& buffer);
@@ -51,16 +61,6 @@
     static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+16; }
     
   private:
-    struct Data {
-        Data(RequestId id=0, ResponseId mark=0)
-            : requestId(id), responseMark(mark) {}
-        void encode(Buffer&) const;
-        void decode(Buffer&);
-
-        RequestId requestId;
-        ResponseId responseMark;
-    };
-
     Data data;
 };
 

Modified: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h?view=diff&rev=496110&r1=496109&r2=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/AMQResponseBody.h Sun Jan 14 10:30:25 2007
@@ -35,6 +35,17 @@
   public:
     typedef boost::shared_ptr<AMQResponseBody> shared_ptr;
     
+    struct Data {
+        Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
+            : responseId(id), requestId(req), batchOffset(off) {}
+        void encode(Buffer&) const;
+        void decode(Buffer&);
+
+        u_int64_t responseId;
+        u_int64_t requestId;
+        u_int32_t batchOffset;
+    };
+
     static shared_ptr create(
         AMQP_MethodVersionMap& versionMap, ProtocolVersion version,
         Buffer& buffer);
@@ -53,17 +64,6 @@
   protected:
     static const u_int32_t baseSize() { return AMQMethodBody::baseSize()+20; }
   private:
-    struct Data {
-        Data(ResponseId id=0, RequestId req=0, BatchOffset off=0)
-            : responseId(id), requestId(req), batchOffset(off) {}
-        void encode(Buffer&) const;
-        void decode(Buffer&);
-
-        u_int64_t responseId;
-        u_int64_t requestId;
-        u_int32_t batchOffset;
-    };
-
     Data data;
 };
 

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp?view=auto&rev=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp Sun Jan 14 10:30:25 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Requester.h"
+#include "QpidError.h"
+
+namespace qpid {
+namespace framing {
+
+Requester::Requester() : lastId(0), responseMark(0) {}
+
+void Requester::sending(AMQRequestBody::Data& request) {
+    request.requestId = ++lastId;
+    request.responseMark = responseMark;
+    requests.insert(request.requestId);
+}
+
+void Requester::processed(const AMQResponseBody::Data& response) {
+    responseMark = response.responseId;
+    RequestId id = response.requestId;
+    RequestId end = id + response.batchOffset;
+    for ( ; id < end; ++id) {
+        std::set<RequestId>::iterator i = requests.find(id);
+        if (i == requests.end())
+            // TODO aconway 2007-01-12: Verify this is the right exception.
+            THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid response.");
+        requests.erase(i);
+    }
+}
+
+}} // namespace qpid::framing

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h?view=auto&rev=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h Sun Jan 14 10:30:25 2007
@@ -0,0 +1,59 @@
+#ifndef _framing_Requester_h
+#define _framing_Requester_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <set>
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+namespace qpid {
+namespace framing {
+
+class AMQRequestBody;
+class AMQResponseBody;
+
+/**
+ * Manage request IDs and the response mark for locally initiated requests.
+ *
+ * THREAD UNSAFE: This class is called as frames are sent or received
+ * sequentially on a connection, so it does not need to be thread safe.
+ */
+class Requester
+{
+  public:
+    Requester();
+
+    /** Called before sending a request to set request data. */
+    void sending(AMQRequestBody::Data&);
+
+    /** Called after processing a response. */
+    void processed(const AMQResponseBody::Data&);
+    
+  private:
+    std::set<RequestId> requests; /** Sent but not responded to */
+    RequestId lastId;
+    ResponseId responseMark;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif  /*!_framing_Requester_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Requester.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp?view=auto&rev=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp Sun Jan 14 10:30:25 2007
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Responder.h"
+#include "QpidError.h"
+
+namespace qpid {
+namespace framing {
+
+Responder::Responder() : lastId(0), responseMark(0) {}
+
+void Responder::received(const AMQRequestBody::Data& request) {
+    if (request.responseMark < responseMark || request.responseMark > lastId)
+        THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid resposne mark");
+    responseMark = request.responseMark;
+}
+
+void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) {
+    response.responseId = ++lastId;
+    response.requestId = toRequest;
+    response.batchOffset = 0;
+}
+
+}} // namespace qpid::framing
+

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h?view=auto&rev=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h (added)
+++ incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h Sun Jan 14 10:30:25 2007
@@ -0,0 +1,61 @@
+#ifndef _framing_Responder_h
+#define _framing_Responder_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "AMQRequestBody.h"
+#include "AMQResponseBody.h"
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Manage response ids and response mark remotely initianted requests.
+ *
+ * THREAD UNSAFE: This class is called as frames are sent or received
+ * sequentially on a connection, so it does not need to be thread safe.
+ */
+class Responder
+{
+  public:
+    Responder();
+
+    /** Called after receiving a request. */
+    void received(const AMQRequestBody::Data& request);
+
+    /** Called before sending a response to set respose data.  */
+    void sending(AMQResponseBody::Data& response, RequestId toRequest);
+
+    /** Get the ID of the highest response acknowledged by the peer. */
+    ResponseId getResponseMark() { return responseMark; }
+
+    // TODO aconway 2007-01-14: Batching support - store unsent
+    // Response for equality comparison with subsequent responses.
+    // 
+
+  private:
+    ResponseId lastId;
+    ResponseId responseMark;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif  /*!_framing_Responder_h*/

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/cpp/lib/common/framing/Responder.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp?view=diff&rev=496110&r1=496109&r2=496110
==============================================================================
--- incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp (original)
+++ incubator/qpid/branches/qpid.0-9/cpp/tests/FramingTest.cpp Sun Jan 14 10:30:25 2007
@@ -28,7 +28,8 @@
 #include <AMQP_HighestVersion.h>
 #include "AMQRequestBody.h"
 #include "AMQResponseBody.h"
-
+#include "Requester.h"
+#include "Responder.h"
 
 using namespace qpid::framing;
 
@@ -52,6 +53,8 @@
     CPPUNIT_TEST(testBasicConsumeOkBodyFrame);
     CPPUNIT_TEST(testRequestBodyFrame);
     CPPUNIT_TEST(testResponseBodyFrame);
+    CPPUNIT_TEST(testRequester);
+    CPPUNIT_TEST(testResponder);
     CPPUNIT_TEST_SUITE_END();
 
   private:
@@ -170,6 +173,98 @@
         ChannelOkBody* decoded =
             dynamic_cast<ChannelOkBody*>(out.getBody().get());
         CPPUNIT_ASSERT(decoded);
+    }
+
+    void testRequester() {
+        Requester r;
+        AMQRequestBody::Data q;
+        AMQResponseBody::Data p;
+
+        r.sending(q);
+        CPPUNIT_ASSERT_EQUAL(1ULL, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark);
+
+        r.sending(q);
+        CPPUNIT_ASSERT_EQUAL(2ULL, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(0ULL, q.responseMark);
+
+        // Now process a response
+        p.responseId = 1;
+        p.requestId = 2;
+        r.processed(AMQResponseBody::Data(1, 2));
+
+        r.sending(q);
+        CPPUNIT_ASSERT_EQUAL(3ULL, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(1ULL, q.responseMark);
+        
+        try {
+            r.processed(p);     // Already processed this response.
+            CPPUNIT_FAIL("Expected exception");
+        } catch (...) {}
+
+        try {
+            p.requestId = 50;
+            r.processed(p);     // No such request
+            CPPUNIT_FAIL("Expected exception");
+        } catch (...) {}
+
+        r.sending(q);           // reqId=4
+        r.sending(q);           // reqId=5
+        r.sending(q);           // reqId=6
+        p.responseId++;
+        p.requestId = 4;
+        p.batchOffset = 2;
+        r.processed(p);
+        r.sending(q);
+        CPPUNIT_ASSERT_EQUAL(7ULL, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(2ULL, q.responseMark);
+
+        p.responseId++;
+        p.requestId = 1;        // Out of order
+        p.batchOffset = 0;
+        r.processed(p);
+        r.sending(q);
+        CPPUNIT_ASSERT_EQUAL(8ULL, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(3ULL, q.responseMark);
+    }
+
+    void testResponder() {
+        Responder r;
+        AMQRequestBody::Data q;
+        AMQResponseBody::Data p;
+
+        q.requestId = 1;
+        q.responseMark = 0;
+        r.received(q);
+        r.sending(p, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(1ULL, p.responseId);
+        CPPUNIT_ASSERT_EQUAL(1ULL, p.requestId);
+        CPPUNIT_ASSERT_EQUAL(0U,   p.batchOffset);
+        CPPUNIT_ASSERT_EQUAL(0ULL, r.getResponseMark());
+
+        q.requestId++;
+        q.responseMark = 1;
+        r.received(q);
+        r.sending(p, q.requestId);
+        CPPUNIT_ASSERT_EQUAL(2ULL, p.responseId);
+        CPPUNIT_ASSERT_EQUAL(2ULL, p.requestId);
+        CPPUNIT_ASSERT_EQUAL(0U,   p.batchOffset);
+        CPPUNIT_ASSERT_EQUAL(1ULL, r.getResponseMark());
+
+        try {
+            // Response mark higher any request ID sent.
+            q.responseMark = 3;
+            r.received(q);
+        } catch(...) {}
+
+        try {
+            // Response mark lower than previous response mark.
+            q.responseMark = 0;
+            r.received(q);
+        } catch(...) {}
+
+        // TODO aconway 2007-01-14: Test for batching when supported.
+        
     }
 };