You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/09/27 21:50:24 UTC
svn commit: r450556 - in /incubator/qpid/trunk/qpid: cpp/ cpp/broker/src/
cpp/common/framing/inc/ cpp/common/framing/src/ python/qpid/ python/tests/
Author: aconway
Date: Wed Sep 27 12:50:23 2006
New Revision: 450556
URL: http://svn.apache.org/viewvc?view=rev&rev=450556
Log: (empty)
Added:
incubator/qpid/trunk/qpid/cpp/broker/src/HeadersExchange.cpp (with props)
Removed:
incubator/qpid/trunk/qpid/cpp/common/framing/inc/NamedValue.h
incubator/qpid/trunk/qpid/cpp/common/framing/src/NamedValue.cpp
Modified:
incubator/qpid/trunk/qpid/cpp/Makefile
incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp
incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQBody.h
incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h
incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeaderBody.h
incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeartbeatBody.h
incubator/qpid/trunk/qpid/cpp/common/framing/inc/Buffer.h
incubator/qpid/trunk/qpid/cpp/common/framing/inc/FieldTable.h
incubator/qpid/trunk/qpid/cpp/common/framing/inc/Value.h
incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp
incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQFrame.cpp
incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQHeaderBody.cpp
incubator/qpid/trunk/qpid/cpp/common/framing/src/Buffer.cpp
incubator/qpid/trunk/qpid/cpp/common/framing/src/FieldTable.cpp
incubator/qpid/trunk/qpid/cpp/common/framing/src/Value.cpp
incubator/qpid/trunk/qpid/python/qpid/connection.py
incubator/qpid/trunk/qpid/python/qpid/peer.py
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/tests/exchange.py
incubator/qpid/trunk/qpid/python/tests/testlib.py
Modified: incubator/qpid/trunk/qpid/cpp/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/Makefile?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/Makefile (original)
+++ incubator/qpid/trunk/qpid/cpp/Makefile Wed Sep 27 12:50:23 2006
@@ -22,15 +22,19 @@
UNITTESTS=$(wildcard common/*/test/*.so broker/test/*.so)
-.PHONY: all clean doxygen
+.PHONY: all test unittest pythontest runtests clean doxygen
-test: all
- @$(MAKE) runtests
+test: all runtests
-runtests:
+unittest:
DllPlugInTester -c -b $(UNITTESTS)
+
+pythontest:
bin/qpidd >> qpidd.log &
cd ../python ; ./run-tests -v -I cpp_failing.txt
+
+runtests:
+ $(MAKE) -k unittest pythontest
all:
@$(MAKE) -C common all
Added: incubator/qpid/trunk/qpid/cpp/broker/src/HeadersExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/HeadersExchange.cpp?view=auto&rev=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/HeadersExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/HeadersExchange.cpp Wed Sep 27 12:50:23 2006
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "HeadersExchange.h"
+#include "ExchangeBinding.h"
+#include "Value.h"
+#include <algorithm>
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+// TODO aconway 2006-09-20: More efficient matching algorithm.
+// The current search algorithm really sucks.
+// Fieldtables are heavy, maybe use shared_ptr to do handle-body.
+
+namespace qpid {
+namespace broker {
+
+namespace {
+const std::string all("all");
+const std::string any("any");
+const std::string x_match("x-match");
+}
+
+HeadersExchange::HeadersExchange(const string& name) : Exchange(name) { }
+
+void HeadersExchange::bind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ std::cout << "HeadersExchange::bind" << std::endl;
+ Locker locker(lock);
+ std::string what = args->getString("x-match");
+ // TODO aconway 2006-09-26: throw an exception for invalid bindings.
+ if (what != all && what != any) return; // Invalid.
+ bindings.push_back(Binding(*args, queue));
+ queue->bound(new ExchangeBinding(this, queue, routingKey, args));
+}
+
+void HeadersExchange::unbind(Queue::shared_ptr queue, const string& routingKey, FieldTable* args){
+ Locker locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (i->first == *args) {
+ bindings.erase(i);
+ }
+ }
+}
+
+
+void HeadersExchange::route(Message::shared_ptr& msg, const string& routingKey, FieldTable* args){
+ std::cout << "route: " << *args << std::endl;
+ Locker locker(lock);;
+ for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
+ if (match(i->first, *args)) i->second->deliver(msg);
+ }
+}
+
+HeadersExchange::~HeadersExchange() {}
+
+const std::string HeadersExchange::typeName("headers");
+namespace
+{
+
+bool match_values(const Value& bind, const Value& msg) {
+ return dynamic_cast<const EmptyValue*>(&bind) || bind == msg;
+}
+
+}
+
+
+bool HeadersExchange::match(const FieldTable& bind, const FieldTable& msg) {
+ typedef FieldTable::ValueMap Map;
+ std::string what = bind.getString(x_match);
+ if (what == all) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j == msg.getMap().end()) return false;
+ if (!match_values(*(i->second), *(j->second))) return false;
+ }
+ }
+ return true;
+ } else if (what == any) {
+ for (Map::const_iterator i = bind.getMap().begin();
+ i != bind.getMap().end();
+ ++i)
+ {
+ if (i->first != x_match)
+ {
+ Map::const_iterator j = msg.getMap().find(i->first);
+ if (j != msg.getMap().end()) {
+ if (match_values(*(i->second), *(j->second))) return true;
+ }
+ }
+ }
+ return false;
+ } else {
+ return false;
+ }
+}
+
+}}
+
Propchange: incubator/qpid/trunk/qpid/cpp/broker/src/HeadersExchange.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerFactoryImpl.cpp Wed Sep 27 12:50:23 2006
@@ -18,6 +18,7 @@
#include "SessionHandlerFactoryImpl.h"
#include "SessionHandlerImpl.h"
#include "FanOutExchange.h"
+#include "HeadersExchange.h"
using namespace qpid::broker;
using namespace qpid::io;
@@ -28,6 +29,7 @@
const std::string amq_direct("amq.direct");
const std::string amq_topic("amq.topic");
const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
}
SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(u_int32_t _timeout) : timeout(_timeout), cleaner(&queues, timeout/10){
@@ -35,6 +37,7 @@
exchanges.declare(new DirectExchange(amq_direct));
exchanges.declare(new TopicExchange(amq_topic));
exchanges.declare(new FanOutExchange(amq_fanout));
+ exchanges.declare(new HeadersExchange(amq_match));
cleaner.start();
}
Modified: incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/broker/src/SessionHandlerImpl.cpp Wed Sep 27 12:50:23 2006
@@ -18,6 +18,8 @@
#include <iostream>
#include "SessionHandlerImpl.h"
#include "FanOutExchange.h"
+#include "TopicExchange.h"
+#include "HeadersExchange.h"
#include "assert.h"
using namespace std::tr1;
@@ -223,7 +225,9 @@
if(!passive && (
type != TopicExchange::typeName &&
type != DirectExchange::typeName &&
- type != FanOutExchange::typeName)
+ type != FanOutExchange::typeName &&
+ type != HeadersExchange::typeName
+ )
)
{
throw ChannelException(540, "Exchange type not implemented: " + type);
@@ -237,6 +241,8 @@
parent->exchanges->declare(new DirectExchange(exchange));
}else if(type == FanOutExchange::typeName){
parent->exchanges->declare(new DirectExchange(exchange));
+ }else if (type == HeadersExchange::typeName) {
+ parent->exchanges->declare(new HeadersExchange(exchange));
}
}
parent->exchanges->getLock()->release();
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQBody.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQBody.h Wed Sep 27 12:50:23 2006
@@ -27,18 +27,20 @@
class AMQBody
{
- public:
+ public:
typedef std::tr1::shared_ptr<AMQBody> shared_ptr;
+ virtual ~AMQBody();
virtual u_int32_t size() const = 0;
virtual u_int8_t type() const = 0;
virtual void encode(Buffer& buffer) const = 0;
virtual void decode(Buffer& buffer, u_int32_t size) = 0;
- inline virtual ~AMQBody(){}
+ virtual void print(std::ostream& out) const;
};
- enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8};
+ std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
+ enum body_types {METHOD_BODY = 1, HEADER_BODY = 2, CONTENT_BODY = 3, HEARTBEAT_BODY = 8};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQContentBody.h Wed Sep 27 12:50:23 2006
@@ -40,6 +40,7 @@
u_int32_t size() const;
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, u_int32_t size);
+ void print(std::ostream& out) const;
};
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeaderBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeaderBody.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeaderBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeaderBody.h Wed Sep 27 12:50:23 2006
@@ -40,12 +40,14 @@
AMQHeaderBody();
inline u_int8_t type() const { return HEADER_BODY; }
HeaderProperties* getProperties(){ return properties; }
+ const HeaderProperties* getProperties() const { return properties; }
inline u_int64_t getContentSize() const { return contentSize; }
inline void setContentSize(u_int64_t size) { contentSize = size; }
virtual ~AMQHeaderBody();
virtual u_int32_t size() const;
virtual void encode(Buffer& buffer) const;
virtual void decode(Buffer& buffer, u_int32_t size);
+ virtual void print(std::ostream& out) const;
};
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeartbeatBody.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeartbeatBody.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeartbeatBody.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/AMQHeartbeatBody.h Wed Sep 27 12:50:23 2006
@@ -30,11 +30,12 @@
public:
typedef std::tr1::shared_ptr<AMQHeartbeatBody> shared_ptr;
- virtual ~AMQHeartbeatBody() {}
+ virtual ~AMQHeartbeatBody();
inline u_int32_t size() const { return 0; }
inline u_int8_t type() const { return HEARTBEAT_BODY; }
inline void encode(Buffer& buffer) const {}
inline void decode(Buffer& buffer, u_int32_t size) {}
+ virtual void print(std::ostream& out) const;
};
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/Buffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/Buffer.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/Buffer.h Wed Sep 27 12:50:23 2006
@@ -16,13 +16,14 @@
*
*/
#include "amqp_types.h"
-#include "FieldTable.h"
#ifndef _Buffer_
#define _Buffer_
namespace qpid {
namespace framing {
+
+class FieldTable;
class Buffer
{
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/FieldTable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/FieldTable.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/FieldTable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/FieldTable.h Wed Sep 27 12:50:23 2006
@@ -17,6 +17,8 @@
*/
#include <iostream>
#include <vector>
+#include <tr1/memory>
+#include <tr1/unordered_map>
#include "amqp_types.h"
#ifndef _FieldTable_
@@ -25,42 +27,50 @@
namespace qpid {
namespace framing {
- class NamedValue;
- class Value;
- class Buffer;
-
- class FieldTable
- {
- std::vector<NamedValue*> values;
- NamedValue* find(const std::string& name) const;
-
- Value* getValue(const std::string& name) const;
- void setValue(const std::string& name, Value* value);
-
- public:
- ~FieldTable();
- u_int32_t size() const;
- int count() const;
- void setString(const std::string& name, const std::string& value);
- void setInt(const std::string& name, int value);
- void setTimestamp(const std::string& name, u_int64_t value);
- void setTable(const std::string& name, const FieldTable& value);
- //void setDecimal(string& name, xxx& value);
- std::string getString(const std::string& name);
- int getInt(const std::string& name);
- u_int64_t getTimestamp(const std::string& name);
- void getTable(const std::string& name, FieldTable& value);
- //void getDecimal(string& name, xxx& value);
-
- void encode(Buffer& buffer) const;
- void decode(Buffer& buffer);
-
- friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
- };
-
- class FieldNotFoundException{};
- class UnknownFieldName : public FieldNotFoundException{};
- class IncorrectFieldType : public FieldNotFoundException{};
+class Value;
+class Buffer;
+
+class FieldTable
+{
+ public:
+ typedef std::tr1::shared_ptr<Value> ValuePtr;
+ typedef std::tr1::unordered_map<std::string, ValuePtr> ValueMap;
+
+ ~FieldTable();
+ u_int32_t size() const;
+ int count() const;
+ void setString(const std::string& name, const std::string& value);
+ void setInt(const std::string& name, int value);
+ void setTimestamp(const std::string& name, u_int64_t value);
+ void setTable(const std::string& name, const FieldTable& value);
+ //void setDecimal(string& name, xxx& value);
+ std::string getString(const std::string& name) const;
+ int getInt(const std::string& name) const;
+ u_int64_t getTimestamp(const std::string& name) const;
+ void getTable(const std::string& name, FieldTable& value) const;
+ //void getDecimal(string& name, xxx& value);
+ void erase(const std::string& name);
+
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer);
+
+ bool operator==(const FieldTable& other) const;
+
+ // TODO aconway 2006-09-26: Yeuch! Rework FieldTable to have
+ // a map-like interface.
+ const ValueMap& getMap() const { return values; }
+ ValueMap& getMap() { return values; }
+
+
+ private:
+ friend std::ostream& operator<<(std::ostream& out, const FieldTable& body);
+ ValueMap values;
+ template<class T> T getValue(const std::string& name) const;
+};
+
+class FieldNotFoundException{};
+class UnknownFieldName : public FieldNotFoundException{};
+class IncorrectFieldType : public FieldNotFoundException{};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/inc/Value.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/inc/Value.h?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/inc/Value.h (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/inc/Value.h Wed Sep 27 12:50:23 2006
@@ -26,84 +26,135 @@
namespace qpid {
namespace framing {
- class Buffer;
+class Buffer;
- class Value{
- public:
- inline virtual ~Value(){}
- virtual u_int32_t size() const = 0;
- virtual char getType() const = 0;
- virtual void encode(Buffer& buffer) = 0;
- virtual void decode(Buffer& buffer) = 0;
- };
-
- class StringValue : public virtual Value{
- string value;
-
- public:
- inline StringValue(const string& v) : value(v){}
- inline StringValue(){}
- inline string getValue(){ return value; }
- ~StringValue(){}
- inline virtual u_int32_t size() const { return 4 + value.length(); }
- inline virtual char getType() const { return 'S'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class IntegerValue : public virtual Value{
- int value;
- public:
- inline IntegerValue(int v) : value(v){}
- inline IntegerValue(){}
- inline int getValue(){ return value; }
- ~IntegerValue(){}
- inline virtual u_int32_t size() const { return 4; }
- inline virtual char getType() const { return 'I'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class TimeValue : public virtual Value{
- u_int64_t value;
- public:
- inline TimeValue(int v) : value(v){}
- inline TimeValue(){}
- inline u_int64_t getValue(){ return value; }
- ~TimeValue(){}
- inline virtual u_int32_t size() const { return 8; }
- inline virtual char getType() const { return 'T'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class DecimalValue : public virtual Value{
- u_int8_t decimals;
- u_int32_t value;
- public:
- inline DecimalValue(int v) : value(v){}
- inline DecimalValue(){}
- ~DecimalValue(){}
- inline virtual u_int32_t size() const { return 5; }
- inline virtual char getType() const { return 'D'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-
- class FieldTableValue : public virtual Value{
- FieldTable value;
- public:
- inline FieldTableValue(const FieldTable& v) : value(v){}
- inline FieldTableValue(){}
- inline FieldTable getValue(){ return value; }
- ~FieldTableValue(){}
- inline virtual u_int32_t size() const { return 4 + value.size(); }
- inline virtual char getType() const { return 'F'; }
- virtual void encode(Buffer& buffer);
- virtual void decode(Buffer& buffer);
- };
-}
-}
+/**
+ * Represents a decimal value.
+ * No arithmetic functionality for now, we only care about encoding/decoding.
+ */
+struct Decimal {
+ u_int32_t value;
+ u_int8_t decimals;
+
+ Decimal(u_int32_t value_=0, u_int8_t decimals_=0) : value(value_), decimals(decimals_) {}
+ bool operator==(const Decimal& d) const {
+ return decimals == d.decimals && value == d.value;
+ }
+ bool operator!=(const Decimal& d) const { return !(*this == d); }
+};
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d);
+
+/**
+ * Polymorpic base class for values.
+ */
+class Value {
+ public:
+ virtual ~Value();
+ virtual u_int32_t size() const = 0;
+ virtual char getType() const = 0;
+ virtual void encode(Buffer& buffer) = 0;
+ virtual void decode(Buffer& buffer) = 0;
+ virtual bool operator==(const Value&) const = 0;
+ bool operator!=(const Value& v) const { return !(*this == v); }
+ virtual void print(std::ostream& out) const = 0;
+
+ /** Create a new value by decoding from the buffer */
+ static std::auto_ptr<Value> decode_value(Buffer& buffer);
+};
+
+std::ostream& operator<<(std::ostream& out, const Value& d);
+
+
+/**
+ * Template for common operations on Value sub-classes.
+ */
+template <class T>
+class ValueOps : public Value
+{
+ protected:
+ T value;
+ public:
+ ValueOps() {}
+ ValueOps(const T& v) : value(v) {}
+ const T& getValue() const { return value; }
+ T& getValue() { return value; }
+
+ virtual bool operator==(const Value& v) const {
+ const ValueOps<T>* vo = dynamic_cast<const ValueOps<T>*>(&v);
+ if (vo == 0) return false;
+ else return value == vo->value;
+ }
+
+ void print(std::ostream& out) const { out << value; }
+};
+
+
+class StringValue : public ValueOps<std::string> {
+ public:
+ StringValue(const std::string& v) : ValueOps<std::string>(v) {}
+ StringValue() {}
+ virtual u_int32_t size() const { return 4 + value.length(); }
+ virtual char getType() const { return 'S'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class IntegerValue : public ValueOps<int> {
+ public:
+ IntegerValue(int v) : ValueOps<int>(v) {}
+ IntegerValue(){}
+ virtual u_int32_t size() const { return 4; }
+ virtual char getType() const { return 'I'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class TimeValue : public ValueOps<u_int64_t> {
+ public:
+ TimeValue(u_int64_t v) : ValueOps<u_int64_t>(v){}
+ TimeValue(){}
+ virtual u_int32_t size() const { return 8; }
+ virtual char getType() const { return 'T'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class DecimalValue : public ValueOps<Decimal> {
+ public:
+ DecimalValue(const Decimal& d) : ValueOps<Decimal>(d) {}
+ DecimalValue(u_int32_t value_=0, u_int8_t decimals_=0) :
+ ValueOps<Decimal>(Decimal(value_, decimals_)){}
+ virtual u_int32_t size() const { return 5; }
+ virtual char getType() const { return 'D'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+
+class FieldTableValue : public ValueOps<FieldTable> {
+ public:
+ FieldTableValue(const FieldTable& v) : ValueOps<FieldTable>(v){}
+ FieldTableValue(){}
+ virtual u_int32_t size() const { return 4 + value.size(); }
+ virtual char getType() const { return 'F'; }
+ virtual void encode(Buffer& buffer);
+ virtual void decode(Buffer& buffer);
+};
+
+class EmptyValue : public Value {
+ public:
+ ~EmptyValue();
+ virtual u_int32_t size() const { return 0; }
+ virtual char getType() const { return 0; }
+ virtual void encode(Buffer& buffer) {}
+ virtual void decode(Buffer& buffer) {}
+ virtual bool operator==(const Value& v) const {
+ return dynamic_cast<const EmptyValue*>(&v);
+ }
+ virtual void print(std::ostream& out) const;
+};
+}} // qpid::framing
#endif
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQContentBody.cpp Wed Sep 27 12:50:23 2006
@@ -16,6 +16,7 @@
*
*/
#include "AMQContentBody.h"
+#include <iostream>
qpid::framing::AMQContentBody::AMQContentBody(){
}
@@ -33,3 +34,7 @@
buffer.getRawData(data, size);
}
+void qpid::framing::AMQContentBody::print(std::ostream& out) const
+{
+ out << "content (" << size() << " bytes)";
+}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQFrame.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQFrame.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQFrame.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQFrame.cpp Wed Sep 27 12:50:23 2006
@@ -1,3 +1,4 @@
+
/*
*
* Copyright (c) 2006 The Apache Software Foundation
@@ -126,21 +127,8 @@
std::ostream& qpid::framing::operator<<(std::ostream& out, const AMQFrame& t){
out << "Frame[channel=" << t.channel << "; ";
- if(t.body.get() == 0){
- out << "empty";
- }else if(t.body->type() == METHOD_BODY){
- (dynamic_cast<AMQMethodBody*>(t.body.get()))->print(out);
- }else if(t.body->type() == HEADER_BODY){
- out << "header, content_size=" <<
- (dynamic_cast<AMQHeaderBody*>(t.body.get()))->getContentSize()
- << " (" << t.body->size() << " bytes)";
- }else if(t.body->type() == CONTENT_BODY){
- out << "content (" << t.body->size() << " bytes)";
- }else if(t.body->type() == HEARTBEAT_BODY){
- out << "heartbeat";
- }else{
- out << "unknown type, " << t.body->type();
- }
+ if (t.body.get() == 0) out << "empty";
+ else out << *t.body;
out << "]";
return out;
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQHeaderBody.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQHeaderBody.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQHeaderBody.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/AMQHeaderBody.cpp Wed Sep 27 12:50:23 2006
@@ -58,3 +58,16 @@
THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class");
}
}
+
+void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
+{
+ out << "header, content_size=" << getContentSize()
+ << " (" << size() << " bytes)" << ", headers=" ;
+ // TODO aconway 2006-09-26: Hack to see headers.
+ // Should write proper op << for BasicHeaderProperties.
+ //
+ const BasicHeaderProperties* props =
+ dynamic_cast<const BasicHeaderProperties*>(getProperties());
+ // TODO aconway 2006-09-26: Lose the static cast, fix BasicHeaderProperties
+ if (props) out << const_cast<BasicHeaderProperties*>(props)->getHeaders();
+}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/Buffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/Buffer.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/Buffer.cpp Wed Sep 27 12:50:23 2006
@@ -16,6 +16,7 @@
*
*/
#include "Buffer.h"
+#include "FieldTable.h"
qpid::framing::Buffer::Buffer(int _size) : size(_size), position(0), limit(_size){
data = new char[size];
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/FieldTable.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/FieldTable.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/FieldTable.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/FieldTable.cpp Wed Sep 27 12:50:23 2006
@@ -16,112 +16,133 @@
*
*/
#include "FieldTable.h"
-#include "NamedValue.h"
#include "QpidError.h"
#include "Buffer.h"
#include "Value.h"
+#include <assert.h>
-qpid::framing::FieldTable::~FieldTable(){
- int count(values.size());
- for(int i = 0; i < count; i++){
- delete values[i];
- }
-}
+namespace qpid {
+namespace framing {
+
+FieldTable::~FieldTable() {}
-u_int32_t qpid::framing::FieldTable::size() const {
+u_int32_t FieldTable::size() const {
u_int32_t size(4);
- int count(values.size());
- for(int i = 0; i < count; i++){
- size += values[i]->size();
+ for(ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ // 2 = shortstr_len_byyte + type_char_byte
+ size += 2 + (i->first).size() + (i->second)->size();
}
return size;
}
-int qpid::framing::FieldTable::count() const {
+int FieldTable::count() const {
return values.size();
}
-std::ostream& qpid::framing::operator<<(std::ostream& out, const FieldTable& t){
- out << "field_table{}";
- return out;
+namespace
+{
+std::ostream& operator<<(std::ostream& out, const FieldTable::ValueMap::value_type& i) {
+ return out << i.first << ":" << *i.second;
+}
}
-void qpid::framing::FieldTable::setString(const std::string& name, const std::string& value){
- setValue(name, new StringValue(value));
+std::ostream& operator<<(std::ostream& out, const FieldTable& t) {
+ out << "field_table{";
+ FieldTable::ValueMap::const_iterator i = t.getMap().begin();
+ if (i != t.getMap().end()) out << *i++;
+ while (i != t.getMap().end())
+ {
+ out << "," << *i++;
+ }
+ return out << "}";
}
-void qpid::framing::FieldTable::setInt(const std::string& name, int value){
- setValue(name, new IntegerValue(value));
+void FieldTable::setString(const std::string& name, const std::string& value){
+ values[name] = ValuePtr(new StringValue(value));
}
-void qpid::framing::FieldTable::setTimestamp(const std::string& name, u_int64_t value){
- setValue(name, new TimeValue(value));
+void FieldTable::setInt(const std::string& name, int value){
+ values[name] = ValuePtr(new IntegerValue(value));
}
-void qpid::framing::FieldTable::setTable(const std::string& name, const FieldTable& value){
- setValue(name, new FieldTableValue(value));
+void FieldTable::setTimestamp(const std::string& name, u_int64_t value){
+ values[name] = ValuePtr(new TimeValue(value));
}
-std::string qpid::framing::FieldTable::getString(const std::string& name){
- StringValue* val = dynamic_cast<StringValue*>(getValue(name));
- return (val == 0 ? "" : val->getValue());
+void FieldTable::setTable(const std::string& name, const FieldTable& value){
+ values[name] = ValuePtr(new FieldTableValue(value));
}
-int qpid::framing::FieldTable::getInt(const std::string& name){
- IntegerValue* val = dynamic_cast<IntegerValue*>(getValue(name));
- return (val == 0 ? 0 : val->getValue());
+namespace {
+// TODO aconway 2006-09-26: This is messy. Revisit the field table
+// and Value classes with a traits-based approach.
+//
+template <class T> T default_value() { return T(); }
+template <> int default_value<int>() { return 0; }
+template <> u_int64_t default_value<u_int64_t>() { return 0; }
}
-u_int64_t qpid::framing::FieldTable::getTimestamp(const std::string& name){
- TimeValue* val = dynamic_cast<TimeValue*>(getValue(name));
- return (val == 0 ? 0 : val->getValue());
+template <class T>
+T FieldTable::getValue(const std::string& name) const
+{
+ ValueMap::const_iterator i = values.find(name);
+ if (i == values.end()) return default_value<T>();
+ const ValueOps<T> *vt = dynamic_cast<const ValueOps<T>*>(i->second.get());
+ return vt->getValue();
}
-void qpid::framing::FieldTable::getTable(const std::string& name, FieldTable& value){
- FieldTableValue* val = dynamic_cast<FieldTableValue*>(getValue(name));
- if(val != 0) value = val->getValue();
+std::string FieldTable::getString(const std::string& name) const {
+ return getValue<std::string>(name);
}
-qpid::framing::NamedValue* qpid::framing::FieldTable::find(const std::string& name) const{
- int count(values.size());
- for(int i = 0; i < count; i++){
- if(values[i]->getName() == name) return values[i];
- }
- return 0;
+int FieldTable::getInt(const std::string& name) const {
+ return getValue<int>(name);
}
-qpid::framing::Value* qpid::framing::FieldTable::getValue(const std::string& name) const{
- NamedValue* val = find(name);
- return val == 0 ? 0 : val->getValue();
-}
-
-void qpid::framing::FieldTable::setValue(const std::string& name, Value* value){
- NamedValue* val = find(name);
- if(val == 0){
- val = new NamedValue(name, value);
- values.push_back(val);
- }else{
- Value* old = val->getValue();
- if(old != 0) delete old;
- val->setValue(value);
- }
+u_int64_t FieldTable::getTimestamp(const std::string& name) const {
+ return getValue<u_int64_t>(name);
+}
+
+void FieldTable::getTable(const std::string& name, FieldTable& value) const {
+ value = getValue<FieldTable>(name);
}
-void qpid::framing::FieldTable::encode(Buffer& buffer) const{
+void FieldTable::encode(Buffer& buffer) const{
buffer.putLong(size() - 4);
- int count(values.size());
- for(int i = 0; i < count; i++){
- values[i]->encode(buffer);
+ for (ValueMap::const_iterator i = values.begin(); i!=values.end(); ++i) {
+ buffer.putShortString(i->first);
+ buffer.putOctet(i->second->getType());
+ i->second->encode(buffer);
}
}
-void qpid::framing::FieldTable::decode(Buffer& buffer){
+void FieldTable::decode(Buffer& buffer){
u_int32_t size = buffer.getLong();
int leftover = buffer.available() - size;
while(buffer.available() > leftover){
- NamedValue* value = new NamedValue();
- value->decode(buffer);
- values.push_back(value);
+ std::string name;
+ buffer.getShortString(name);
+ std::auto_ptr<Value> value(Value::decode_value(buffer));
+ values[name] = ValuePtr(value.release());
}
+}
+
+
+bool FieldTable::operator==(const FieldTable& x) const {
+ if (values.size() != x.values.size()) return false;
+ for (ValueMap::const_iterator i = values.begin(); i != values.end(); ++i) {
+ ValueMap::const_iterator j = x.values.find(i->first);
+ if (j == x.values.end()) return false;
+ if (*(i->second) != *(j->second)) return false;
+ }
+ return true;
+}
+
+void FieldTable::erase(const std::string& name)
+{
+ values.erase(values.find(name));
+}
+
+}
}
Modified: incubator/qpid/trunk/qpid/cpp/common/framing/src/Value.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/common/framing/src/Value.cpp?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/common/framing/src/Value.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/common/framing/src/Value.cpp Wed Sep 27 12:50:23 2006
@@ -18,40 +18,94 @@
#include "Value.h"
#include "Buffer.h"
#include "FieldTable.h"
+#include "QpidError.h"
-void qpid::framing::StringValue::encode(Buffer& buffer){
+namespace qpid {
+namespace framing {
+
+Value::~Value() {}
+
+void StringValue::encode(Buffer& buffer){
buffer.putLongString(value);
}
-void qpid::framing::StringValue::decode(Buffer& buffer){
+void StringValue::decode(Buffer& buffer){
buffer.getLongString(value);
}
-void qpid::framing::IntegerValue::encode(Buffer& buffer){
+void IntegerValue::encode(Buffer& buffer){
buffer.putLong((u_int32_t) value);
}
-void qpid::framing::IntegerValue::decode(Buffer& buffer){
+void IntegerValue::decode(Buffer& buffer){
value = buffer.getLong();
}
-void qpid::framing::TimeValue::encode(Buffer& buffer){
+void TimeValue::encode(Buffer& buffer){
buffer.putLongLong(value);
}
-void qpid::framing::TimeValue::decode(Buffer& buffer){
+void TimeValue::decode(Buffer& buffer){
value = buffer.getLongLong();
}
-void qpid::framing::DecimalValue::encode(Buffer& buffer){
- buffer.putOctet(decimals);
- buffer.putLong(value);
+void DecimalValue::encode(Buffer& buffer){
+ buffer.putOctet(value.decimals);
+ buffer.putLong(value.value);
}
-void qpid::framing::DecimalValue::decode(Buffer& buffer){
- decimals = buffer.getOctet();
- value = buffer.getLong();
+void DecimalValue::decode(Buffer& buffer){
+ value = Decimal(buffer.getLong(), buffer.getOctet());
}
-void qpid::framing::FieldTableValue::encode(Buffer& buffer){
+void FieldTableValue::encode(Buffer& buffer){
buffer.putFieldTable(value);
}
-void qpid::framing::FieldTableValue::decode(Buffer& buffer){
+void FieldTableValue::decode(Buffer& buffer){
buffer.getFieldTable(value);
}
+
+std::auto_ptr<Value> Value::decode_value(Buffer& buffer)
+{
+ std::auto_ptr<Value> value;
+ u_int8_t type = buffer.getOctet();
+ switch(type){
+ case 'S':
+ value.reset(new StringValue());
+ break;
+ case 'I':
+ value.reset(new IntegerValue());
+ break;
+ case 'D':
+ value.reset(new DecimalValue());
+ break;
+ case 'T':
+ value.reset(new TimeValue());
+ break;
+ case 'F':
+ value.reset(new FieldTableValue());
+ break;
+ default:
+ THROW_QPID_ERROR(FRAMING_ERROR, "Unknown field table value type");
+ }
+ value->decode(buffer);
+ return value;
+}
+
+EmptyValue::~EmptyValue() {}
+
+void EmptyValue::print(std::ostream& out) const
+{
+ out << "<empty field value>";
+}
+
+std::ostream& operator<<(std::ostream& out, const Value& v) {
+ v.print(out);
+ return out;
+}
+
+std::ostream& operator<<(std::ostream& out, const Decimal& d)
+{
+ return out << "Decimal(" << d.value << "," << d.decimals << ")";
+}
+
+}}
+
+
+
Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Wed Sep 27 12:50:23 2006
@@ -20,7 +20,7 @@
server, or even a proxy implementation.
"""
-import socket, codec
+import socket, codec,logging
from cStringIO import StringIO
from spec import load, pythonize
from codec import EOF
@@ -240,8 +240,10 @@
properties = {}
for b, f in zip(bits, klass.fields):
if b:
- properties[f.name] = c.decode(f.type)
-
+ # Note: decode returns a unicode u'' string but only
+ # plain '' strings can be used as keywords so we need to
+ # stringify the names.
+ properties[str(f.name)] = c.decode(f.type)
return Header(klass, weight, size, **properties)
def __str__(self):
Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Wed Sep 27 12:50:23 2006
@@ -146,7 +146,6 @@
def invoke(self, method, args, content = None):
if self.closed:
raise Closed(self.reason)
-
frame = Frame(self.id, Method(method, *args))
self.outgoing.put(frame)
@@ -181,7 +180,7 @@
def write_content(self, klass, content, queue):
size = content.size()
- header = Frame(self.id, Header(klass, content.weight(), size))
+ header = Frame(self.id, Header(klass, content.weight(), size, **content.properties))
queue.put(header)
for child in content.children:
self.write_content(klass, child, queue)
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Wed Sep 27 12:50:23 2006
@@ -22,7 +22,7 @@
import qpid.client, qpid.spec
import Queue
from getopt import getopt, GetoptError
-
+from qpid.content import Content
def findmodules(root):
"""Find potential python modules under directory root"""
@@ -161,10 +161,6 @@
self.channel.channel_open()
def tearDown(self):
- # TODO aconway 2006-09-05: Wrong behaviour here, we should
- # close all open channels (checking for exceptions on the
- # channesl) then open a channel to clean up qs and exs,
- # finally close that channel.
for ch, q in self.queues:
ch.queue_delete(queue=q)
for ch, ex in self.exchanges:
@@ -186,13 +182,11 @@
arguments={}):
channel = channel or self.channel
reply = channel.exchange_declare(ticket, exchange, type, passive, durable, auto_delete, internal, nowait, arguments)
- # TODO aconway 2006-09-14: Don't add exchange on failure.
self.exchanges.append((channel,exchange))
return reply
def uniqueString(self):
"""Generate a unique string, unique for this TestBase instance"""
- # TODO aconway 2006-09-20: Not thread safe.
if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
return "Test Message " + str(self.uniqueCounter)
@@ -208,22 +202,24 @@
self.fail("Queue is not empty.")
except Queue.Empty: None # Ignore
- def assertPublishGet(self, queue, exchange="", routing_key=""):
+ def assertPublishGet(self, queue, exchange="", routing_key="", properties=None):
"""
Publish to exchange and assert queue.get() returns the same message.
"""
body = self.uniqueString()
self.channel.basic_publish(exchange=exchange,
- content=qpid.content.Content(body),
+ content=Content(body, properties=properties),
routing_key=routing_key)
- self.assertEqual(body, queue.get(timeout=2).content.body)
+ msg = queue.get(timeout=1)
+ self.assertEqual(body, msg.content.body)
+ if (properties): self.assertEqual(properties, msg.content.properties)
- def assertPublishConsume(self, queue="", exchange="", routing_key=""):
+ def assertPublishConsume(self, queue="", exchange="", routing_key="", properties=None):
"""
Publish a message and consume it, assert it comes back intact.
Return the Queue object used to consume.
"""
- self.assertPublishGet(self.consume(queue), exchange, routing_key)
+ self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
def assertChannelException(self, expectedCode, message):
self.assertEqual(message.method.klass.name, "channel")
Modified: incubator/qpid/trunk/qpid/python/tests/exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/exchange.py?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/exchange.py Wed Sep 27 12:50:23 2006
@@ -20,22 +20,11 @@
Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
"""
-import logging, Queue
+import Queue, logging
from qpid.testlib import TestBase
from qpid.content import Content
-# TODO aconway 2006-09-01: Investigate and add tests as appropriate.
-# Observered on C++:
-#
-# No exception raised for basic_consume on non-existent queue name.
-# No exception for basic_publish with bad routing key.
-# No exception for binding to non-existent exchange?
-# queue_bind hangs with invalid exchange name
-#
-# Do server exceptions get propagated properly?
-# Do Java exceptions propagate with any data (or just Closed())
-
class StandardExchangeVerifier:
"""Verifies standard exchange behavior.
@@ -67,7 +56,6 @@
self.assertPublishGet(q, ex, "a.b.x")
self.assertPublishGet(q, ex, "a.x.b.x")
self.assertPublishGet(q, ex, "a.x.x.b.x")
-
# Shouldn't match
self.channel.basic_publish(exchange=ex, routing_key="a.b")
self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
@@ -75,6 +63,16 @@
self.channel.basic_publish(exchange=ex, routing_key="a.b")
self.assert_(q.empty())
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
+ self.channel.basic_publish(exchange=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
"""
@@ -97,6 +95,11 @@
"""Declare and test a topic exchange"""
self.exchange_declare(0, exchange="t", type="topic")
self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
@@ -106,7 +109,7 @@
exchange instance is amq. followed by the exchange type name.
Client creates a temporary queue and attempts to bind to each required
- exchange instance (amq.fanout, amq.direct, and amq.topic, amq.headers if
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
those types are defined).
"""
def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
@@ -115,9 +118,7 @@
def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
- def testAmqHeaders(self):
- self.exchange_declare(0, exchange="amq.headers", passive="true")
- # TODO aconway 2006-09-14: verify headers behavior
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
"""
@@ -137,13 +138,14 @@
self.verifyDirectExchange("")
+# TODO aconway 2006-09-27: Fill in empty tests:
+
class DefaultAccessRuleTests(TestBase):
"""
The server MUST NOT allow clients to access the default exchange except
by specifying an empty exchange name in the Queue.Bind and content Publish
methods.
"""
- # TODO aconway 2006-09-18: fill this in.
class ExtensionsRuleTests(TestBase):
"""
@@ -251,4 +253,42 @@
The client MUST NOT attempt to delete an exchange that does not exist.
"""
+
+class HeadersExchangeTests(TestBase):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestBase.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
+
+ def myBasicPublish(self, headers):
+ self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
+
+ def testMatchAll(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
Modified: incubator/qpid/trunk/qpid/python/tests/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/testlib.py?view=diff&rev=450556&r1=450555&r2=450556
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/testlib.py Wed Sep 27 12:50:23 2006
@@ -52,3 +52,12 @@
self.fail("assertEmpty did not assert on non-empty queue")
except AssertionError: None # Ignore
+ def testMessageProperties(self):
+ """Verify properties are passed with message"""
+ props={"headers":{"x":1, "y":2}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+