You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by as...@apache.org on 2008/07/30 08:29:51 UTC

svn commit: r680921 - in /incubator/qpid/trunk/qpid/cpp: configure.ac src/Makefile.am src/qpid/sys/solaris/ src/qpid/sys/solaris/ECFPoller.cpp

Author: astitcher
Date: Tue Jul 29 23:29:51 2008
New Revision: 680921

URL: http://svn.apache.org/viewvc?rev=680921&view=rev
Log:
QPID-1198: Solaris ECF (port) based Poller
Patch from Manuel Teira

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=680921&r1=680920&r2=680921&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Tue Jul 29 23:29:51 2008
@@ -54,31 +54,45 @@
  esac],
  [enableval=yes])
 
-# Warnings: Enable as many as possible, keep the code clean. Please
-# do not disable warnings or remove -Werror without discussing on
-# qpid-dev list.
-#
-# The following warnings are deliberately omitted, they warn on valid code.
-# -Wunreachable-code -Wpadded -Winline
-# -Wshadow - warns about boost headers.
-
-if test "${enableval}" = yes; then
-  gl_COMPILER_FLAGS(-Werror)
-  gl_COMPILER_FLAGS(-pedantic)
-  gl_COMPILER_FLAGS(-Wall)
-  gl_COMPILER_FLAGS(-Wextra)
-  gl_COMPILER_FLAGS(-Wno-shadow)
-  gl_COMPILER_FLAGS(-Wpointer-arith)
-  gl_COMPILER_FLAGS(-Wcast-qual)
-  gl_COMPILER_FLAGS(-Wcast-align)
-  gl_COMPILER_FLAGS(-Wno-long-long)
-  gl_COMPILER_FLAGS(-Wvolatile-register-var)
-  gl_COMPILER_FLAGS(-Winvalid-pch)
-  gl_COMPILER_FLAGS(-Wno-system-headers)
-  gl_COMPILER_FLAGS(-Woverloaded-virtual)	
-  AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS])
-  AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.])
-  COMPILER_FLAGS=
+# Set up for gcc as compiler
+if test x$GXX = xyes; then
+	# Warnings: Enable as many as possible, keep the code clean. Please
+	# do not disable warnings or remove -Werror without discussing on
+	# qpid-dev list.
+	#
+	# The following warnings are deliberately omitted, they warn on valid code.
+	# -Wunreachable-code -Wpadded -Winline
+	# -Wshadow - warns about boost headers.
+	if test "${enableval}" = yes; then
+	    gl_COMPILER_FLAGS(-Werror)
+	    gl_COMPILER_FLAGS(-pedantic)
+	    gl_COMPILER_FLAGS(-Wall)
+	    gl_COMPILER_FLAGS(-Wextra)
+	    gl_COMPILER_FLAGS(-Wno-shadow)
+	    gl_COMPILER_FLAGS(-Wpointer-arith)
+	    gl_COMPILER_FLAGS(-Wcast-qual)
+	    gl_COMPILER_FLAGS(-Wcast-align)
+	    gl_COMPILER_FLAGS(-Wno-long-long)
+	    gl_COMPILER_FLAGS(-Wvolatile-register-var)
+	    gl_COMPILER_FLAGS(-Winvalid-pch)
+	    gl_COMPILER_FLAGS(-Wno-system-headers)
+	    gl_COMPILER_FLAGS(-Woverloaded-virtual)
+	    AC_SUBST([WARNING_CFLAGS], [$COMPILER_FLAGS])
+	    AC_DEFINE([lint], 1, [Define to 1 if the compiler is checking for lint.])
+	    COMPILER_FLAGS=
+	fi
+else
+	AC_CHECK_DECL([__SUNPRO_CC], [SUNCC=yes], [SUNCC=no])
+	
+	# Set up for sun CC compiler: applies only when __SUNPRO_CC was detected above
+	if test x$SUNCC = xyes; then
+		if test "${enableval}" = yes; then
+			WARNING_FLAGS=+w
+		fi
+		CXXFLAGS="$CXXFLAGS -library=stlport4 -mt"
+		LD="$CXX"
+		LDFLAGS="$LDFLAGS -library=stlport4 -mt"
+	fi
 fi
 
 AC_DISABLE_STATIC
@@ -278,6 +292,35 @@
 LIBS=$tmp_LIBS
 AM_CONDITIONAL([RDMA], [test x$with_RDMA = xyes])
 
+poller=no  # stays "no" while no implemented poller is selected; rejected by the filter below
+AC_ARG_WITH([poller],
+ [AS_HELP_STRING([--with-poller], [The low level poller implementation: poll/solaris-ecf/epoll])],
+ [case ${withval} in
+  poll)
+   AC_CHECK_HEADERS([sys/poll.h],[poller=no],[AC_MSG_ERROR([Can't find poll.h header file for poll])])
+   ;;
+  solaris-ecf)
+   AC_CHECK_HEADERS([port.h],[poller=solaris-ecf],[AC_MSG_ERROR([Can't find port.h header file for solaris-ecf])])
+   ;;
+  epoll)
+   AC_CHECK_HEADERS([sys/epoll.h],[poller=epoll],[AC_MSG_ERROR([Can't find epoll.h header file for epoll])])
+   ;;
+ esac],
+ [
+  AC_CHECK_HEADERS([sys/poll.h],[poller=no],)
+  AC_CHECK_HEADERS([port.h],[poller=solaris-ecf],)
+  AC_CHECK_HEADERS([sys/epoll.h],[poller=epoll],)
+ ]
+)
+
+AM_CONDITIONAL([HAVE_ECF], [test x$poller = xsolaris-ecf])
+AM_CONDITIONAL([HAVE_EPOLL], [test x$poller = xepoll])
+
+#Filter not implemented or invalid mechanisms: poller is still "no" for those
+if test x$poller = xno; then
+  AC_MSG_ERROR([Polling mechanism not implemented for $host])
+fi
+
 # Files to generate	
 AC_CONFIG_FILES([
   qpidc.spec

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=680921&r1=680920&r2=680921&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Tue Jul 29 23:29:51 2008
@@ -70,7 +70,6 @@
 qpidd_SOURCES = qpidd.cpp
 
 posix_plat_src = \
-  qpid/sys/epoll/EpollPoller.cpp \
   qpid/sys/posix/IOHandle.cpp \
   qpid/sys/posix/Socket.cpp \
   qpid/sys/posix/AsynchIO.cpp \
@@ -90,7 +89,15 @@
   qpid/sys/posix/Fork.h \
   qpid/sys/posix/LockFile.h
 
-platform_src = $(posix_plat_src)
+if HAVE_EPOLL 
+  poller = qpid/sys/epoll/EpollPoller.cpp
+endif
+
+if HAVE_ECF
+  poller = qpid/sys/solaris/ECFPoller.cpp
+endif
+
+platform_src = $(posix_plat_src) $(poller)
 platform_hdr = $(posix_plat_hdr)
 
 lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp?rev=680921&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/solaris/ECFPoller.cpp Tue Jul 29 23:29:51 2008
@@ -0,0 +1,301 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/log/Logger.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/DeletionManager.h"
+#include "qpid/sys/posix/check.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <port.h>
+#include <poll.h>
+#include <errno.h>
+
+#include <assert.h>
+#include <vector>
+#include <exception>
+
+
+//TODO: Remove this
+#include "qpid/sys/Dispatcher.h"
+
+namespace qpid {
+namespace sys {
+
+// Deletion manager that defers deletion of PollerHandles (see PollerHandle::deferDelete) until they definitely aren't being used
+DeletionManager<PollerHandle> PollerHandleDeletionManager;
+
+// Explicit specialization that defines the allThreadsStatuses class static for DeletionManager<PollerHandle>
+template <>
+DeletionManager<PollerHandle>::AllThreadsStatuses DeletionManager<PollerHandle>::allThreadsStatuses(0);
+
+class PollerHandlePrivate {  // Per-handle state kept by the ECF poller: fd, requested events and association status
+    friend class Poller;
+    friend class PollerHandle;
+
+    enum FDStat {
+        ABSENT,            // not registered with the port: initial state, restored by Poller::delFd
+        MONITORED,         // associated with the port and waiting for an event
+        INACTIVE,          // an event was handed out by Poller::wait; rearmFd/modFd make it MONITORED again
+        HUNGUP,            // a POLLHUP event was handed out once by Poller::wait
+        MONITORED_HUNGUP   // monitored again after HUNGUP; the next POLLHUP is reported as DISCONNECTED
+    };
+
+    int fd;           // descriptor passed to port_associate/port_dissociate
+    uint32_t events;  // poll event mask last requested for fd
+    FDStat stat;      // current state, see FDStat
+    Mutex lock;       // taken by the Poller methods before they read or change this state
+
+    PollerHandlePrivate(int f) :  // starts unregistered with an empty event mask
+      fd(f),
+      events(0),
+      stat(ABSENT) {
+    }
+    
+    bool isActive() const {  // true while associated with the port
+        return stat == MONITORED || stat == MONITORED_HUNGUP;
+    }
+
+    void setActive() {  // keeps the memory of an earlier hangup when re-monitoring
+        stat = (stat == HUNGUP) ? MONITORED_HUNGUP : MONITORED;
+    }
+
+    bool isInactive() const {  // true after an event was handed out and before the handle is monitored again
+        return stat == INACTIVE || stat == HUNGUP;
+    }
+
+    void setInactive() {
+        stat = INACTIVE;
+    }
+
+    bool isIdle() const {  // true while not registered at all
+        return stat == ABSENT;
+    }
+
+    void setIdle() {
+        stat = ABSENT;
+    }
+
+    bool isHungup() const {  // true once a POLLHUP has been seen, monitored or not
+        return stat == MONITORED_HUNGUP || stat == HUNGUP;
+    }
+
+    void setHungup() {
+        assert(stat == MONITORED);  // Poller::wait only calls this for an active handle that is not yet hung up
+        stat = HUNGUP;
+    }
+};
+
+PollerHandle::PollerHandle(const IOHandle& h) :  // stores the result of toFd(h.impl) as the descriptor to poll
+    impl(new PollerHandlePrivate(toFd(h.impl)))
+{}
+
+PollerHandle::~PollerHandle() {
+    delete impl;  // impl was allocated by the constructor and is owned by this handle
+}
+
+void PollerHandle::deferDelete() {
+    PollerHandleDeletionManager.markForDeletion(this);  // hands this handle to the deletion manager instead of deleting it here
+}
+
+/**
+ * Concrete implementation of Poller to use the Solaris Event Completion
+ * Framework interface: holds the event port and maps Poller types to poll events
+ */
+class PollerPrivate {
+    friend class Poller;
+
+    const int portId;  // event port descriptor returned by port_create()
+
+    static uint32_t directionToPollEvent(Poller::Direction dir) {  // Direction -> poll event mask, 0 for an unknown direction
+        switch (dir) {
+            case Poller::IN: return POLLIN;
+            case Poller::OUT: return POLLOUT;
+            case Poller::INOUT: return POLLIN | POLLOUT;
+            default: return 0;
+        }
+    }
+
+    static Poller::EventType pollToDirection(uint32_t events) {  // poll events reported by the port -> Poller event type
+        uint32_t e = events & (POLLIN | POLLOUT);
+        switch (e) {
+            case POLLIN: return Poller::READABLE;
+            case POLLOUT: return Poller::WRITABLE;
+            case POLLIN | POLLOUT: return Poller::READ_WRITABLE;
+            default:  // neither readable nor writable: hangup/error means DISCONNECTED
+                return (events & (POLLHUP | POLLERR)) ?
+                    Poller::DISCONNECTED : Poller::INVALID;
+        }
+    }
+
+    PollerPrivate() : portId(::port_create()) {
+        QPID_POSIX_CHECK(portId);  // port_create() yields -1 on failure: check it like the other port_* results
+    }
+
+    ~PollerPrivate() {  // NOTE(review): portId is not closed here - confirm whether the port should be released
+    }
+};
+
+void Poller::addFd(PollerHandle& handle, Direction dir) {  // Starts monitoring handle for dir, or adds dir to an already monitored handle
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+
+    uint32_t events = 0;
+  
+    if (eh.isIdle()) {
+        events = PollerPrivate::directionToPollEvent(dir);  // first registration: only the requested direction
+    } else {
+        assert(eh.isActive());
+        events = eh.events | PollerPrivate::directionToPollEvent(dir);  // already monitored: merge with the current mask
+    }
+
+    //port_associate can be used to add an association or modify an
+    //existing one; &handle is the user pointer read back in Poller::wait
+    QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, events, &handle));
+    eh.events = events;  // recorded only after the association succeeded
+    eh.setActive();
+    QPID_LOG(trace, "Poller::addFd(handle=" << &handle
+             << "[" << typeid(&handle).name()
+             << "], fd=" << eh.fd << ")");
+    //assert(dynamic_cast<DispatchHandle*>(&handle));
+}
+
+void Poller::delFd(PollerHandle& handle) {  // Stops monitoring handle and marks it as unregistered
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(!eh.isIdle());
+    int rc = ::port_dissociate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd);
+    //Tolerate EBADFD (users may close the fd before doing delFd()) and ENOENT
+    //(the association is already gone once its event was retrieved by port_get)
+    if (rc == -1 && errno != EBADFD && errno != ENOENT) {
+        QPID_POSIX_CHECK(rc);
+    }
+    eh.setIdle();
+    QPID_LOG(trace, "Poller::delFd(handle=" << &handle
+             << ", fd=" << eh.fd << ")");
+}
+
+// modFd replaces the monitored directions of a registered handle (equivalent to delFd followed by addFd)
+void Poller::modFd(PollerHandle& handle, Direction dir) {
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(!eh.isIdle());
+
+    eh.events = PollerPrivate::directionToPollEvent(dir);  // overwrites the previous mask instead of merging
+  
+    //If fd is already associated, events and user arguments are updated
+    //So, no need to check if fd is already associated
+    QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle));
+    eh.setActive();
+    QPID_LOG(trace, "Poller::modFd(handle=" << &handle
+             << ", fd=" << eh.fd << ")");
+}
+
+void Poller::rearmFd(PollerHandle& handle) {  // Monitors again a handle whose event was already handed out by wait()
+    PollerHandlePrivate& eh = *handle.impl;
+    ScopedLock<Mutex> l(eh.lock);
+    assert(eh.isInactive());
+    //Associate again with the event mask that was last requested for this fd
+    QPID_POSIX_CHECK(::port_associate(impl->portId, PORT_SOURCE_FD, (uintptr_t) eh.fd, eh.events, &handle));
+    eh.setActive();
+    QPID_LOG(trace, "Poller::rearmFd(handle=" << &handle
+             << ", fd=" << eh.fd << ")");
+}
+
+void Poller::shutdown() {
+    //Put the port into alert mode to signal shutdown to wait()
+    //We need to send a nonzero event mask, using POLLHUP, but
+    //the wait method only looks at the PORT_SOURCE_ALERT event source
+    QPID_POSIX_CHECK(::port_alert(impl->portId, PORT_ALERT_SET, POLLHUP, NULL));
+    QPID_LOG(trace, "Poller::shutdown");
+}
+
+Poller::Event Poller::wait(Duration timeout) {  // Blocks in port_get() until one event, a timeout or a shutdown can be returned
+    timespec_t tout;
+    timespec_t* ptout = NULL;  // stays NULL for TIME_INFINITE, so port_get() gets no timeout
+    port_event_t pe;
+
+    if (timeout != TIME_INFINITE) {
+      tout.tv_sec = timeout / TIME_SEC;   // timeout is a nanosecond count: whole seconds go here...
+      tout.tv_nsec = timeout % TIME_SEC;  // ...and only the sub-second remainder here, keeping tv_nsec in range
+      ptout = &tout;
+    }
+
+    do {
+        PollerHandleDeletionManager.markAllUnusedInThisThread();
+        QPID_LOG(trace, "About to enter port_get. Thread "
+                 << pthread_self()
+                 << ", timeout=" << timeout);
+
+        int rc = ::port_get(impl->portId, &pe, ptout);
+
+        if (rc < 0) {
+            switch (errno) {
+            case EINTR:
+                continue;  // interrupted: go round the loop and wait again
+            case ETIME:
+                return Event(0, TIMEOUT);
+            default:
+                QPID_POSIX_CHECK(rc);
+            }
+        } else {
+            //We use alert mode to notify the shutdown of the Poller
+            if (pe.portev_source == PORT_SOURCE_ALERT) {
+                return Event(0, SHUTDOWN);
+            }
+            if (pe.portev_source == PORT_SOURCE_FD) {
+                PollerHandle *handle = static_cast<PollerHandle*>(pe.portev_user);  // the &handle given to port_associate
+                PollerHandlePrivate& eh = *handle->impl;
+                ScopedLock<Mutex> l(eh.lock);
+                QPID_LOG(trace, "About to send handle: " << handle);
+
+                if (eh.isActive()) {  // events for handles that are not monitored are dropped and the loop waits again
+                  if (pe.portev_events & POLLHUP) {
+                    if (eh.isHungup()) {
+                      return Event(handle, DISCONNECTED);  // second hangup on this handle
+                    }
+                    eh.setHungup();  // first hangup: still report the event computed below
+                  } else {
+                    eh.setInactive();  // handed out once; rearmFd/modFd is needed to monitor it again
+                  }
+                  QPID_LOG(trace, "Sending event (thread: "
+                           << pthread_self() << ") for handle " << handle
+                           << ", direction= "
+                           << PollerPrivate::pollToDirection(pe.portev_events));
+                  return Event(handle, PollerPrivate::pollToDirection(pe.portev_events));
+                }
+            }
+        }
+    } while (true);
+}
+
+// Concrete constructor and destructor: a Poller owns the PollerPrivate it allocates
+Poller::Poller() :
+    impl(new PollerPrivate())
+{}
+
+Poller::~Poller() {
+    delete impl;  // releases the PollerPrivate allocated by the constructor
+}
+
+}}