You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2006/12/20 16:11:38 UTC

svn commit: r489110 - in /incubator/qpid/trunk/qpid/cpp: lib/common/sys/posix/ tests/

Author: aconway
Date: Wed Dec 20 07:11:37 2006
New Revision: 489110

URL: http://svn.apache.org/viewvc?view=rev&rev=489110
Log:
Adding files for EventChannel implementation.

Added:
    incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp
      - copied, changed from r483130, incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/PosixAcceptor.cpp
    incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h   (with props)
    incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h   (with props)
    incubator/qpid/trunk/qpid/cpp/tests/run-system-tests
      - copied, changed from r486791, incubator/qpid/trunk/qpid/cpp/tests/run-python-tests

Copied: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp (from r483130, incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/PosixAcceptor.cpp)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp?view=diff&rev=489110&p1=incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/PosixAcceptor.cpp&r1=483130&p2=incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp&r2=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/PosixAcceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelAcceptor.cpp Wed Dec 20 07:11:37 2006
@@ -18,31 +18,132 @@
  * under the License.
  *
  */
+#include <iostream>
 
+#include <boost/assert.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <sys/SessionContext.h>
+#include <sys/SessionHandler.h>
+#include <sys/SessionHandlerFactory.h>
 #include <sys/Acceptor.h>
+#include <sys/Socket.h>
+#include <framing/Buffer.h>
+#include <framing/AMQFrame.h>
 #include <Exception.h>
 
+#include "EventChannelConnection.h"
+
 namespace qpid {
 namespace sys {
 
-namespace {
-void fail() { throw qpid::Exception("PosixAcceptor not implemented"); }
-}
+using namespace qpid::framing;
+using namespace std;
 
-class PosixAcceptor : public Acceptor {
+class EventChannelAcceptor : public Acceptor {
   public:
-    virtual int16_t getPort() const { fail(); return 0; }
-    virtual void run(qpid::sys::SessionHandlerFactory* ) { fail(); }
-    virtual void shutdown() { fail(); }
+
+
+    EventChannelAcceptor(
+        int16_t port_, int backlog, int nThreads, bool trace_
+    );
+        
+    int getPort() const;
+    
+    void run(SessionHandlerFactory& factory);
+
+    void shutdown();
+
+  private:
+
+    void accept();
+
+    Mutex lock;
+    Socket listener;
+    const int port;
+    const bool isTrace;
+    bool isRunning;
+    boost::ptr_vector<EventChannelConnection> connections;
+    AcceptEvent acceptEvent;
+    SessionHandlerFactory* factory;
+    bool isShutdown;
+    EventChannelThreads::shared_ptr threads;
 };
 
-// Define generic Acceptor::create() to return APRAcceptor.
-    Acceptor::shared_ptr Acceptor::create(int16_t , int, int, bool)
+Acceptor::shared_ptr Acceptor::create(
+    int16_t port, int backlog, int threads, bool trace)
 {
-    return Acceptor::shared_ptr(new PosixAcceptor());
+    return Acceptor::shared_ptr(
+        new EventChannelAcceptor(port, backlog, threads, trace));
 }
 
 // Must define Acceptor virtual dtor.
 Acceptor::~Acceptor() {}
 
-}}
+EventChannelAcceptor::EventChannelAcceptor(
+    int16_t port_, int backlog, int nThreads, bool trace_
+) : listener(Socket::createTcp()),
+    port(listener.listen(int(port_), backlog)),
+    isTrace(trace_),
+    isRunning(false),
+    acceptEvent(listener.fd(),
+                boost::bind(&EventChannelAcceptor::accept, this)),
+    factory(0),
+    isShutdown(false),
+    threads(EventChannelThreads::create(EventChannel::create(), nThreads))
+{ }
+    
+int EventChannelAcceptor::getPort() const {
+    return port;                // Immutable no need for lock.
+}
+    
+void EventChannelAcceptor::run(SessionHandlerFactory& f) {
+    {
+        Mutex::ScopedLock l(lock);
+        if (!isRunning && !isShutdown) {
+            isRunning = true;
+            factory = &f;
+            threads->post(acceptEvent);
+        }
+    }
+    threads->join();            // Wait for shutdown.
+}
+
+void EventChannelAcceptor::shutdown() {
+    bool doShutdown = false;
+    {
+        Mutex::ScopedLock l(lock);
+        doShutdown = !isShutdown; // I'm the shutdown thread.
+        isShutdown = true;
+    }
+    if (doShutdown) {
+        ::close(acceptEvent.getDescriptor());
+        threads->shutdown();
+        for_each(connections.begin(), connections.end(),
+                 boost::bind(&EventChannelConnection::close, _1));
+    }
+    threads->join();
+}
+
+void EventChannelAcceptor::accept()
+{
+    // No lock, we only post one accept event at a time.
+    if (isShutdown)
+        return;
+    if (acceptEvent.getException()) {
+        Exception::log(*acceptEvent.getException(),
+                       "EventChannelAcceptor::accept");
+        shutdown();
+        return;
+    }
+    // TODO aconway 2006-11-29: Need to reap closed connections also.
+    int fd = acceptEvent.getAcceptedDesscriptor();
+    connections.push_back(
+        new EventChannelConnection(threads, *factory, fd, fd, isTrace));
+    threads->post(acceptEvent); // Keep accepting.
+}
+
+}} // namespace qpid::sys

Added: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp?view=auto&rev=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp Wed Dec 20 07:11:37 2006
@@ -0,0 +1,229 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/assert.hpp>
+
+#include "EventChannelConnection.h"
+#include "sys/SessionHandlerFactory.h"
+#include "QpidError.h"
+
+using namespace std;
+using namespace qpid;
+using namespace qpid::framing;
+
+namespace qpid {
+namespace sys {
+
+const size_t EventChannelConnection::bufferSize = 65536;
+    
+EventChannelConnection::EventChannelConnection(
+    EventChannelThreads::shared_ptr threads_, 
+    SessionHandlerFactory& factory_,
+    int rfd,
+    int wfd,
+    bool isTrace_
+) :
+    readFd(rfd),
+    writeFd(wfd ? wfd : rfd),
+    readCallback(boost::bind(&EventChannelConnection::closeOnException,
+                             this, &EventChannelConnection::endInitRead)),
+
+    isWriting(false),
+    isClosed(false),
+    threads(threads_),
+    handler(factory_.create(this)),
+    in(bufferSize),
+    out(bufferSize),
+    isTrace(isTrace_)
+{
+    BOOST_ASSERT(readFd > 0);
+    BOOST_ASSERT(writeFd > 0);
+    closeOnException(&EventChannelConnection::startRead);
+}
+
+
+void EventChannelConnection::send(std::auto_ptr<AMQFrame> frame) {
+    {
+        Monitor::ScopedLock lock(monitor);
+        assert(frame.get());
+        writeFrames.push_back(frame.release());
+    }
+    closeOnException(&EventChannelConnection::startWrite);
+}
+
+void EventChannelConnection::close() {
+    {
+        Monitor::ScopedLock lock(monitor);
+        if (isClosed)
+            return;
+        isClosed = true;
+    }
+    ::close(readFd);
+    ::close(writeFd);
+    {
+        Monitor::ScopedLock lock(monitor);
+        while (busyThreads > 0)
+            monitor.wait();
+    }
+    handler->closed();
+}
+
+void EventChannelConnection::closeNoThrow() {
+    Exception::tryCatchLog<void>(
+        boost::bind(&EventChannelConnection::close, this),
+        false,
+        "Exception closing channel"
+    );
+}
+
+/**
+ * Call f in a try/catch block and close the connection if 
+ * an exception is thrown.
+ */
+void EventChannelConnection::closeOnException(MemberFnPtr f)
+{
+    try {
+        Exception::tryCatchLog<void>(
+            boost::bind(f, this),
+            "Closing connection due to exception"
+        );
+        return;
+    } catch (...) {
+        // Exception was already logged by tryCatchLog
+        closeNoThrow();
+    }
+}
+    
+// Post the write event.
+// Always called inside closeOnException.
+// Called by endWrite and send, but only one thread writes at a time.
+// 
+void EventChannelConnection::startWrite() {
+    FrameQueue::auto_type frame;
+    {
+        Monitor::ScopedLock lock(monitor);
+        // Stop if closed or a write event is already in progress.
+        if (isClosed || isWriting)
+            return;
+        if (writeFrames.empty()) {
+            isWriting = false;
+            return;
+        }
+        isWriting = true;
+        frame = writeFrames.pop_front();
+    }
+    // No need to lock here - only one thread can be writing at a time.
+    out.clear();
+    if (isTrace)
+        cout << "Send on socket " << writeFd << ": " << *frame << endl;
+    frame->encode(out);
+    out.flip();
+    writeEvent = WriteEvent(
+        writeFd, out.start(), out.available(),
+        boost::bind(&EventChannelConnection::closeOnException,
+                    this, &EventChannelConnection::endWrite));
+    threads->post(writeEvent);
+}
+
+// ScopedBusy ctor increments busyThreads.
+// dtor decrements and calls monitor.notifyAll if it reaches 0.
+// 
+struct EventChannelConnection::ScopedBusy : public AtomicCount::ScopedIncrement
+{
+    ScopedBusy(EventChannelConnection& ecc)
+        : AtomicCount::ScopedIncrement(
+            ecc.busyThreads, boost::bind(&Monitor::notifyAll, &ecc.monitor))
+    {}
+};
+
+// Write event completed.
+// Always called by a channel thread inside closeOnException.
+// 
+void EventChannelConnection::endWrite() {
+    ScopedBusy(*this);
+    {
+        Monitor::ScopedLock lock(monitor);
+        isWriting = false;
+        if (isClosed)
+            return;
+        writeEvent.throwIfException();
+    }
+    // Check if there's more in to write in the write queue.
+    startWrite();
+}
+    
+
+// Post the read event.
+// Always called inside closeOnException.
+// Called from ctor and end[Init]Read, so only one call at a time
+// is possible since we only post one read event at a time.
+// 
+void EventChannelConnection::startRead() {
+    // Non blocking read, as much as we can swallow.
+    readEvent = ReadEvent(
+        readFd, in.start(), in.available(), readCallback,true);
+    threads->post(readEvent);
+}
+
+// Completion of initial read, expect protocolInit.
+// Always called inside closeOnException in channel thread.
+// Only called by one thread at a time.
+void EventChannelConnection::endInitRead() {
+    ScopedBusy(*this);
+    if (!isClosed) {
+        readEvent.throwIfException();
+        in.move(readEvent.getBytesRead());
+        in.flip();
+        ProtocolInitiation protocolInit;
+        if(protocolInit.decode(in)){
+            handler->initiated(&protocolInit);
+            readCallback = boost::bind(
+                &EventChannelConnection::closeOnException,
+                this, &EventChannelConnection::endRead);
+        }
+        in.compact();
+        // Continue reading.
+        startRead();
+    }
+}
+    
+// Normal reads, expect a frame.
+// Always called inside closeOnException in channel thread.
+void EventChannelConnection::endRead() {
+    ScopedBusy(*this);
+    if (!isClosed) {
+        readEvent.throwIfException();
+        in.move(readEvent.getBytesRead());
+        in.flip();
+        AMQFrame frame;
+        while (frame.decode(in)) {
+            // TODO aconway 2006-11-30: received should take Frame&
+            if (isTrace)
+                cout << "Received on socket " << readFd
+                     << ": " << frame << endl;
+            handler->received(&frame); 
+        }
+        in.compact();
+        startRead();
+    }
+}
+
+}} // namespace qpid::sys

Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h?view=auto&rev=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h (added)
+++ incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h Wed Dec 20 07:11:37 2006
@@ -0,0 +1,102 @@
+#ifndef _posix_EventChannelConnection_h
+#define _posix_EventChannelConnection_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/ptr_container/ptr_deque.hpp>
+
+#include "EventChannelThreads.h"
+#include "sys/Monitor.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/AtomicCount.h"
+#include "framing/AMQFrame.h"
+
+namespace qpid {
+namespace sys {
+
+class SessionHandlerFactory;
+
+/**
+ * Implements SessionContext and delegates to a SessionHandler
+ * for a connection via the EventChannel.
+ *@param readDescriptor file descriptor for reading.
+ *@param writeDescriptor file descriptor for writing,
+ * by default same as readDescriptor
+ */
+class EventChannelConnection : public SessionContext {
+  public:
+    EventChannelConnection(
+        EventChannelThreads::shared_ptr threads, 
+        SessionHandlerFactory& factory,
+        int readDescriptor, 
+        int writeDescriptor = 0,
+        bool isTrace = false
+    );
+
+    // TODO aconway 2006-11-30: SessionContext::send should take auto_ptr
+    virtual void send(qpid::framing::AMQFrame* frame) {
+        send(std::auto_ptr<qpid::framing::AMQFrame>(frame));
+    }
+            
+    virtual void send(std::auto_ptr<qpid::framing::AMQFrame> frame);
+
+    virtual void close();
+
+  private:
+    typedef boost::ptr_deque<qpid::framing::AMQFrame> FrameQueue;
+    typedef void (EventChannelConnection::*MemberFnPtr)();
+    struct ScopedBusy;
+
+    void startWrite();
+    void endWrite();
+    void startRead();
+    void endInitRead();
+    void endRead();
+    void closeNoThrow();
+    void closeOnException(MemberFnPtr);
+    bool shouldContinue(bool& flag);
+
+    static const size_t bufferSize;
+
+    Monitor monitor;
+
+    int readFd, writeFd;
+    ReadEvent readEvent;
+    WriteEvent writeEvent;
+    Event::Callback readCallback;
+    bool isWriting;
+    bool isClosed;
+    AtomicCount busyThreads;
+
+    EventChannelThreads::shared_ptr threads;
+    std::auto_ptr<SessionHandler> handler;
+    qpid::framing::Buffer in, out;
+    FrameQueue writeFrames;
+    bool isTrace;
+    
+  friend struct ScopedBusy;
+};
+    
+
+}} // namespace qpid::sys
+
+
+
+#endif  /*!_posix_EventChannelConnection_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/lib/common/sys/posix/EventChannelConnection.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp?view=auto&rev=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp Wed Dec 20 07:11:37 2006
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <boost/bind.hpp>
+
+#include "sys/Thread.h"
+#include "sys/Acceptor.h"
+#include "sys/Socket.h"
+#include "framing/Buffer.h"
+#include "qpid_test_plugin.h"
+
+#include "MockSessionHandler.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace std;
+
+const char hello[] = "hello";
+const size_t size = sizeof(hello);
+
+        
+class AcceptorTest : public CppUnit::TestCase, private Runnable
+{
+    CPPUNIT_TEST_SUITE(AcceptorTest);
+    CPPUNIT_TEST(testAccept);
+    CPPUNIT_TEST_SUITE_END();
+
+  private:
+    MockSessionHandlerFactory factory;
+    Acceptor::shared_ptr acceptor;
+
+  public:
+
+    void run() {
+        acceptor->run(factory);
+    }
+
+    void setUp() {
+        acceptor = Acceptor::create(0, 10, 3);
+    }
+
+    void tearDown() {
+        acceptor.reset();
+    }
+
+    void testAccept()
+    {
+        int port = acceptor->getPort();
+        CPPUNIT_ASSERT(port > 0);
+        Thread runThread(*this);
+        // Connect to the acceptor
+        Socket client = Socket::createTcp();
+        client.connect("localhost", port);
+        factory.waitForHandler();
+        CPPUNIT_ASSERT(factory.handler != 0);
+        // Send a protocol initiation.
+        Buffer buf(1024);
+        ProtocolInitiation(4,2).encode(buf);
+        buf.flip();
+        client.send(buf.start(), buf.available());
+
+        // Verify session handler got the protocol init.
+        ProtocolInitiation init = factory.handler->waitForProtocolInit();
+        CPPUNIT_ASSERT_EQUAL(int(4), int(init.getMajor()));
+        CPPUNIT_ASSERT_EQUAL(int(2), int(init.getMinor()));
+
+        acceptor->shutdown();
+        runThread.join(); 
+        factory.handler->waitForClosed();
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(AcceptorTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/AcceptorTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp?view=auto&rev=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp Wed Dec 20 07:11:37 2006
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <iostream>
+#include <boost/bind.hpp>
+#include "framing/AMQHeartbeatBody.h"
+#include "framing/AMQFrame.h"
+#include "sys/posix/EventChannelConnection.h"
+#include "sys/SessionHandler.h"
+#include "sys/SessionHandlerFactory.h"
+#include "sys/Socket.h"
+#include "qpid_test_plugin.h"
+#include "MockSessionHandler.h"
+
+using namespace qpid::sys;
+using namespace qpid::framing;
+using namespace std;
+
+class EventChannelConnectionTest : public CppUnit::TestCase  
+{
+    CPPUNIT_TEST_SUITE(EventChannelConnectionTest);
+    CPPUNIT_TEST(testSendReceive);
+    CPPUNIT_TEST(testCloseExternal);
+    CPPUNIT_TEST(testCloseException);
+    CPPUNIT_TEST_SUITE_END();
+
+  public:
+
+    void setUp() {
+        threads = EventChannelThreads::create();
+        CPPUNIT_ASSERT_EQUAL(0, ::pipe(pipe));
+        connection.reset(
+            new EventChannelConnection(threads, factory, pipe[0], pipe[1]));
+        CPPUNIT_ASSERT(factory.handler != 0);
+    }
+
+    void tearDown() {
+        threads->shutdown();
+        threads->join();
+    }
+
+    void testSendReceive()
+    {
+        // Send a protocol initiation.
+        Buffer buf(1024);
+        ProtocolInitiation(4,2).encode(buf);
+        buf.flip();
+        ssize_t n = write(pipe[1], buf.start(), buf.available());
+        CPPUNIT_ASSERT_EQUAL(ssize_t(buf.available()), n);
+
+        // Verify session handler got the protocol init.
+        ProtocolInitiation init = factory.handler->waitForProtocolInit();
+        CPPUNIT_ASSERT_EQUAL(int(4), int(init.getMajor()));
+        CPPUNIT_ASSERT_EQUAL(int(2), int(init.getMinor()));
+
+        // Send a heartbeat frame, verify connection got it.
+        connection->send(new AMQFrame(42, new AMQHeartbeatBody()));
+        AMQFrame frame = factory.handler->waitForFrame();
+        CPPUNIT_ASSERT_EQUAL(u_int16_t(42), frame.getChannel());
+        CPPUNIT_ASSERT_EQUAL(u_int8_t(HEARTBEAT_BODY),
+                             frame.getBody()->type());
+        threads->shutdown();
+    }
+
+    // Make sure the handler is closed if the connection is closed.
+    void testCloseExternal() {
+        connection->close();
+        factory.handler->waitForClosed();
+    }
+
+    // Make sure the handler is closed if the connection closes or fails.
+    // TODO aconway 2006-12-18: logs exception message in test output.
+    void testCloseException() {
+        ::close(pipe[0]);
+        ::close(pipe[1]);
+        // TODO aconway 2006-12-18: Shouldn't this be failing?
+        connection->send(new AMQFrame(42, new AMQHeartbeatBody()));
+        factory.handler->waitForClosed();
+    }
+
+  private:
+    EventChannelThreads::shared_ptr threads;
+    int pipe[2];
+    std::auto_ptr<EventChannelConnection> connection;
+    MockSessionHandlerFactory factory;
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(EventChannelConnectionTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/EventChannelConnectionTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h?view=auto&rev=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h Wed Dec 20 07:11:37 2006
@@ -0,0 +1,113 @@
+#ifndef _tests_MockSessionHandler_h
+#define _tests_MockSessionHandler_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "sys/SessionHandler.h"
+#include "sys/SessionHandlerFactory.h"
+#include "sys/Monitor.h"
+#include "framing/ProtocolInitiation.h"
+
+struct MockSessionHandler : public qpid::sys::SessionHandler {
+
+    MockSessionHandler() : state(START) {}
+
+    ~MockSessionHandler() {}
+    
+    void initiated(qpid::framing::ProtocolInitiation* pi) {
+        qpid::sys::Monitor::ScopedLock l(monitor);
+        init = *pi;
+        setState(GOT_INIT);
+    }
+
+    void received(qpid::framing::AMQFrame* framep) {
+        qpid::sys::Monitor::ScopedLock l(monitor);
+        frame = *framep;
+        setState(GOT_FRAME);
+    }
+
+    qpid::framing::ProtocolInitiation waitForProtocolInit() {        
+        waitFor(GOT_INIT);
+        return init;
+    }
+
+    qpid::framing::AMQFrame waitForFrame() {        
+        waitFor(GOT_FRAME);
+        return frame;
+    }
+
+    void waitForClosed() {
+        waitFor(CLOSED);
+    }
+    
+    void closed() {
+        qpid::sys::Monitor::ScopedLock l(monitor);
+        setState(CLOSED);
+    }
+
+    void idleOut() {}
+    void idleIn() {}
+
+  private:
+    typedef enum { START, GOT_INIT, GOT_FRAME, CLOSED } State;
+
+    void setState(State s) {
+        state = s;
+        monitor.notify();
+    }
+    
+    void waitFor(State s) {
+        qpid::sys::Monitor::ScopedLock l(monitor);
+        qpid::sys::Time deadline = qpid::sys::now() + 10*qpid::sys::TIME_SEC; 
+        while (state != s)
+            CPPUNIT_ASSERT(monitor.wait(deadline));
+    }
+
+    qpid::sys::Monitor  monitor;
+    State state;
+    qpid::framing::ProtocolInitiation init;
+    qpid::framing::AMQFrame frame;
+};
+
+
+struct MockSessionHandlerFactory : public qpid::sys::SessionHandlerFactory {
+    MockSessionHandlerFactory() : handler(0) {}
+
+    qpid::sys::SessionHandler* create(qpid::sys::SessionContext*) {
+        qpid::sys::Monitor::ScopedLock lock(monitor);
+        handler = new MockSessionHandler();
+        monitor.notifyAll();
+        return handler;
+    }
+
+    void waitForHandler() {
+        qpid::sys::Monitor::ScopedLock lock(monitor);
+        qpid::sys::Time deadline =
+            qpid::sys::now() + 500 * qpid::sys::TIME_SEC;
+        while (handler == 0)
+            CPPUNIT_ASSERT(monitor.wait(deadline));
+    }
+    
+    MockSessionHandler* handler;
+    qpid::sys::Monitor monitor;
+};
+
+
+
+#endif  /*!_tests_MockSessionHandler_h*/

Propchange: incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/tests/MockSessionHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/trunk/qpid/cpp/tests/run-system-tests (from r486791, incubator/qpid/trunk/qpid/cpp/tests/run-python-tests)
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/run-system-tests?view=diff&rev=489110&p1=incubator/qpid/trunk/qpid/cpp/tests/run-python-tests&r1=486791&p2=incubator/qpid/trunk/qpid/cpp/tests/run-system-tests&r2=489110
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/run-python-tests (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/run-system-tests Wed Dec 20 07:11:37 2006
@@ -9,7 +9,30 @@
 trap 'status=$?; kill $pid; exit $status' 0
 trap '(exit $?); exit $?' 1 2 13 15
 
-# Run the tests.
-cd ../../python && ./run-tests -v -I cpp_failing.txt
+# Run C++ client tests.
+run_test() {
+    test="$*"
+    echo -n "Running: $test ... "
+    if $test >test.out 2>&1 ; then
+	echo " Passed" ;
+    else
+	echo " FAILED. Output:";
+	cat test.out
+	FAILED=yes
+    fi
+    rm -f test.out
+}
+
+run_test ./client_test
+run_test ./topictest -l2 -m2 -b1
+
+# Run the python tests.
+if test -d ../../python ;  then
+    cd ../../python && ./run-tests -v -I cpp_failing.txt
+else
+    echo Warning: python tests not found.
+fi
+
+# TODO aconway 2006-12-13: run the other client tests.
 
 rm -f $log