You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/07/05 11:47:08 UTC

svn commit: r553441 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/client/ qpid/framing/ tests/

Author: gsim
Date: Thu Jul  5 02:47:07 2007
New Revision: 553441

URL: http://svn.apache.org/viewvc?view=rev&rev=553441
Log:
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).


Added:
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Thu Jul  5 02:47:07 2007
@@ -401,9 +401,11 @@
 {
     try{
         if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
-            std::stringstream out;
-            out << "Attempt to use unopened channel: " << getId();
-            throw ConnectionException(504, out.str());
+            if (!method->isA<ChannelCloseOkBody>()) {
+                std::stringstream out;
+                out << "Attempt to use unopened channel: " << getId();
+                throw ConnectionException(504, out.str());
+            }
         } else {
             method->invoke(*adapter, context);
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp Thu Jul  5 02:47:07 2007
@@ -100,34 +100,32 @@
         c = i->second;
         consumers.erase(i);
     }
-    if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) 
+    if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
         channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+    }
     channel.sendAndReceiveSync<BasicCancelOkBody>(
         synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
 }
 
 void BasicMessageChannel::close(){
-    ConsumerMap consumersCopy;
-    {
-        Mutex::ScopedLock l(lock);
-        consumersCopy = consumers;
-        consumers.clear();
-    }
     destGet.shutdown();
     destDispatch.shutdown();
-    for (ConsumerMap::iterator i=consumersCopy.begin();
-         i  != consumersCopy.end(); ++i)
+} // close() now only shuts down the two destinations; acking and clearing consumers moved to cancelAll()
+
+void BasicMessageChannel::cancelAll(){ // acks (LAZY_ACK only) and cancels every registered consumer, then empties the consumer map
+    Mutex::ScopedLock l(lock); // NOTE(review): lock stays held across the channel.send() calls below - confirm send() cannot block on or re-enter this lock
+    for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++)
     {
         Consumer& c = i->second;
-        if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
-            && c.lastDeliveryTag > 0)
+        if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) // AUTO_ACK consumers are no longer acked here
         {
             channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
         }
+        channel.send(new BasicCancelBody(channel.version, i->first, true)); // map key goes where cancel() passes its tag; third argument true is what cancel() passes when synch is false
     }
+    consumers.clear(); // forget every consumer once its cancel has been sent
 }
 
-
 bool BasicMessageChannel::get(
     Message& msg, const Queue& queue, AckMode ackMode)
 {
@@ -324,6 +322,7 @@
             // Orderly shutdown.
         }
         catch (const Exception& e) {
+            std::cout << "Error caught by dispatch thread: " << e.what() << std::endl;
             // FIXME aconway 2007-02-20: Report exception to user.
             QPID_LOG(error,  e.what());
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h Thu Jul  5 02:47:07 2007
@@ -61,6 +61,8 @@
     
     void close();
 
+    void cancelAll();
+
   private:
 
     struct Consumer{

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Jul  5 02:47:07 2007
@@ -40,7 +40,7 @@
 using namespace qpid::sys;
 
 Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
-    connection(0), prefetch(_prefetch), transactional(_transactional)
+    connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
 {
     switch (mode) {
       case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
@@ -50,7 +50,8 @@
 }
 
 Channel::~Channel(){
-    close();
+    closeInternal(); // local teardown; unlike close(), closeInternal() itself sends no ChannelCloseBody
+    stop(); // joins the dispatcher thread if start() was called
 }
 
 void Channel::open(ChannelId id, Connection& con)
@@ -119,7 +120,10 @@
     }
 }
     
-bool Channel::isOpen() const { return connection; }
+bool Channel::isOpen() const { // reads connection under lock
+    Mutex::ScopedLock l(lock); // same lock closeInternal() takes before clearing connection
+    return connection; // true while the Connection pointer is non-null
+}
 
 void Channel::setQos() {
     messaging->setQos();
@@ -187,7 +191,7 @@
 }
 
 void Channel::handleMethodInContext(
-    AMQMethodBody::shared_ptr method, const MethodContext&)
+AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
 {
     // Special case for consume OK as it is both an expected response
     // and needs handling in this thread.
@@ -204,7 +208,7 @@
         switch (method->amqpClassId()) {
           case MessageOkBody::CLASS_ID: 
           case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
-          case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+          case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
           case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
           default: throw UnknownMethod();
         }
@@ -216,9 +220,10 @@
             }
         }
 
-void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
     switch (method->amqpMethodId()) {
       case ChannelCloseBody::METHOD_ID:
+          send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
         peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
         return;
       case ChannelFlowBody::METHOD_ID:
@@ -249,6 +254,7 @@
 }
 
 void Channel::start(){
+    running = true; // tells stop() there is a dispatcher thread to join. NOTE(review): written without stopLock, which stop() holds when reading it - confirm start() and stop() never race
     dispatcher = Thread(*messaging);
 }
 
@@ -260,6 +266,8 @@
     if (isOpen()) {
         try {
             if (getId() != 0) {
+                if (code == 200) messaging->cancelAll();
+
                 sendAndReceive<ChannelCloseOkBody>(
                     make_shared_ptr(new ChannelCloseBody(
                                         version, code, text, classId, methodId)));
@@ -272,23 +280,35 @@
             throw;
         }
     }
+    stop();
 }
 
 // Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
     assert(isOpen());
+    // record the peer's reply code and text; get() throws them as a ChannelException
+    errorCode = reason->getReplyCode();
+    errorText = reason->getReplyText();
     closeInternal();
 }
 
 void Channel::closeInternal() {
-    if (isOpen());
+    Mutex::ScopedLock l(lock); // isOpen() also takes this lock, so connection is read directly below
+    if (connection); // NOTE(review): the ';' ends the if statement, so the braced block below runs unconditionally (the removed isOpen() line had the same ';') - confirm whether a guard was intended before changing it, since ~Channel calls this before stop()
     {
-        messaging->close();
         connection = 0;
+        messaging->close();
         // A 0 response means we are closed.
         responses.signalResponse(AMQMethodBody::shared_ptr());
     }
-    dispatcher.join();        
+} // the dispatcher join moved to stop()
+
+void Channel::stop() { // joins the dispatcher thread at most once per start()
+    Mutex::ScopedLock l(stopLock); // a different mutex from the one isOpen()/closeInternal() use
+    if(running) { // set by start(); false when no dispatcher was started or it was already joined
+        dispatcher.join();
+        running = false;
+    }
 }
 
 AMQMethodBody::shared_ptr Channel::sendAndReceive(
@@ -321,7 +341,11 @@
 }
 
 bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
-    return messaging->get(msg, queue, ackMode);
+    bool result = messaging->get(msg, queue, ackMode);
+    if (!isOpen()) { // channel is no longer open once the get has returned
+        throw ChannelException(errorCode, errorText); // values recorded by peerClose(); still 200/"Ok" from the constructor if peerClose() never ran
+    }
+    return result;
 }
 
 void Channel::publish(const Message& msg, const Exchange& exchange,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Jul  5 02:47:07 2007
@@ -27,6 +27,7 @@
 #include "ClientMessage.h"
 #include "ClientQueue.h"
 #include "ResponseHandler.h"
+#include "qpid/Exception.h"
 #include "qpid/framing/ChannelAdapter.h"
 #include "qpid/sys/Thread.h"
 #include "AckMode.h"
@@ -58,7 +59,7 @@
     struct UnknownMethod {};
     typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
         
-    sys::Mutex lock;
+    mutable sys::Mutex lock;
     boost::scoped_ptr<MessageChannel> messaging;
     Connection* connection;
     sys::Thread dispatcher;
@@ -68,12 +69,20 @@
     const bool transactional;
     framing::ProtocolVersion version;
 
+    uint16_t errorCode; // initialised to 200 by the constructor; overwritten with the peer's reply code in peerClose()
+    std::string errorText; // initialised to "Ok" by the constructor; overwritten with the peer's reply text in peerClose()
+
+    sys::Mutex stopLock; // held by stop() while it checks running and joins the dispatcher
+    bool running; // set in start(), cleared in stop() after the join
+
+    void stop(); // joins the dispatcher thread if running is set
+
     void handleHeader(framing::AMQHeaderBody::shared_ptr body);
     void handleContent(framing::AMQContentBody::shared_ptr body);
     void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
     void handleMethodInContext(
         framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
-    void handleChannel(framing::AMQMethodBody::shared_ptr method);
+    void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
     void handleConnection(framing::AMQMethodBody::shared_ptr method);
 
     void setQos();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Thu Jul  5 02:47:07 2007
@@ -25,6 +25,8 @@
 #include "Connection.h"
 #include "ClientChannel.h"
 #include "ClientMessage.h"
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
 #include "qpid/log/Statement.h"
 #include "qpid/QpidError.h"
 #include <iostream>
@@ -49,6 +51,9 @@
     isOpen(false), debug(_debug)
 {
     setConnector(defaultConnector);
+    qpid::log::Options o;
+    o.trace = debug;
+    qpid::log::Logger::instance().configure(o, "qpid-c++-client");
 }
 
 Connection::~Connection(){}
@@ -143,6 +148,7 @@
     try{
         channel->getHandlers().in->handle(frame);
     }catch(const qpid::QpidError& e){
+        std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
         channelException(
             *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h Thu Jul  5 02:47:07 2007
@@ -83,8 +83,11 @@
     /** Send channel's QOS settings */
     virtual void setQos() = 0;
 
-    /** Channel is closing */
+    /** Channel has closed */
     virtual void close() = 0;
+
+    /** Cancel all consumers */
+    virtual void cancelAll() = 0;
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp Thu Jul  5 02:47:07 2007
@@ -109,6 +109,8 @@
 //     incoming.shutdown();
 }
 
+void MessageMessageChannel::cancelAll(){ // empty body; presumably satisfies the pure virtual added to MessageChannel. NOTE(review): nothing is cancelled for this channel type - confirm intentional
+}
 
 /** Destination ID for the current get.
  * Must not clash with a generated consumer ID.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h Thu Jul  5 02:47:07 2007
@@ -62,6 +62,8 @@
     
     void close();
 
+    void cancelAll();
+
   private:
     typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
     Destinations;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h Thu Jul  5 02:47:07 2007
@@ -58,6 +58,7 @@
      *@param output Processed frames are forwarded to this handler.
      */
     ChannelAdapter() : id(0) {}
+    virtual ~ChannelAdapter() {}
 
     /** Initialize the channel adapter. */
     void init(ChannelId, OutputHandler&, ProtocolVersion);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jul  5 02:47:07 2007
@@ -95,6 +95,7 @@
 
 testprogs=		\
   client_test		\
+  exception_test	\
   echo_service		\
   topic_listener	\
   topic_publisher	
@@ -103,7 +104,7 @@
 
 TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
 
-system_tests = client_test quick_topictest
+system_tests = client_test exception_test quick_topictest
 TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker 
 
 EXTRA_DIST +=								\

Added: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?view=auto&rev=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Thu Jul  5 02:47:07 2007
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+
+#include "TestOptions.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ClientMessage.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+int main(int argc, char** argv) // returns 0 only when get() on the queue below throws qpid::ChannelException
+{
+    qpid::TestOptions opts;
+    opts.parse(argc, argv);
+
+    try {
+	Connection con(opts.trace);
+	con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
+
+        Queue queue("I don't exist!"); // this test never declares the queue
+        Channel channel;      // relies on the constructor's default arguments
+        con.openChannel(channel);
+        channel.start();
+        //test handling of get (which is a bit odd)
+        try {
+            Message msg;
+            if (channel.get(msg, queue)) {
+                std::cout << "Received " << msg.getData() << " from " << queue.getName() << std::endl;
+            } else {
+                std::cout << "Queue " << queue.getName() << " was empty." << std::endl;
+            }
+            con.close();
+            return 1; // get() returned without throwing: the test fails
+        } catch (const qpid::ChannelException& e) {
+            std::cout << "get failed as expected: " << e.what() << std::endl;
+        }
+
+        con.close();
+        return 0; // reached only via the ChannelException handler above
+    } catch(const std::exception& e) {
+	std::cout << "got unexpected exception: " << e.what() << std::endl;
+        return 1;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date