You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/07/05 11:47:08 UTC
svn commit: r553441 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/
qpid/client/ qpid/framing/ tests/
Author: gsim
Date: Thu Jul 5 02:47:07 2007
New Revision: 553441
URL: http://svn.apache.org/viewvc?view=rev&rev=553441
Log:
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).
Added:
incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Thu Jul 5 02:47:07 2007
@@ -401,9 +401,11 @@
{
try{
if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
+ if (!method->isA<ChannelCloseOkBody>()) {
+ std::stringstream out;
+ out << "Attempt to use unopened channel: " << getId();
+ throw ConnectionException(504, out.str());
+ }
} else {
method->invoke(*adapter, context);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.cpp Thu Jul 5 02:47:07 2007
@@ -100,34 +100,32 @@
c = i->second;
consumers.erase(i);
}
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
destGet.shutdown();
destDispatch.shutdown();
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
+}
+
+void BasicMessageChannel::cancelAll(){ // acks pending lazy-ack deliveries, sends a cancel per consumer, then empties the consumer map
+ Mutex::ScopedLock l(lock); // held for the whole loop, including the channel.send() calls
+ for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++)
{
Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
+ if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) // AUTO_ACK consumers are no longer acked here
{
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
+ channel.send(new BasicCancelBody(channel.version, i->first, true)); // cancel() passes !synch in this position, so presumably no cancel-ok is awaited - TODO confirm
}
+ consumers.clear();
}
-
bool BasicMessageChannel::get(
Message& msg, const Queue& queue, AckMode ackMode)
{
@@ -324,6 +322,7 @@
// Orderly shutdown.
}
catch (const Exception& e) {
+ std::cout << "Error caught by dispatch thread: " << e.what() << std::endl;
// FIXME aconway 2007-02-20: Report exception to user.
QPID_LOG(error, e.what());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/BasicMessageChannel.h Thu Jul 5 02:47:07 2007
@@ -61,6 +61,8 @@
void close();
+ void cancelAll();
+
private:
struct Consumer{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.cpp Thu Jul 5 02:47:07 2007
@@ -40,7 +40,7 @@
using namespace qpid::sys;
Channel::Channel(bool _transactional, u_int16_t _prefetch, InteropMode mode) :
- connection(0), prefetch(_prefetch), transactional(_transactional)
+ connection(0), prefetch(_prefetch), transactional(_transactional), errorCode(200), errorText("Ok"), running(false)
{
switch (mode) {
case AMQP_08: messaging.reset(new BasicMessageChannel(*this)); break;
@@ -50,7 +50,8 @@
}
Channel::~Channel(){
- close();
+ closeInternal(); // local teardown only; unlike close() it sends no channel.close to the peer
+ stop(); // join the dispatcher thread if start() was called
}
void Channel::open(ChannelId id, Connection& con)
@@ -119,7 +120,10 @@
}
}
-bool Channel::isOpen() const { return connection; }
+bool Channel::isOpen() const { // true while the channel still has a connection attached
+ Mutex::ScopedLock l(lock); // same mutex closeInternal() holds while it resets connection
+ return connection; // non-null pointer converts to true
+}
void Channel::setQos() {
messaging->setQos();
@@ -187,7 +191,7 @@
}
void Channel::handleMethodInContext(
- AMQMethodBody::shared_ptr method, const MethodContext&)
+AMQMethodBody::shared_ptr method, const MethodContext& ctxt)
{
// Special case for consume OK as it is both an expected response
// and needs handling in this thread.
@@ -204,7 +208,7 @@
switch (method->amqpClassId()) {
case MessageOkBody::CLASS_ID:
case BasicGetOkBody::CLASS_ID: messaging->handle(method); break;
- case ChannelCloseBody::CLASS_ID: handleChannel(method); break;
+ case ChannelCloseBody::CLASS_ID: handleChannel(method, ctxt); break;
case ConnectionCloseBody::CLASS_ID: handleConnection(method); break;
default: throw UnknownMethod();
}
@@ -216,9 +220,10 @@
}
}
-void Channel::handleChannel(AMQMethodBody::shared_ptr method) {
+void Channel::handleChannel(AMQMethodBody::shared_ptr method, const MethodContext& ctxt) {
switch (method->amqpMethodId()) {
case ChannelCloseBody::METHOD_ID:
+ send(new ChannelCloseOkBody(version, ctxt.getRequestId()));
peerClose(shared_polymorphic_downcast<ChannelCloseBody>(method));
return;
case ChannelFlowBody::METHOD_ID:
@@ -249,6 +254,7 @@
}
void Channel::start(){
+ running = true;
dispatcher = Thread(*messaging);
}
@@ -260,6 +266,8 @@
if (isOpen()) {
try {
if (getId() != 0) {
+ if (code == 200) messaging->cancelAll();
+
sendAndReceive<ChannelCloseOkBody>(
make_shared_ptr(new ChannelCloseBody(
version, code, text, classId, methodId)));
@@ -272,23 +280,35 @@
throw;
}
}
+ stop();
}
// Channel closed by peer.
-void Channel::peerClose(ChannelCloseBody::shared_ptr) {
+void Channel::peerClose(ChannelCloseBody::shared_ptr reason) {
assert(isOpen());
+ //record the peer's reply code and text so Channel::get() can throw them as a ChannelException:
+ errorCode = reason->getReplyCode();
+ errorText = reason->getReplyText();
closeInternal();
}
void Channel::closeInternal() {
- if (isOpen());
+ Mutex::ScopedLock l(lock); // same mutex isOpen() takes, so the check and reset of connection are atomic with respect to it
+ if (connection) // only tear down while a connection is still attached
{
- messaging->close();
connection = 0;
+ messaging->close();
// A 0 response means we are closed.
responses.signalResponse(AMQMethodBody::shared_ptr());
}
- dispatcher.join();
+}
+
+void Channel::stop() { // joins the dispatcher thread; called from close() and the destructor
+ Mutex::ScopedLock l(stopLock); // separate mutex from lock, so the join is not done while holding lock
+ if(running) { // running is set by start(); skip the join if no dispatcher was started or it was already joined
+ dispatcher.join();
+ running = false;
+ }
}
AMQMethodBody::shared_ptr Channel::sendAndReceive(
@@ -321,7 +341,11 @@
}
bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
- return messaging->get(msg, queue, ackMode);
+ bool result = messaging->get(msg, queue, ackMode);
+ if (!isOpen()) { // channel is closed by the time get() returned: raise instead of returning result
+ throw ChannelException(errorCode, errorText); // values recorded by peerClose(); still the constructor's 200/"Ok" if the peer did not close
+ }
+ return result;
}
void Channel::publish(const Message& msg, const Exchange& exchange,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientChannel.h Thu Jul 5 02:47:07 2007
@@ -27,6 +27,7 @@
#include "ClientMessage.h"
#include "ClientQueue.h"
#include "ResponseHandler.h"
+#include "qpid/Exception.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/sys/Thread.h"
#include "AckMode.h"
@@ -58,7 +59,7 @@
struct UnknownMethod {};
typedef shared_ptr<framing::AMQMethodBody> MethodPtr;
- sys::Mutex lock;
+ mutable sys::Mutex lock;
boost::scoped_ptr<MessageChannel> messaging;
Connection* connection;
sys::Thread dispatcher;
@@ -68,12 +69,20 @@
const bool transactional;
framing::ProtocolVersion version;
+ uint16_t errorCode;
+ std::string errorText;
+
+ sys::Mutex stopLock;
+ bool running;
+
+ void stop();
+
void handleHeader(framing::AMQHeaderBody::shared_ptr body);
void handleContent(framing::AMQContentBody::shared_ptr body);
void handleHeartbeat(framing::AMQHeartbeatBody::shared_ptr body);
void handleMethodInContext(
framing::AMQMethodBody::shared_ptr, const framing::MethodContext&);
- void handleChannel(framing::AMQMethodBody::shared_ptr method);
+ void handleChannel(framing::AMQMethodBody::shared_ptr method, const framing::MethodContext& ctxt);
void handleConnection(framing::AMQMethodBody::shared_ptr method);
void setQos();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ClientConnection.cpp Thu Jul 5 02:47:07 2007
@@ -25,6 +25,8 @@
#include "Connection.h"
#include "ClientChannel.h"
#include "ClientMessage.h"
+#include "qpid/log/Logger.h"
+#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
#include "qpid/QpidError.h"
#include <iostream>
@@ -49,6 +51,9 @@
isOpen(false), debug(_debug)
{
setConnector(defaultConnector);
+ qpid::log::Options o;
+ o.trace = debug;
+ qpid::log::Logger::instance().configure(o, "qpid-c++-client");
}
Connection::~Connection(){}
@@ -143,6 +148,7 @@
try{
channel->getHandlers().in->handle(frame);
}catch(const qpid::QpidError& e){
+ std::cout << "Caught error while handling " << frame << ": " << e.what() <<std::endl;
channelException(
*channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageChannel.h Thu Jul 5 02:47:07 2007
@@ -83,8 +83,11 @@
/** Send channel's QOS settings */
virtual void setQos() = 0;
- /** Channel is closing */
+ /** Channel has closed */
virtual void close() = 0;
+
+ /** Cancel all consumers */
+ virtual void cancelAll() = 0;
};
}} // namespace qpid::client
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.cpp Thu Jul 5 02:47:07 2007
@@ -109,6 +109,8 @@
// incoming.shutdown();
}
+void MessageMessageChannel::cancelAll(){ // empty implementation of the new MessageChannel::cancelAll() pure virtual
+}
/** Destination ID for the current get.
* Must not clash with a generated consumer ID.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/MessageMessageChannel.h Thu Jul 5 02:47:07 2007
@@ -62,6 +62,8 @@
void close();
+ void cancelAll();
+
private:
typedef boost::ptr_map<std::string, IncomingMessage::WaitableDestination>
Destinations;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/ChannelAdapter.h Thu Jul 5 02:47:07 2007
@@ -58,6 +58,7 @@
*@param output Processed frames are forwarded to this handler.
*/
ChannelAdapter() : id(0) {}
+ virtual ~ChannelAdapter() {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, ProtocolVersion);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=553441&r1=553440&r2=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Thu Jul 5 02:47:07 2007
@@ -95,6 +95,7 @@
testprogs= \
client_test \
+ exception_test \
echo_service \
topic_listener \
topic_publisher
@@ -103,7 +104,7 @@
TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) $(srcdir)/run_test
-system_tests = client_test quick_topictest
+system_tests = client_test exception_test quick_topictest
TESTS += run-unit-tests start_broker $(system_tests) python_tests kill_broker
EXTRA_DIST += \
Added: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?view=auto&rev=553441
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Thu Jul 5 02:47:07 2007
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+
+#include "TestOptions.h"
+#include "qpid/QpidError.h"
+#include "qpid/client/ClientChannel.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ClientMessage.h"
+
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+int main(int argc, char** argv) // returns 0 only when get() on the test queue throws ChannelException
+{
+ qpid::TestOptions opts;
+ opts.parse(argc, argv);
+
+ try {
+ Connection con(opts.trace);
+ con.open(opts.host, opts.port, opts.username, opts.password, opts.virtualhost);
+
+ Queue queue("I don't exist!"); // never declared by this test, so the get below is expected to fail
+ Channel channel;
+ con.openChannel(channel);
+ channel.start();
+ //test handling of get: it must throw ChannelException rather than return normally
+ try {
+ Message msg;
+ if (channel.get(msg, queue)) {
+ std::cout << "Received " << msg.getData() << " from " << queue.getName() << std::endl;
+ } else {
+ std::cout << "Queue " << queue.getName() << " was empty." << std::endl;
+ }
+ con.close();
+ return 1; // get() returned without throwing: test failure
+ } catch (const qpid::ChannelException& e) {
+ std::cout << "get failed as expected: " << e.what() << std::endl;
+ }
+
+ con.close();
+ return 0;
+ } catch(const std::exception& e) { // any other exception, including from open/close, fails the test
+ std::cout << "got unexpected exception: " << e.what() << std::endl;
+ return 1;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date