You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2009/03/26 22:04:47 UTC

svn commit: r758852 - /qpid/trunk/qpid/cpp/src/tests/SocketProxy.h

Author: shuston
Date: Thu Mar 26 21:04:47 2009
New Revision: 758852

URL: http://svn.apache.org/viewvc?rev=758852&view=rev
Log:
Enable SocketProxy portability to Windows; fixes QPID-1765

Modified:
    qpid/trunk/qpid/cpp/src/tests/SocketProxy.h

Modified: qpid/trunk/qpid/cpp/src/tests/SocketProxy.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/SocketProxy.h?rev=758852&r1=758851&r2=758852&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/SocketProxy.h (original)
+++ qpid/trunk/qpid/cpp/src/tests/SocketProxy.h Thu Mar 26 21:04:47 2009
@@ -21,45 +21,58 @@
  *
  */
 
+#include "qpid/sys/IOHandle.h"
+#ifdef _WIN32
+#  include "qpid/sys/windows/IoHandlePrivate.h"
+   typedef SOCKET FdType;
+#else
+#  include "qpid/sys/posix/PrivatePosix.h"
+   typedef int FdType;
+#endif
 #include "qpid/sys/Socket.h"
-#include "qpid/sys/Poller.h"
 #include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include "qpid/sys/Mutex.h"
-#include "qpid/client/Connection.h"
 #include "qpid/log/Statement.h"
 
-#include <algorithm>
-
 /**
  * A simple socket proxy that forwards to another socket. 
  * Used between client & local broker to simulate network failures.
  */
 class SocketProxy : private qpid::sys::Runnable
 {
+    // Need a Socket we can get the fd from
+    class LowSocket : public qpid::sys::Socket {
+    public:
+        FdType getFd() { return toFd(impl); }
+    };
+
   public:
     /** Connect to connectPort on host, start a forwarding thread.
      * Listen for connection on getPort().
      */
     SocketProxy(int connectPort, const std::string host="localhost")
-        : closed(false), port(listener.listen()), dropClient(), dropServer()
+      : closed(false), joined(true),
+        port(listener.listen()), dropClient(), dropServer()
     {
         client.connect(host, connectPort);
+        joined = false;
         thread = qpid::sys::Thread(static_cast<qpid::sys::Runnable*>(this));
     }
     
-    ~SocketProxy() { close(); }
+      ~SocketProxy() { close(); if (!joined) thread.join(); }
 
     /** Simulate a network disconnect. */
     void close() {
         {
             qpid::sys::Mutex::ScopedLock l(lock);
-            if (closed) return;
+            if (closed) { return; }
             closed=true;
         }
-        poller.shutdown();
-        if (thread.id() != qpid::sys::Thread::current().id())
-        thread.join();
+        if (thread.id() != qpid::sys::Thread::current().id()) {
+            thread.join();
+            joined = true;
+        }
         client.close();
     }
 
@@ -85,56 +98,72 @@
     }
 
     void run() {
-        std::auto_ptr<qpid::sys::Socket> server;
+        std::auto_ptr<LowSocket> server;
         try {
-            qpid::sys::PollerHandle listenerHandle(listener);
-            poller.addFd(listenerHandle, qpid::sys::Poller::INPUT);
-            qpid::sys::Poller::Event event = poller.wait();
-            throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
-            throwIf(!(event.type == qpid::sys::Poller::READABLE && event.handle == &listenerHandle), "SocketProxy: Accept failed");
-
-            poller.delFd(listenerHandle);
-            server.reset(listener.accept());
-
-            // Pump data between client & server sockets
-            qpid::sys::PollerHandle clientHandle(client);
-            qpid::sys::PollerHandle serverHandle(*server);
-            poller.addFd(clientHandle, qpid::sys::Poller::INPUT);
-            poller.addFd(serverHandle, qpid::sys::Poller::INPUT);
+            fd_set socks;
+            FD_ZERO(&socks);
+            FdType maxFd = listener.getFd();
+            FD_SET(maxFd, &socks);
+            struct timeval tmo;
+            for (;;) {
+                tmo.tv_sec = 0;
+                tmo.tv_usec = 500 * 1000;
+                if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
+                    qpid::sys::Mutex::ScopedLock l(lock);
+                    throwIf(closed, "SocketProxy: Closed by close()");
+                    continue;
+                }
+                throwIf(!FD_ISSET(maxFd, &socks), "SocketProxy: Accept failed");
+                break;   // Accept ready... go to next step
+            }
+            server.reset(reinterpret_cast<LowSocket *>(listener.accept()));
+            maxFd = server->getFd();
+            if (client.getFd() > maxFd)
+                maxFd = client.getFd();
             char buffer[1024];
             for (;;) {
-                qpid::sys::Poller::Event event = poller.wait();
-                throwIf(event.type == qpid::sys::Poller::SHUTDOWN, "SocketProxy: Closed by close()");
-                throwIf(event.type == qpid::sys::Poller::DISCONNECTED, "SocketProxy: client/server disconnected");
-                if (event.handle == &serverHandle) {
+                FD_ZERO(&socks);
+                tmo.tv_sec = 0;
+                tmo.tv_usec = 500 * 1000;
+                FD_SET(client.getFd(), &socks);
+                FD_SET(server->getFd(), &socks);
+                if (select(maxFd+1, &socks, 0, 0, &tmo) == 0) {
+                    qpid::sys::Mutex::ScopedLock l(lock);
+                    throwIf(closed, "SocketProxy: Closed by close()");
+                    continue;
+                }
+                // Something is set; relay data as needed until something closes
+                if (FD_ISSET(server->getFd(), &socks)) {
                     ssize_t n = server->read(buffer, sizeof(buffer));
+                    throwIf(n <= 0, "SocketProxy: server disconnected");
                     if (!dropServer) client.write(buffer, n);
-                    poller.rearmFd(serverHandle);
-                } else if (event.handle == &clientHandle) {
+                }
+                if (FD_ISSET(client.getFd(), &socks)) {
                     ssize_t n = client.read(buffer, sizeof(buffer));
-                    if (!dropClient) server->write(buffer, n);
-                    poller.rearmFd(clientHandle);
-                } else {
-                    throwIf(true, "SocketProxy: No handle ready");
+                    throwIf(n <= 0, "SocketProxy: client disconnected");
+                    if (!dropServer) server->write(buffer, n);
                 }
+                if (!FD_ISSET(client.getFd(), &socks) &&
+                    !FD_ISSET(server->getFd(), &socks))
+                    throwIf(true, "SocketProxy: No handle ready");
             }
         }
         catch (const std::exception& e) {
             QPID_LOG(debug, "SocketProxy::run exception: " << e.what());
         }
         try {
-        if (server.get()) server->close();
-        close(); 
-    }
+            if (server.get()) server->close();
+            close(); 
+        }
         catch (const std::exception& e) {
             QPID_LOG(debug, "SocketProxy::run exception in client/server close()" << e.what());
         }
     }
 
     mutable qpid::sys::Mutex lock;
-    bool closed;
-    qpid::sys::Poller poller;
-    qpid::sys::Socket client, listener;
+    mutable bool closed;
+    bool joined;
+    LowSocket client, listener;
     uint16_t port;
     qpid::sys::Thread thread;
     bool dropClient, dropServer;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org