You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/05/06 19:52:04 UTC

svn commit: r653854 - in /incubator/qpid/trunk/qpid/cpp: ./ examples/ examples/examples/xml-exchange/ src/ src/qpid/broker/ src/qpid/client/ src/qpid/framing/ src/tests/

Author: aconway
Date: Tue May  6 10:52:03 2008
New Revision: 653854

URL: http://svn.apache.org/viewvc?rev=653854&view=rev
Log:
>From https://issues.apache.org/jira/browse/QPID-879 contributed by Jonathan Robie.
XML exchange allowing messages to be routed based on XQuery expressions.

Added:
    incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/Makefile   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/examples/Makefile.am
    incubator/qpid/trunk/qpid/cpp/qpidc.spec.in
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
    incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Tue May  6 10:52:03 2008
@@ -139,6 +139,7 @@
 # Check for optional CPG requirement.
 LDFLAGS="$LDFLAGS -L/usr/lib/openais -L/usr/lib64/openais"
 
+
 AC_ARG_WITH([cpg],  
   [AS_HELP_STRING([--with-cpg], [Build with CPG support])],
   [case "${withval}" in
@@ -163,7 +164,6 @@
   CPPFLAGS+=" -DCPG"
 fi
 
-
 # Setup --with-sasl/--without-sasl as arguments to configure
 AC_ARG_WITH([sasl],
   [AS_HELP_STRING([--with-sasl], [Build with SASL authentication support])],
@@ -194,6 +194,38 @@
                 [The SASL app name for the qpid Broker])
       AC_DEFINE([HAVE_SASL], [1], [Enable if libsasl is present])])])
 
+
+# Setup --with-xml/--without-xml as arguments to configure
+use_xml=yes
+want_xml=check
+AC_ARG_WITH([xml],
+  [AS_HELP_STRING([--with-xml], [Build with XML Exchange])],
+  [want_xml=$withval])
+
+case $want_xml in
+  yes|no|check) ;;
+  *) AC_MSG_ERROR([Bad value for --with-xml: $withval]) ;;
+esac
+
+test $want_xml = no && use_xml=no
+
+# If the user doesn't say not to use XML, see if it's available.
+if test $use_xml != no; then
+  # Then see if XQilla is available
+  AC_CHECK_LIB([xerces-c], [_init], , [use_xml=no])
+  AC_CHECK_HEADER([xqilla/xqilla-simple.hpp], , [use_xml=no])
+  AC_CHECK_LIB([xqilla], [canonicalCombiningClassTable], , [use_xml=no])
+  
+  # If XQilla is not available, yet specifically requested, die.
+  test $use_xml:$want_xml = no:yes &&
+    AC_MSG_ERROR([XML Exchange requested, but XQilla or Xerces-C not available])
+  # Define HAVE_XML only if every check passed, so config.h agrees with AM_CONDITIONAL below.
+  test $use_xml = yes &&
+    AC_DEFINE([HAVE_XML], [1], [Compile-in XML Exchange support.])
+fi
+
+AM_CONDITIONAL([HAVE_XML], [test $use_xml = yes])
+
 # Setup --with-rdma/--without-rdma as arguments to configure
 AC_ARG_WITH([rdma],
   [AS_HELP_STRING([--with-rdma], [Build with support for Remote DMA protocols])],

Modified: incubator/qpid/trunk/qpid/cpp/examples/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/Makefile.am?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/examples/Makefile.am Tue May  6 10:52:03 2008
@@ -15,7 +15,11 @@
 	examples/direct/Makefile				\
 	examples/direct/direct_producer.cpp			\
 	examples/direct/listener.cpp				\
-	examples/direct/declare_queues.cpp
+	examples/direct/declare_queues.cpp			\
+	examples/xml-exchange/Makefile				\
+	examples/xml-exchange/declare_queues.cpp		\
+	examples/xml-exchange/xml_producer.cpp			\
+	examples/xml-exchange/listener.cpp
 
 VERIFY_FILES= verify verify_all				\
 	examples/request-response/verify		\
@@ -60,9 +64,6 @@
 	test -d examples || cp -R $(srcdir)/examples .
 	cd examples && $(MAKE) CXX="$(CXX)" CXXFLAGS="$(CXXFLAGS) -I$(abs_top_srcdir)/src -I$(abs_top_srcdir)/src/gen -I$(abs_top_builddir)/src -I$(abs_top_builddir)/src/gen -L$(abs_top_builddir)/src/.libs -Wl,-rpath,$(abs_top_builddir)/src/.libs" all
 
-# FIXME aconway 2008-03-25: Re-enable when python client has been fixed
-# to find .spec via PYTHONPATH.
-# 
 # Verify the examples in the buid tree.
 check-local: all-local verify
 	$(srcdir)/verify_all $(abs_top_srcdir)/..

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue May  6 10:52:03 2008
@@ -0,0 +1,3 @@
+declare_queues
+listener
+xml_producer

Added: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/Makefile
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/Makefile?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/Makefile (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/Makefile Tue May  6 10:52:03 2008
@@ -0,0 +1,7 @@
+CXX=g++
+CXXFLAGS=
+LDLIBS=-lqpidclient
+PROGRAMS=declare_queues xml_producer listener
+all: $(PROGRAMS)  # built by make's implicit rules, which put LDLIBS after the sources
+clean:
+	rm -f $(PROGRAMS) 

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/Makefile
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README Tue May  6 10:52:03 2008
@@ -0,0 +1,48 @@
+This example shows how to program a simple application
+using the XML Exchange. 
+
+To run the example, execute the programs in the
+following order:
+
+1 ./declare_queues
+2 ./listener
+3 ./xml_producer (in a separate window)
+
+The XML Exchange must be explicitly declared. Bindings
+are established using queries in XQuery. These queries
+can reference message content, message application 
+properties (which are declared as external variables
+in the XQuery), or both. 
+
+Once this is done, message producers publish to the
+exchange using the exchange name and a routing key,
+just as for other exchange types. Message consumers
+read from the queues to which messages are routed.
+If a message does not have XML content, or is
+missing message application properties needed by
+the query, the message is not routed.
+
+Queries can use message application headers to
+provide functionality similar to JMS selectors.
+If a query does not use the content of a message,
+the message content is not parsed, and need not
+be XML.
+
+The XQuery processor, XQilla,  does path-based 
+document projection, so once the portion of
+a document needed to evaluate a query has
+been read, it stops parsing the document.
+Suppose a long document has a header section.
+You can indicate in the query that only
+one header section needs to be queried,
+and there is no need to parse the entire
+document to see if there are further header
+sections, using a path like this:
+
+./message/header[1]/date
+
+If you used a path like this, all children
+of the message element would be read to
+see if there are further headers:
+
+./message/header/date

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/README
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp Tue May  6 10:52:03 2008
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using std::string;
+
+
+/**
+ * Declare "message_queue" and the "xml" exchange, then bind them with an XQuery.
+ *
+ * Usage: declare_queues [host [port]]   (defaults: 127.0.0.1, 5672)
+ * Returns 0 on success, 1 if a std::exception was caught (its what() is printed).
+ */
+int main(int argc, char** argv) {
+    const char* host = argc>1 ? argv[1] : "127.0.0.1";
+    int port = argc>2 ? atoi(argv[2]) : 5672;
+    Connection connection;
+    try {
+      connection.open(host, port);
+      Session session =  connection.newSession(ASYNC);
+
+  //--------- Main body of program --------------------------------------------
+      // Set up queues, bind them with queries. Note that the XML exchange
+      // is not in the AMQP specification, so it is called "xml", not "amq.xml".
+      // Note that the XML exchange is not predeclared in Qpid, it must
+      // be declared by the application.
+      session.queueDeclare(arg::queue="message_queue");
+      session.exchangeDeclare(arg::exchange="xml", arg::type="xml");
+
+      // Application message properties are mapped to external variables
+      // in the XQuery. An XML Exchange can query message properties much
+      // like JMS, query the XML content of the message, or both.
+      // This query selects odd message ids, or any message whose control property is 'end'.
+      FieldTable binding;
+      binding.setString("xquery", "declare variable $control external;"
+                                  "./message/id mod 2 = 1 or $control = 'end'");
+      session.exchangeBind(arg::exchange="xml", arg::queue="message_queue", arg::bindingKey="query_name", arg::arguments=binding);
+  //-----------------------------------------------------------------------------
+
+      connection.close();
+      return 0;
+    } catch(const std::exception& error) {
+        std::cout << error.what() << std::endl;
+    }
+    return 1;
+}
+
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/declare_queues.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp Tue May  6 10:52:03 2008
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/**
+ *  listener.cpp: This program reads messages from a queue on
+ *  the broker using a message listener.
+ */
+
+#include <qpid/client/Dispatcher.h>
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+#include <qpid/client/SubscriptionManager.h>
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+      
+/** Prints each message it receives; a message whose "control" header is "end" cancels the subscription. */
+class Listener : public MessageListener {
+  public:
+    /** @param subs subscription manager used to cancel; held by reference. */
+    Listener(SubscriptionManager& subs) : subscriptions(subs) {}
+
+    /** Print the message data, then cancel the message's destination if this is the "end" message. */
+    virtual void received(Message& message) {
+        std::cout << "Message: " << message.getData() << std::endl;
+        const bool finished = (message.getHeaders().getString("control") == "end");
+        if (!finished)
+            return;
+        std::cout << "Shutting down listener for " << message.getDestination()
+                  << std::endl;
+        subscriptions.cancel(message.getDestination());
+    }
+  private:
+    SubscriptionManager& subscriptions;
+};
+
+/**
+ * Subscribe a Listener to "message_queue" and run the subscription manager.
+ * Usage: listener [host [port]]   (defaults: 127.0.0.1, 5672)
+ * Returns 0 on success, 1 if a std::exception was caught (its what() is printed).
+ */
+int main(int argc, char** argv) {
+    const char* host = argc>1 ? argv[1] : "127.0.0.1";
+    int port = argc>2 ? atoi(argv[2]) : 5672;
+    Connection connection;
+    try {
+      connection.open(host, port);
+      Session session =  connection.newSession(ASYNC);
+  //--------- Main body of program --------------------------------------------
+      SubscriptionManager subscriptions(session);
+      // Create a listener and subscribe it to the queue named "message_queue"
+      Listener listener(subscriptions);
+      subscriptions.subscribe(listener, "message_queue");
+      // Deliver messages until the subscription is cancelled
+      // by Listener::received()
+      subscriptions.run();
+  //---------------------------------------------------------------------------
+      connection.close();
+      return 0;
+    } catch(const std::exception& error) {
+        std::cout << error.what() << std::endl;
+    }
+    return 1;
+}
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/listener.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp Tue May  6 10:52:03 2008
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <qpid/client/Connection.h>
+#include <qpid/client/Session.h>
+#include <qpid/client/Message.h>
+
+
+#include <unistd.h>
+#include <cstdlib>
+#include <iostream>
+
+#include <sstream>
+
+using namespace qpid::client;
+using namespace qpid::framing;
+
+using std::stringstream;
+using std::string;
+
+/**
+ * Publish ten numbered XML messages to the "xml" exchange, then an "end" marker.
+ * Usage: xml_producer [host [port]]   (defaults: 127.0.0.1, 5672)
+ * Returns 0 on success, 1 if a std::exception was caught (its what() is printed).
+ */
+int main(int argc, char** argv) {
+    const char* host = "127.0.0.1";
+    if (argc > 1) host = argv[1];
+    int port = 5672;
+    if (argc > 2) port = atoi(argv[2]);
+    Connection connection;
+    Message message;
+    try {
+        connection.open(host, port);
+        Session session =  connection.newSession(ASYNC);
+  //--------- Main body of program --------------------------------------------
+        // In the XML exchange, the routing key and the name of the query
+        // match, so every message is sent with the key "query_name". The
+        // control property starts as "continue" and marks the end later on.
+        message.getDeliveryProperties().setRoutingKey("query_name");
+        message.getHeaders().setString("control","continue");
+
+        // Send ten messages whose <id> runs from 0 to 9.
+        int id = 0;
+        while (id < 10) {
+            stringstream body;
+            body << "<message><id>" << id << "</id></message>";
+            const string text = body.str();
+            std::cout << "Message data: " << text << std::endl;
+            message.setData(text);
+            session.messageTransfer(arg::content=message,  arg::destination="xml");
+            ++id;
+        }
+
+        // Switch the control property to "end" and send the final message.
+        message.getHeaders().setString("control","end");
+        message.setData("<end>That's all, folks!</end>");
+        session.messageTransfer(arg::content=message,  arg::destination="xml");
+  //-----------------------------------------------------------------------------
+
+        connection.close();
+        return 0;
+    } catch(const std::exception& error) {
+        std::cout << error.what() << std::endl;
+    }
+    return 1;
+}
+
+

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/examples/examples/xml-exchange/xml_producer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/qpidc.spec.in
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/qpidc.spec.in?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/qpidc.spec.in (original)
+++ incubator/qpid/trunk/qpid/cpp/qpidc.spec.in Tue May  6 10:52:03 2008
@@ -1,3 +1,4 @@
+
 #
 # Spec file for Qpid C++ packages: qpidc qpidc-devel, qpidd, qpidd-devel
 # svn revision: $Rev$
@@ -54,6 +55,8 @@
 Group: System Environment/Daemons
 Requires: %name = %version-%release
 Requires: openais
+Requires: xqilla
+Requires: xerces-c
 
 %description -n %{qpidd}
 A message broker daemon that receives stores and routes messages using

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue May  6 10:52:03 2008
@@ -46,7 +46,7 @@
 
 ## Compiler flags
 
-AM_CXXFLAGS = $(WARNING_CFLAGS)
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(CFLAGS)
 AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG)
 INCLUDES = -Igen -I$(srcdir)/gen
 
@@ -221,8 +221,12 @@
   qpid/ISList.h \
   qpid/pointer_to_other.h
 
-libqpidbroker_la_LIBADD = libqpidcommon.la \
-						  -luuid
+libqpidbroker_la_LIBADD = libqpidcommon.la -luuid
+if HAVE_XML
+libqpidbroker_la_LIBADD += -lxerces-c -lxqilla
+endif
+
+
 libqpidbroker_la_SOURCES = \
   $(mgen_broker_cpp) \
   qpid/amqp_0_10/Connection.h \
@@ -288,8 +292,13 @@
   qpid/management/ManagementObject.cpp \
   qpid/sys/TCPIOPlugin.cpp
 
-libqpidclient_la_LIBADD = libqpidcommon.la \
-						  -luuid
+if HAVE_XML
+libqpidbroker_la_SOURCES +=  qpid/broker/XmlExchange.cpp
+endif
+
+
+libqpidclient_la_LIBADD = libqpidcommon.la  -luuid
+
 libqpidclient_la_SOURCES =			\
   $(rgen_client_srcs)				\
   qpid/client/Bounds.cpp		\
@@ -528,6 +537,11 @@
   qpid/sys/Time.h \
   qpid/sys/TimeoutHandler.h
 
+if HAVE_XML
+nobase_include_HEADERS += qpid/broker/XmlExchange.h
+endif
+
+
 # Force build of qpidd during dist phase so help2man will work.
 dist-hook: $(BUILT_SOURCES)
 	$(MAKE) qpidd

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Deliverable.h Tue May  6 10:52:03 2008
@@ -22,6 +22,7 @@
 #define _Deliverable_
 
 #include "Queue.h"
+#include "Message.h"
 
 namespace qpid {
     namespace broker {
@@ -29,6 +30,9 @@
         public:
             bool delivered;
             Deliverable() : delivered(false) {}
+
+	    virtual Message& getMessage() = 0;
+	    
             virtual void deliverTo(Queue::shared_ptr& queue) = 0;
             virtual uint64_t contentSize() { return 0; }
             virtual ~Deliverable(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Tue May  6 10:52:03 2008
@@ -18,11 +18,16 @@
  * under the License.
  *
  */
+
+#include "config.h"
 #include "ExchangeRegistry.h"
 #include "DirectExchange.h"
 #include "FanOutExchange.h"
 #include "HeadersExchange.h"
 #include "TopicExchange.h"
+#ifdef HAVE_XML
+#include "XmlExchange.h"
+#endif
 #include "qpid/management/ManagementExchange.h"
 #include "qpid/framing/reply_exceptions.h"
 
@@ -55,7 +60,13 @@
             exchange = Exchange::shared_ptr(new HeadersExchange(name, durable, args, parent));
         }else if (type == ManagementExchange::typeName) {
             exchange = Exchange::shared_ptr(new ManagementExchange(name, durable, args, parent));
-        }else{
+        }
+#ifdef HAVE_XML
+	else if (type == XmlExchange::typeName) {
+            exchange = Exchange::shared_ptr(new XmlExchange(name, durable, args, parent));
+        }
+#endif
+	else{
             throw UnknownExchangeTypeException();    
         }
         exchanges[name] = exchange;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Tue May  6 10:52:03 2008
@@ -69,6 +69,8 @@
             virtual bool prepare(TransactionContext* ctxt) throw();
             virtual void commit() throw();
             virtual void rollback() throw();
+
+	    virtual Message& getMessage() { return *msg; };
             
             virtual void deliverTo(Queue::shared_ptr& queue);
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp Tue May  6 10:52:03 2008
@@ -0,0 +1,249 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "config.h"
+#include "XmlExchange.h"
+
+#include "DeliverableMessage.h"
+
+#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/reply_exceptions.h"
+
+#include <xercesc/framework/MemBufInputSource.hpp>
+
+#include <xqilla/context/ItemFactory.hpp>
+#include <xqilla/xqilla-simple.hpp>
+
+#include <iostream>
+#include <sstream>
+
+using namespace qpid::framing;
+using namespace qpid::sys;
+using qpid::management::Manageable;
+
+namespace qpid {
+namespace broker {
+
+/** Construct from a name and management parent; sets the management type to typeName when mgmtExchange is non-null. */
+XmlExchange::XmlExchange(const string& _name, Manageable* _parent) : Exchange(_name, _parent)
+{
+    if (mgmtExchange.get()) mgmtExchange->set_type (typeName);
+}
+
+/** Same, forwarding the durable flag and declare arguments to the Exchange base as well. */
+XmlExchange::XmlExchange(const std::string& _name, bool _durable,
+                         const FieldTable& _args, Manageable* _parent) :
+    Exchange(_name, _durable, _args, _parent)
+{
+    if (mgmtExchange.get()) mgmtExchange->set_type (typeName);
+}
+
+/*
+ *  Use the name of the query as the binding key.
+ *
+ *  The first time a given name is used in a binding, the query body
+ *  must be provided. After that, no query body should be present.
+ *  
+ *  To modify an installed query, the user must first unbind the
+ *  existing query, then replace it by binding again with the same
+ *  name.
+ *
+ */
+
+      // #### TODO: The Binding should take the query text
+      // #### only. Consider encapsulating the entire block, including
+      // #### the if condition.
+      
+
+/**
+ * Bind queue under routingKey with the XQuery text given in the "xquery" argument.
+ * @return true if a binding was added, false if queue was already bound under this key.
+ * @throws InternalErrorException if bindingArguments is null or the query does not parse.
+ */
+bool XmlExchange::bind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* bindingArguments)
+{
+    if (!bindingArguments)
+        throw InternalErrorException(QPID_MSG("No binding arguments, an xquery is required"));
+    string queryText = bindingArguments->getString("xquery");
+
+    RWlock::ScopedWlock l(lock);
+    // Use find(), not operator[]: a failed parse must not leave an empty entry for routingKey.
+    XmlBindingsMap::iterator entry = bindingsMap.find(routingKey);
+    if (entry != bindingsMap.end())
+        for (XmlBinding::vector::iterator i = entry->second.begin(); i != entry->second.end(); ++i)
+            if ((*i)->queue == queue)
+                return false;
+
+    try {
+        Query query(xqilla.parse(X(queryText.c_str())));
+        XmlBinding::shared_ptr binding(new XmlBinding (routingKey, queue, this, query));
+        // Append, so queues already bound under this key keep their bindings.
+        bindingsMap[routingKey].push_back(binding);
+    }
+    catch (XQException&) {
+        throw InternalErrorException(QPID_MSG("Could not parse xquery:"+ queryText));
+    }
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->inc_bindings ();
+    return true;
+}
+
+/** Remove queue's binding under routingKey; returns true if one was removed, false otherwise. */
+bool XmlExchange::unbind(Queue::shared_ptr queue, const string& routingKey, const FieldTable* /*args*/)
+{
+    RWlock::ScopedWlock l(lock);
+    // find() rather than operator[]: unbinding an unknown key must not create an
+    // empty entry, which isBound() would then report as a bound key.
+    XmlBindingsMap::iterator entry = bindingsMap.find(routingKey);
+    if (entry == bindingsMap.end())
+        return false;
+
+    XmlBinding::vector& bindings(entry->second);
+    XmlBinding::vector::iterator i = bindings.begin();
+    while (i != bindings.end() && (*i)->queue != queue)
+        ++i;
+    if (i == bindings.end())
+        return false;
+    bindings.erase(i);
+    if (bindings.empty())
+        bindingsMap.erase(entry);
+    if (mgmtExchange.get() != 0)
+        mgmtExchange->dec_bindings ();
+    return true;
+}
+
+/**
+ * Run query against msg. The message content is parsed as XML and its first node, if
+ * any, becomes the context item; each args entry that converts to a string becomes an external variable.
+ * @return the query's effective boolean value, or false if an XQException is caught.
+ */
+bool XmlExchange::matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args) 
+{
+  // ### TODO: Need istream for frameset
+  // Hack alert - the following code does not work for really large messages
+  string msgContent;
+  msg.getMessage().getFrames().getContent(msgContent);
+  boost::scoped_ptr<DynamicContext> context(query->createDynamicContext());
+  try {
+    XERCES_CPP_NAMESPACE::MemBufInputSource xml(reinterpret_cast<const XMLByte*>(msgContent.c_str()), msgContent.length(), "input" );
+    Sequence seq(context->parseDocument(xml));
+    if (args) {    // args may be null; in that case no external variables are set
+      for (FieldTable::ValueMap::const_iterator v = args->begin(); v != args->end(); ++v) {
+        // ### TODO: Do types properly
+        if (!v->second->convertsTo<std::string>()) continue;
+        QPID_LOG(trace, "XmlExchange, external variable: " << v->first << " = " << v->second->getData().getString().c_str());
+        Item::Ptr value = context->getItemFactory()->createString(X(v->second->getData().getString().c_str()), context.get());
+        context->setExternalVariable(X(v->first.c_str()), value);
+      }
+    }
+    if(!seq.isEmpty() && seq.first()->isNode()) {
+      context->setContextItem(seq.first());
+      context->setContextPosition(1);
+      context->setContextSize(1);
+    }
+    Result result = query->execute(context.get());
+    return result->getEffectiveBooleanValue(context.get(), 0);
+  }
+  catch (XQException&) {
+    QPID_LOG(warning, "Could not parse XML content (or message headers):" << msgContent);
+  }
+  return false;
+}
+
+/**
+ * Deliver msg to every queue bound under routingKey whose XQuery matches it,
+ * then update the exchange's management counters if mgmtExchange is non-null.
+ */
+void XmlExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* args)
+{
+    RWlock::ScopedRlock l(lock);
+    int count(0);
+    // find() instead of operator[]: only the read lock is held here, so the map
+    // must not be modified by inserting an entry for an unknown key.
+    XmlBindingsMap::iterator entry = bindingsMap.find(routingKey);
+    if (entry != bindingsMap.end()) {
+      for (XmlBinding::vector::iterator i = entry->second.begin(); i != entry->second.end(); ++i) {
+        if (!matches((*i)->xquery, msg, args))
+          continue;
+        msg.deliverTo((*i)->queue);
+        ++count;    // count deliveries, not bindings examined
+        if ((*i)->mgmtBinding.get() != 0)
+          (*i)->mgmtBinding->inc_msgMatched ();
+      }
+    }
+
+    if (!count) {
+      QPID_LOG(warning, "XMLExchange " << getName() << " could not route message with query " << routingKey);
+    }
+    if (mgmtExchange.get() != 0) {
+      if (!count) {
+        mgmtExchange->inc_msgDrops  ();
+        mgmtExchange->inc_byteDrops (msg.contentSize ());
+      } else {
+        mgmtExchange->inc_msgRoutes  (count);
+        mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+      }
+      mgmtExchange->inc_msgReceives  ();
+      mgmtExchange->inc_byteReceives (msg.contentSize ());
+    }
+}
+
+
+/**
+ * Report whether a binding exists. A null routingKey matches any key and an empty
+ * queue pointer matches any queue; with neither given, any entry in the map counts.
+ */
+bool XmlExchange::isBound(Queue::shared_ptr queue, const string* const routingKey, const FieldTable* const) 
+{
+    // bind()/unbind() modify bindingsMap under the write lock, so read it under the lock too.
+    RWlock::ScopedRlock l(lock);
+    XmlBinding::vector::iterator j;
+
+    if (routingKey) {
+      XmlBindingsMap::iterator i = bindingsMap.find(*routingKey);
+      if (i == bindingsMap.end()) return false;
+      if (!queue) return true;
+      for (j = i->second.begin(); j != i->second.end(); j++)
+        if ((*j)->queue == queue)
+          return true;
+      return false;
+    }
+    if (!queue)
+      return bindingsMap.size() > 0;
+    for (XmlBindingsMap::iterator i = bindingsMap.begin(); i != bindingsMap.end(); i++)
+      for (j = i->second.begin(); j != i->second.end(); j++)
+        if ((*j)->queue == queue)
+          return true;
+    return false;
+}
+
+
+XmlExchange::~XmlExchange() 
+{
+    bindingsMap.clear();    // remove every entry from the bindings map
+}
+
+const std::string XmlExchange::typeName("xml");
+ 
+}
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h Tue May  6 10:52:03 2008
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _XmlExchange_
+#define _XmlExchange_
+
+#include "Exchange.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Monitor.h"
+#include "Queue.h"
+
+#include <xqilla/xqilla-simple.hpp>
+
+#include <boost/scoped_ptr.hpp>
+
+#include <map>
+#include <vector>
+
+namespace qpid {
+namespace broker {
+
+class XmlExchange : public virtual Exchange {
+    // An Exchange that reports type "xml" (typeName); each of its bindings stores an XQQuery handle.
+    typedef boost::shared_ptr<XQQuery> Query;
+    // Exchange::Binding extended with the query associated with that binding.
+    struct XmlBinding : public Exchange::Binding {
+        typedef boost::shared_ptr<XmlBinding> shared_ptr;
+        typedef std::vector<XmlBinding::shared_ptr> vector;
+
+        boost::shared_ptr<XQQuery> xquery; // same type as the Query typedef
+
+        XmlBinding(const std::string& key, const Queue::shared_ptr queue, Exchange* parent, Query query):
+            Binding(key, queue, parent), xquery(query) {}
+    };
+
+    // Bindings grouped by key; isBound() looks entries up by routing key.
+    typedef std::map<string, XmlBinding::vector > XmlBindingsMap;
+    // NOTE(review): xqilla and lock are used outside this header - presumably query parsing and guarding bindingsMap; confirm in XmlExchange.cpp.
+    XmlBindingsMap bindingsMap;
+    XQilla xqilla;
+    qpid::sys::RWlock lock;
+    // Defined in XmlExchange.cpp; presumably evaluates query against msg - TODO confirm how args is used.
+    bool matches(Query& query, Deliverable& msg, const qpid::framing::FieldTable* args);
+
+  public:
+    static const std::string typeName;
+    // Two constructors; the second additionally takes a durable flag and an argument table.
+    XmlExchange(const std::string& name, management::Manageable* parent = 0);
+    XmlExchange(const string& _name, bool _durable,
+		const qpid::framing::FieldTable& _args, management::Manageable* parent = 0);
+    // Returns typeName.
+    virtual std::string getType() const { return typeName; }
+    // bind, unbind and route are defined in XmlExchange.cpp.
+    virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+    virtual bool unbind(Queue::shared_ptr queue, const std::string& routingKey, const qpid::framing::FieldTable* args);
+
+    virtual void route(Deliverable& msg, const std::string& routingKey, const qpid::framing::FieldTable* args);
+    // A null routingKey or a null queue acts as a wildcard; the FieldTable argument is unused (see XmlExchange.cpp).
+    virtual bool isBound(Queue::shared_ptr queue, const string* const routingKey, const qpid::framing::FieldTable* const args);
+
+    virtual ~XmlExchange();
+};
+
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/XmlExchange.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Exchange.h?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Exchange.h Tue May  6 10:52:03 2008
@@ -69,6 +69,13 @@
          * match zero or more words.
          */
 	static const std::string TOPIC_EXCHANGE;
+
+	/**
+	 * The xml exchange routes messages based on XQuery expressions given as the "xquery" binding argument.
+	 */
+
+	static const std::string XML_EXCHANGE;
+
         /**
          * The headers exchange routes messages based on whether their
          * headers match the binding arguments specified when

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/FieldTable.h Tue May  6 10:52:03 2008
@@ -48,6 +48,7 @@
   public:
     typedef boost::shared_ptr<FieldValue> ValuePtr;
     typedef std::map<std::string, ValuePtr> ValueMap;
+    typedef ValueMap::iterator iterator;
 
     ~FieldTable();
     uint32_t size() const;
@@ -79,6 +80,10 @@
     ValueMap::const_iterator begin() const { return values.begin(); }
     ValueMap::const_iterator end() const { return values.end(); }
     ValueMap::const_iterator find(const std::string& s) const { return values.find(s); }
+
+    // ### Hack Alert: despite its name this returns a mutable iterator to the first entry (values.begin()), not the map.
+
+    ValueMap::iterator getValues() { return values.begin(); }
     
   private:
     ValueMap values;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/.valgrind.supp Tue May  6 10:52:03 2008
@@ -30,3 +30,11 @@
    fun:epoll_ctl
 }
 
+{
+   "Conditional jump or move depends on uninitialised value(s)" from Xerces parser
+   Memcheck:Cond
+   fun:_ZN11xercesc_2_717XMLUTF8Transcoder13transcodeFromEPKhjPtjRjPh
+   fun:_ZN11xercesc_2_79XMLReader14xcodeMoreCharsEPtPhj
+   fun:_ZN11xercesc_2_79XMLReader17refreshCharBufferEv
+}
+

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Tue May  6 10:52:03 2008
@@ -1,4 +1,4 @@
-AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS)   $(APR_CXXFLAGS) -DBOOST_TEST_DYN_LINK
+AM_CXXFLAGS = $(WARNING_CFLAGS) $(CPPUNIT_CXXFLAGS) $(CFLAGS)  $(APR_CXXFLAGS) -DBOOST_TEST_DYN_LINK
 INCLUDES =  -I$(srcdir)/.. -I$(srcdir)/../gen -I$(top_builddir)/src/gen
 
 abs_builddir=@abs_builddir@
@@ -29,6 +29,7 @@
 check_PROGRAMS+=unit_test
 unit_test_LDADD=-lboost_unit_test_framework -lboost_regex  \
 	$(lib_client) $(lib_broker) # $(lib_amqp_0_10)
+
 unit_test_SOURCES= unit_test.cpp unit_test.h \
 	BrokerFixture.h SocketProxy.h \
 	exception_test.cpp \
@@ -45,6 +46,11 @@
 	IncompleteMessageList.cpp \
 	RangeSet.cpp
 
+if HAVE_XML
+unit_test_SOURCES+= XmlClientSessionTest.cpp
+endif
+
+
 # Disabled till we move to amqp_0_10 codec.
 # 	amqp_0_10/serialize.cpp allSegmentTypes.h \
 # 	amqp_0_10/ProxyTemplate.cpp \

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=653854&r1=653853&r2=653854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue May  6 10:52:03 2008
@@ -54,11 +54,13 @@
 
 class FailOnDeliver : public Deliverable
 {
+    Message msg; // default-constructed; exists only so getMessage() has something to return
 public:
     void deliverTo(Queue::shared_ptr& queue)
     {
         throw Exception(QPID_MSG("Invalid delivery to " << queue->getName()));
     }
+    Message& getMessage() { return msg; } // returns the member above; deliverTo() always throws
 };
 
 class QueueTest : public CppUnit::TestCase  

Added: incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp?rev=653854&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp Tue May  6 10:52:03 2008
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "unit_test.h"
+#include "BrokerFixture.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/framing/TransferContent.h"
+#include "qpid/framing/reply_exceptions.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Dispatcher.h"
+#include "qpid/client/LocalQueue.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/SubscriptionManager.h"
+
+#include <boost/optional.hpp>
+#include <boost/lexical_cast.hpp>
+
+#include <vector>
+
+QPID_AUTO_TEST_SUITE(XmlClientSessionTest)
+
+using namespace qpid::client;
+
+using namespace qpid::client::arg;
+using namespace qpid::framing;
+using namespace qpid;
+using qpid::sys::Monitor;
+using std::string;
+using std::cout;
+using std::endl;
+
+
+/** Runnable listener: run() listens under `name`; the dispatcher is stopped after `expected` messages. */
+struct DummyListener : public sys::Runnable, public MessageListener {
+    std::vector<Message> messages;
+    string name;
+    uint expected;
+    Dispatcher dispatcher;
+
+    DummyListener(Session& session, const string& n, uint ex) : name(n), expected(ex), dispatcher(session) {}
+
+    /** Register this object as the listener for `name`, then run the dispatcher. */
+    void run() {
+        dispatcher.listen(name, this);
+        dispatcher.run();
+    }
+
+    /** Store msg; once the countdown reaches zero, stop the dispatcher. */
+    void received(Message& msg) {
+        messages.push_back(msg);
+        expected -= 1;
+        if (expected == 0) dispatcher.stop();
+    }
+};
+
+
+/** LocalQueue that holds a reference to a SubscriptionManager and exposes pop() as get(). */
+class SubscribedLocalQueue : public LocalQueue {
+    SubscriptionManager& subscriptions; // stored at construction; not read anywhere in this class
+  public:
+    SubscribedLocalQueue(SubscriptionManager& subs) : subscriptions(subs) {}
+    virtual ~SubscribedLocalQueue() {}
+    Message get () { return pop(); }
+};
+
+
+/** Listener that stores each message under a Monitor so that a caller can block until enough arrive. */
+struct SimpleListener : public MessageListener
+{
+    Monitor lock;
+    std::vector<Message> messages;
+
+    /** Append msg while holding the lock, then notify all waiters. */
+    void received(Message& msg) {
+        Monitor::ScopedLock guard(lock);
+        messages.push_back(msg);
+        lock.notifyAll();
+    }
+
+    /** Wait on the monitor until messages holds at least n entries. */
+    void waitFor(const uint n) {
+        Monitor::ScopedLock guard(lock);
+        while (messages.size() < n)
+            lock.wait();
+    }
+};
+
+struct ClientSessionFixture : public ProxySessionFixture
+{
+    void declareSubscribe(const string& q="odd_blue",
+                          const string& dest="xml") // declares queue q, subscribes to it as destination dest, then issues two messageFlow commands
+    {
+        session.queueDeclare(queue=q);
+        session.messageSubscribe(queue=q, destination=dest, acquireMode=1);
+        session.messageFlow(destination=dest, unit=0, value=0xFFFFFFFF);// unit 0: messages (per the original note); value is the largest 32-bit number
+        session.messageFlow(destination=dest, unit=1, value=0xFFFFFFFF);// unit 1: bytes (per the original note)
+    }
+};
+
+// ########### START HERE ####################################
+
+BOOST_AUTO_TEST_CASE(testXmlBinding) {
+  ClientSessionFixture f;
+  // Checks that a message sent to an "xml"-type exchange reaches the queue bound with an XQuery.
+  Session session = f.connection.newSession(ASYNC);
+  SubscriptionManager subscriptions(session);
+  SubscribedLocalQueue localQueue(subscriptions);
+  // Declare the exchange and the queue, then subscribe the local queue to "odd_blue".
+  session.exchangeDeclare(qpid::client::arg::exchange="xml", qpid::client::arg::type="xml");
+  session.queueDeclare(qpid::client::arg::queue="odd_blue");
+  subscriptions.subscribe(localQueue, "odd_blue");
+  // The binding's "xquery" argument declares $color as external and also tests ./message/id for oddness.
+  FieldTable binding;
+  binding.setString("xquery", "declare variable $color external;"
+                               "(./message/id mod 2 = 1) and ($color = 'blue')");
+  session.exchangeBind(qpid::client::arg::exchange="xml", qpid::client::arg::queue="odd_blue", qpid::client::arg::bindingKey="query_name", qpid::client::arg::arguments=binding); 
+  // The routing key set below equals the bindingKey passed to exchangeBind above.
+  Message message;
+  message.getDeliveryProperties().setRoutingKey("query_name"); 
+  // NOTE(review): the "color" header presumably supplies the external $color variable - confirm in XmlExchange.cpp.
+  message.getHeaders().setString("color", "blue");
+  string m = "<message><id>1</id></message>";
+  message.setData(m);
+  // Publish to the exchange named "xml".
+  session.messageTransfer(qpid::client::arg::content=message,  qpid::client::arg::destination="xml");
+  // The body read back from the local queue must equal what was sent.
+  Message m2 = localQueue.get();
+  BOOST_CHECK_EQUAL(m, m2.getData());  
+}
+
+//### Test: Bad XML does not kill the server
+
+//### Test: Bad XQuery does not kill the server
+
+//### Test: Bindings persist, surviving broker restart
+
+QPID_AUTO_TEST_SUITE_END()
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/XmlClientSessionTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date