You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/06/14 17:55:08 UTC

svn commit: r954525 [2/2] - in /trafficserver/traffic/branches/UserFiber/core: include/ src/

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H Mon Jun 14 15:55:08 2010
@@ -6,41 +6,149 @@
 #include "UF.H"
 #include "UFConnectionPool.H"
 
-class UFIO;
+struct UFIO;
+struct UFConnIPInfo;
+struct UFConnGroupInfo;
+
+typedef std::map<std::string, UFConnGroupInfo*> GroupIPMap;
+typedef std::map<std::string, UFConnIPInfo*>    IPInfoStore;
+typedef std::map<UFIO*, time_t>                 UFIOIntMap;
+typedef std::map<UFIO*, UFConnIPInfo*>          UFConnIPInfoMap;
 
-
-typedef std::map<std::string,UFConnectionGroupInfo*>                                    GroupIPMap;
-typedef std::map<UFIO*, std::pair<UFConnectionIpInfo*, std::pair<bool, time_t> > >      UFConnectionIpInfoMap;
 const time_t DEFAULT_TIMEOUT_OF_IP_ON_FAILURE = 10;
-class UFConnectionPoolImpl
+struct UFConnectionPoolImpl
 {
-public:
     UFConnectionPoolImpl();
     ~UFConnectionPoolImpl();
 
     // Call before thread creation
     static void init();
     
-    bool addGroup(UFConnectionGroupInfo* stGroupInfo);
-    UFConnectionGroupInfo* removeGroup(const std::string& name);
-    UFIO* getConnection(const std::string& groupName);
-    UFIO* getConnection(const std::string& groupName, bool waitForConnection);
+    bool addGroup(UFConnGroupInfo* stGroupInfo);
+    void removeGroup(const std::string& name);
+    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
     void releaseConnection(UFIO* ufIO, bool connOk = true);
-    UFConnectionGroupInfo* addGroupImplicit(const std::string& groupName);
+    UFConnGroupInfo* addGroupImplicit(const std::string& groupName);
+
+    void clearUnusedConnections(TIME_IN_US lastUsedTimeDiff = 300000000 /*300 secs*/, unsigned long long int coverListTime = 60*1000*1000);
+
+    void setMaxSimulConnsPerHost(int input);
+    int getMaxSimulConnsPerHost();
+    void setTimeToTimeoutIPAfterFailure(time_t input);
+    time_t getTimeToTimeoutIPAfterFailure();
+
+protected:
+    GroupIPMap                  _groupIpMap;
+    IPInfoStore                 _ipInfoStore;
+    UFConnIPInfoMap             _ufConnIPInfoMap;
+    int                         _maxSimulConnsPerHost;
+    time_t                      _timeToTimeoutIPAfterFailure;  ///How long should we time out an IP if there is a failure
+
+    UFConnIPInfo* getIPInfo(const std::string& name);
+    bool createIPInfo(const std::string& groupName, UFConnGroupInfo* groupInfo);
+};
+inline void UFConnectionPoolImpl::setMaxSimulConnsPerHost(int input) { _maxSimulConnsPerHost = input; }
+inline int UFConnectionPoolImpl::getMaxSimulConnsPerHost() { return _maxSimulConnsPerHost; }
+inline void UFConnectionPoolImpl::setTimeToTimeoutIPAfterFailure(time_t input) { _timeToTimeoutIPAfterFailure = input; }
+inline time_t UFConnectionPoolImpl::getTimeToTimeoutIPAfterFailure() { return _timeToTimeoutIPAfterFailure; }
+
+struct UFConnIPInfo
+{
+    UFConnIPInfo(const std::string& ip, 
+                       unsigned int port,
+                       bool persistent = true, 
+                       int maxSimultaneousConns = 0,
+                       TIME_IN_US connectTimeout = 0,
+                       TIME_IN_US timeToFailOutIPAfterFailureInSecs = 10);
+    ~UFConnIPInfo() {} //TODO: remove all the conns w/in currently available and currently used
+
+    const std::string&  getIP() const;
+    struct sockaddr_in* getSin();
+    unsigned int        getMaxSimultaneousConns() const;
+    unsigned int        getInProcessCount() const;
+    bool                getPersistent() const;
+    unsigned int        getTimeToFailOutIPAfterFailureInSecs() const;
+    unsigned int        getTimeToExpire() const;
+    void                setTimeToExpire(time_t input);
+    TIME_IN_US          getConnectTimeout() const;
+    time_t              getTimedOut() const;
+    void                setTimedOut(time_t t);
+    UFIOIntMap&         getCurrentlyAvailableConnections();
+    UFMutex*            getMutexToCheckSomeConnection();
+
+    time_t              getLastUsed() const;
+    void                incInProcessCount(int numToIncrement = 1);
+    UFIO*               getConnection(UFConnIPInfoMap& _UFConnIPInfoMap);
+    UFIO*               createConnection();
+
+    UFIOIntMap          _currentlyAvailableConnections;
+    unsigned int        _currentlyUsedCount;
+
+protected:
+    std::string         _ip;
+    unsigned int        _port;
+    struct sockaddr_in  _sin;
+    unsigned int        _maxSimultaneousConns;
+    bool                _persistent; 
+
+    unsigned int        _timeToExpireDNSInfo;
+    unsigned int        _timeToFailOutIPAfterFailureInSecs; ///how many s to try before considering the connect a failure
+    TIME_IN_US          _connectTimeout; ///how many ms to try before considering the connect a failure
+    time_t              _timedOut;
+
+    unsigned int        _inProcessCount;
+    UFMutex             _someConnectionAvailable;
+    time_t              _lastUsed;
+
+};
+inline time_t UFConnIPInfo::getLastUsed() const { return _lastUsed; }
+inline struct sockaddr_in* UFConnIPInfo::getSin() { return &_sin; }
+inline const string& UFConnIPInfo::getIP() const { return _ip; }
+inline unsigned int UFConnIPInfo::getMaxSimultaneousConns() const { return _maxSimultaneousConns; }
+inline unsigned int UFConnIPInfo::getInProcessCount() const { return _inProcessCount; }
+inline void UFConnIPInfo::incInProcessCount(int numToIncrement) { _inProcessCount += numToIncrement; }
+inline bool UFConnIPInfo::getPersistent() const { return _persistent; }
+inline unsigned int UFConnIPInfo::getTimeToFailOutIPAfterFailureInSecs() const { return _timeToFailOutIPAfterFailureInSecs; }
+inline TIME_IN_US UFConnIPInfo::getConnectTimeout() const { return _connectTimeout; }
+inline unsigned int UFConnIPInfo::getTimeToExpire() const { return _timeToExpireDNSInfo; }
+inline void UFConnIPInfo::setTimeToExpire(time_t input) { _timeToExpireDNSInfo = input; }
+inline time_t UFConnIPInfo::getTimedOut() const { return _timedOut; }
+inline void UFConnIPInfo::setTimedOut(time_t t) { _timedOut = t; }
+inline UFMutex* UFConnIPInfo::getMutexToCheckSomeConnection() { return &_someConnectionAvailable; }
+inline UFIOIntMap& UFConnIPInfo::getCurrentlyAvailableConnections() { return _currentlyAvailableConnections; }
+
 
-    std::string fillInfo(std::string& data, bool detailed = false) const;
-    double getGroupAvailability(const std::string& name) const;
 
-    void clearBadConnections();
 
+typedef std::vector<std::string>    UFConnIPInfoList;
+struct UFConnGroupInfo
+{
+    UFConnGroupInfo(const std::string& name);
+    ~UFConnGroupInfo();
 
-    static time_t                               _timeoutIP;  ///How long should we time out an IP if there is a failure
+    UFConnIPInfoList&           getIpInfoList();
+    time_t                      getTimeToExpireAt() const;
+    void                        setTimeToExpireAt(time_t input);
+    std::string                 getName() const;
 
 protected:
-    GroupIPMap                                  _groupIpMap;
-    UFConnectionIpInfoMap                       _UFConnectionIpInfoMap;
+    std::string                 _name;
+    UFConnIPInfoList            _ipInfoList;
+    time_t                      _timeToExpireAt;
+};
+inline std::string UFConnGroupInfo::getName() const { return _name; }
+inline time_t UFConnGroupInfo::getTimeToExpireAt() const { return _timeToExpireAt; }
+inline void UFConnGroupInfo::setTimeToExpireAt(time_t input) { _timeToExpireAt = input; }
+inline UFConnIPInfoList& UFConnGroupInfo::getIpInfoList() { return _ipInfoList; }
+
 
-    UFIO* createConnection(UFConnectionIpInfo* stIpInfo);
+struct UFConnectionPoolCleaner : public UF
+{
+    void run();
+    UFConnectionPoolCleaner (bool registerMe = false);
+    UF* createUF() { return new UFConnectionPoolCleaner(); }
 };
+inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ }
+
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFIO.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFIO.C Mon Jun 14 15:55:08 2010
@@ -1,5 +1,7 @@
 #include "UFIO.H"
 #include "UFConnectionPool.H"
+#include "UFStatSystem.H"
+#include "UFStats.H"
 #include <netdb.h>
 #include <sys/socket.h> 
 #include <sys/time.h> 
@@ -39,13 +41,12 @@ void UFIO::reset()
     _lastEpollFlag = 0;
     _sleepInfo = 0;
     _markedActive = false;
+    _active = true;
 }
 
 UFIO::~UFIO()
 {
     close();
-    if(_sleepInfo)
-        _sleepInfo->_ufio = 0;
 }
 
 UFIO::UFIO(UF* uf, int fd)
@@ -275,7 +276,7 @@ void UFIO::accept(UFIOAcceptThreadChoose
                 result = ufiotChooser->pickThread(_fd);
                 ufs = result.first;
                 tToAddTo = result.second;
-                if(!ufs || !ufs->addFibersToScheduler(listOfUFsToAdd, tToAddTo))
+                if(!ufs || !ufs->addFiberToScheduler(listOfUFsToAdd, tToAddTo))
                 {
                     cerr<<"couldnt find thread to assign "<<acceptFd<<" or couldnt add fiber to scheduler"<<endl;
                     exit(1);
@@ -293,7 +294,7 @@ void UFIO::accept(UFIOAcceptThreadChoose
             result = ufiotChooser->pickThread(_fd);
             ufs = result.first;
             tToAddTo = result.second;
-            if(!ufs || !ufs->addFibersToScheduler(listOfUFsToAdd, tToAddTo))
+            if(!ufs || !ufs->addFiberToScheduler(listOfUFsToAdd, tToAddTo))
             {
                 cerr<<"couldnt find thread to assign "<<acceptFd<<" or couldnt add fiber to scheduler"<<endl;
                 exit(1);
@@ -333,8 +334,11 @@ ssize_t UFIO::read(void *buf, size_t nby
     while(1)
     {
         n = ::read(_fd, buf, nbyte);
-        if(n > 0)
+        if(n > 0) 
+        {
+            UFStatSystem::increment(UFStats::bytesRead, n);
             return n;
+        }
         else if(n < 0)
         {
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
@@ -360,6 +364,11 @@ ssize_t UFIO::read(void *buf, size_t nby
         else if(n == 0)
             break;
     }
+
+    // Increment stat for bytes written
+    if(n > 0)
+        UFStatSystem::increment(UFStats::bytesRead, n);
+
     return n;
 }
 
@@ -376,7 +385,10 @@ ssize_t UFIO::write(const void *buf, siz
         {
             amtWritten += n;
             if(amtWritten == nbyte)
+            {
+                UFStatSystem::increment(UFStats::bytesWritten, n);
                 return amtWritten;
+            }
             else
                 continue;
         }
@@ -404,15 +416,23 @@ ssize_t UFIO::write(const void *buf, siz
         else if(n == 0)
             break;
     }
+
+    // Increment stat for bytes written
+    if(n > 0)
+        UFStatSystem::increment(UFStats::bytesWritten, n);
+
     return n;
 }
 
-int UFIO::connect(const struct sockaddr *addr, 
+bool UFIO::connect(const struct sockaddr *addr, 
                int addrlen, 
                TIME_IN_US timeout)
 {
     if(!isSetup()) //create the socket and make the socket non-blocking
-        return -1;
+    {
+        _errno = EINVAL;
+        return false;
+    }
 
 
     //find the scheduler for this request
@@ -420,22 +440,24 @@ int UFIO::connect(const struct sockaddr 
 
     while(::connect(_fd, addr, addrlen) < 0)
     {
-        _errno = errno;
         if(errno == EINTR)
             continue;
         else if(errno == EINPROGRESS || errno == EAGAIN)
         {
             if(!tmpUfios->setupForConnect(this, timeout))
             {
-                cerr<<"couldnt setup for connect - "<<strerror(errno)<<endl;
-                return -1;
+                _errno = errno;
+                return false;
             }
         }
         else
-            return -1;
+        {
+            _errno = errno;
+            return false;
+        }
     }
 
-    return 0;
+    return true;
 }
 
 int UFIO::sendto(const char *buf, int len, const struct sockaddr *to, int tolen, TIME_IN_US timeout)
@@ -994,8 +1016,10 @@ void EpollUFIOScheduler::waitForEvents(T
                         exit(1);
                     }
                     ufio->_markedActive = true;
-                    //activate the fiber
-                    ufs->addFiberToScheduler(uf, 0);
+                    if(ufio->_active)
+                        //activate the fiber
+                        ufs->addFiberToScheduler(uf, 0);
+                    //else must be the case that no one is watching this ufio - such as in the case where the conn. pool holding onto the conn.
                 }
                 else
                 {
@@ -1057,7 +1081,7 @@ void EpollUFIOScheduler::waitForEvents(T
                     }
                     ++beg;
                 }
-                ufs->addFibersToScheduler(ufsToAddToScheduler, 0);
+                ufs->addFiberToScheduler(ufsToAddToScheduler, 0);
             }
         }
 

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFPC.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFPC.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFPC.C Mon Jun 14 15:55:08 2010
@@ -3,10 +3,11 @@
 
 using namespace std;
 
-bool UFProducer::produceData(void* data, size_t size, bool freeDataOnExit, UF* uf)
+bool UFProducer::produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit, UF* uf)
 {
     if(!uf)
         uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    _uf = uf;
 
     //create the UFProducerData structure
     UFProducerData* ufpd = UFProducerData::getObj();
@@ -14,50 +15,75 @@ bool UFProducer::produceData(void* data,
     ufpd->_size = size;
     ufpd->_freeDataOnExit = freeDataOnExit;
     ufpd->_producerWhichInserted = this;
-    ufpd->_ufpcCode = (data && size) ? ADD : END;
+    ufpd->_ufpcCode = ufpcCode;
+    ufpd->_lockToUpdate = _requireLockToUpdateConsumers;
 
+    //increase the reference count
+    ufpd->addRef(_producersConsumerSetSize);
 
-    /*TODO: add the structure to the queue 
-    _producerDataMutex.lock(uf);
-    _producerData.push_back(ufpd);
-    _producerDataMutex.unlock(uf);
-    */
+    //the optimized case of no locking + there being only one consumer
+    if(!_requireLockToUpdateConsumers &&
+       _producersConsumerSetSize == 1 &&
+       _mostRecentConsumerAdded)
+    {
+        _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
+        UF* consUF = _mostRecentConsumerAdded->getUF();
+        if(consUF)
+            UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
+        return true;
+    }
 
     //for each of the consumers add it to their queue
-    _producersConsumerSetLock.getSpinLock();
-    //increase the reference count
-    ufpd->addRef(_producersConsumerSet.size());
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
     for(set<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
         beg != _producersConsumerSet.end(); ++beg)
     {
         //add to the consumer's queue
-        (*beg)->_queueOfDataToConsumeLock.lock(uf);
-        (*beg)->_queueOfDataToConsume.push_back(ufpd);
-        (*beg)->_queueOfDataToConsumeLock.signal();
-        (*beg)->_queueOfDataToConsumeLock.unlock(uf);
+        if(_requireLockToUpdateConsumers)
+        {
+            (*beg)->_queueOfDataToConsumeLock.lock(uf);
+            (*beg)->_queueOfDataToConsume.push_back(ufpd);
+            if(!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0))
+                (*beg)->_queueOfDataToConsumeLock.signal();
+            (*beg)->_queueOfDataToConsumeLock.unlock(uf);
+        }
+        else
+        {
+            (*beg)->_queueOfDataToConsume.push_back(ufpd);
+            UF* consUF = (*beg)->getUF();
+            if(consUF && (!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0)))
+                UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
+        }
     }
-    //printf("%lu added to %lu\n", (unsigned long int) ((uintptr_t)(void*)uf), (unsigned long int)_producersConsumerSet.size());
-    _producersConsumerSetLock.releaseSpinLock();
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
 
     return true;
 }
 
-UFConsumer::UFConsumer(bool shouldLockForInternalMods) : _shouldLockForInternalMods(shouldLockForInternalMods) { }
+UFConsumer::UFConsumer(bool notifyOnExitOnly) : _notifyOnExitOnly(notifyOnExitOnly)
+{ 
+    _currUF = 0; 
+    _requireLockToWaitForUpdate = true;
+}
 
 UFProducerData* UFConsumer::waitForData(UF* uf)
 {
     if(!uf)
         uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
 
-    _queueOfDataToConsumeLock.lock(uf);
+    _currUF = uf;
+
+    if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
     while(_queueOfDataToConsume.empty())
-        _queueOfDataToConsumeLock.condWait(uf); //TODO: change to condTimedWait
+    {
+        if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.condWait(uf); //TODO: change to condTimedWait
+        else uf->block(); //wait for the producer to wake up the consumer
+    }
 
     //read the first element from the queue
     UFProducerData* result = _queueOfDataToConsume.front();
     _queueOfDataToConsume.pop_front();
-    //printf("%lu consumer read 1 out %lu\n", (unsigned long int) ((uintptr_t)(void*)uf), (unsigned long int)_queueOfDataToConsume.size());
-    _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
+    if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
 
     return result;
 }
@@ -68,20 +94,18 @@ bool UFConsumer::hasData(UF* uf)
         uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
 
     bool result;
-    _queueOfDataToConsumeLock.lock(uf);
+    if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
     result = !(_queueOfDataToConsume.empty());
-    _queueOfDataToConsumeLock.unlock(uf);
+    if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
 
     return result;
 }
 
 bool UFConsumer::joinProducer(UFProducer* ufp)
 {
-    if(_shouldLockForInternalMods)
-        _consumersProducerSetLock.getSpinLock();
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
     _consumersProducerSet.insert(ufp);
-    if(_shouldLockForInternalMods)
-        _consumersProducerSetLock.releaseSpinLock();
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
 
     //notify the producer that we're adding this consumer
     if(!ufp->addConsumer(this))
@@ -96,21 +120,20 @@ bool UFConsumer::removeProducer(UFProduc
     if(!ufp->removeConsumer(this))
         return false;
 
-    if(_shouldLockForInternalMods)
-        _consumersProducerSetLock.getSpinLock();
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
     _consumersProducerSet.erase(ufp);
-    if(_shouldLockForInternalMods)
-        _consumersProducerSetLock.releaseSpinLock();
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
     return true;
 }
 
-UFConsumer::~UFConsumer()
+void UFConsumer::reset()
 {
     //1. notify all the producers on exit
-    for(set<UFProducer*>::iterator beg = _consumersProducerSet.begin();
-        beg != _consumersProducerSet.end();
-        ++beg)
+    for(set<UFProducer*>::iterator beg = _consumersProducerSet.begin(); beg != _consumersProducerSet.end(); )
+    {
         removeProducer(*beg);
+        beg = _consumersProducerSet.begin();
+    }
 
     //2. clear out all the remaining entries in the queue
     if(!_queueOfDataToConsume.empty())
@@ -119,25 +142,28 @@ UFConsumer::~UFConsumer()
         for(; beg != _queueOfDataToConsume.end(); ++beg)
             UFProducerData::releaseObj((*beg));
     }
-} 
-
-UFProducer::UFProducer()
-{
 }
 
-UFProducer::~UFProducer()
+void UFProducer::reset()
 {
-    produceData(0, 0, 0); //notify the consumers that the producer is bailing
+    //add the EOF indicator
+    produceData(0, 0, 0/*exit*/, 0/*freeDataOnExit*/); //notify the consumers that the producer is bailing
 
     //have to wait for all the consumers to acknowledge my death
-    while(1)
+    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
+    _acceptNewConsumers = false;
+    while(!_producersConsumerSet.empty())
     {
-        //TODO: change to signal based later
-        _producersConsumerSetLock.getSpinLock();
-        if(!_producersConsumerSet.size())
-            break;
-        _producersConsumerSetLock.releaseSpinLock();
-        usleep(100000);
+        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.condTimedWait(uf, 1000000);
+        else 
+        {
+            _uf = uf;
+            _uf->block();
+        }
     }
+    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
 }
 
+stack<UFProducerData*> UFProducerData::_objList;
+UFMutex UFProducerData::_objListMutex;

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Mon Jun 14 15:55:08 2010
@@ -7,12 +7,27 @@
 #include <sys/types.h>
 #include <sys/wait.h>
 #include "UFStatSystem.H"
+#include "UFStats.H"
 
 using namespace std;
 
 //TODO: handle signals later
 //TODO: create monitoring port later
 //
+//
+static string getPrintableTime()
+{
+    char asctimeDate[32];
+    asctimeDate[0] = '\0';
+    time_t now = time(0);
+    asctime_r(localtime(&now), asctimeDate);
+
+    string response = asctimeDate;
+    size_t loc = response.find('\n');
+    if(loc != string::npos)
+        response.replace(loc, 1, "");
+    return response;
+}
 
 void UFServer::reset()
 {
@@ -43,6 +58,8 @@ struct NewConnUF : public UF
 
         UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
         ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+        // increment connections handled stat
+        UFStatSystem::increment(UFStats::connectionsHandled);
 
         //clear the client connection
         delete fiberStartingArgs->ufio;
@@ -77,14 +94,15 @@ struct AcceptRunner : public UF
             fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
         if(fd < 0)
         {
-            cerr<<"couldnt setup listen socket"<<endl;
+            cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl;
             exit(1);
         }
         if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
         {
-            cerr<<"couldnt setup accept thread"<<endl;
+            cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl;
             return;
         }
+
         ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
     }
     AcceptRunner(bool registerMe = false)
@@ -112,12 +130,13 @@ void UFServer::startThreads()
     for(; i<MAX_THREADS_ALLOWED; i++)
     {
         UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
+        cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl;
         usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
         //add the io threads to the thread chooser
         UFScheduler* ufs = UFScheduler::getUFScheduler(thread[i]);
         if(!ufs)
         {
-            cerr<<"didnt get scheduler for tid = "<<thread[i]<<endl;
+            cerr<<getPrintableTime()<<" "<<getpid()<<": didnt get scheduler for tid - "<<thread[i]<<endl;
             exit(1);
         }
         addThread("NETIO", ufs, thread[i]);
@@ -126,6 +145,9 @@ void UFServer::startThreads()
     //start the stats thread
     UFStatSystem::init(this);
 
+    // Register server stats
+    UFStats::registerStats();
+
     preAccept();
     //start the accept thread
     for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
@@ -135,13 +157,12 @@ void UFServer::startThreads()
         list<UF*>* ufsToAdd = new list<UF*>();
         ufsToAdd->push_back(ar);
         UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
-
         usleep(5000); //TODO: let the thread finish initializing 
         addThread("ACCEPT", 0, thread[i]);
+        cerr<<getPrintableTime()<<" "<<getpid()<<": created accept thread (with I/O) - "<<thread[i]<<endl;
     }
 
 
-
     //wait for the threads to finish
     void* status;
     for(i=0; i<MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED; i++)
@@ -161,11 +182,11 @@ void UFServer::run()
     _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
     if(_listenFd < 0)
     {
-        cerr<<"couldnt setup listen socket"<<endl;
+        cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
         exit(1);
     }
 
-    if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes
+    if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode)
     {
         preThreadRun();
         startThreads();
@@ -181,7 +202,7 @@ void UFServer::run()
             unsigned int pid = fork();
             if(pid < 0)
             {
-                cerr<<"("<<getpid()<<")(P): couldnt create child# : "<<strerror(errno)<<endl;
+                cerr<<getPrintableTime()<<" "<<getpid()<<": (P): couldnt create child# : "<<strerror(errno)<<endl;
                 exit(1);
             }
             if(!pid) //child listens to conns
@@ -195,6 +216,7 @@ void UFServer::run()
                 startThreads();
                 exit(0);
             }
+            cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;
             _childProcesses[pid] = time(0);
             postBetweenFork(pid);
         }
@@ -206,19 +228,18 @@ void UFServer::run()
         else if(child_pid < 0)
         {
             if(errno != ECHILD)
-                cerr<<"("<<getpid()<<")(P): waitpid error: "<<strerror(errno)<<endl;
+                cerr<<getPrintableTime()<<" "<<getpid()<<": (P): waitpid error:"<<endl;
         }
         else if(child_pid > 0)
         {
-            cerr<<"("<<getpid()<<")(P): child_pid "<<child_pid<<" died "<<endl;
+            cerr<<getPrintableTime()<<" "<<getpid()<<")(P): child_pid "<<child_pid<<" died "<<endl;
             map<int, time_t>::iterator itr = _childProcesses.find(child_pid);
             if(itr != _childProcesses.end()) 
                 _childProcesses.erase(itr);
-            //parentChildDeathHandler(child_pid); TODO
         }
 
         //we've been asked to bail
-        if(MAX_PROCESSES_ALLOWED && !_childProcesses.size())
+        if(!MAX_PROCESSES_ALLOWED && !_childProcesses.size())
             break;
 
         //let the parent rest
@@ -241,7 +262,7 @@ void UFServer::addThread(const std::stri
     }
     index->second->push_back(tid);
 
-    if(ufScheduler)
+    if(ufScheduler && type == "NETIO")
         _threadChooser->add(ufScheduler, tid);
 }
 

Added: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Mon Jun 14 15:55:08 2010
@@ -0,0 +1,22 @@
+#include "UFStats.H"
+#include "UFStatSystem.H"
+
+uint32_t UFStats::connectionsHandled;
+uint32_t UFStats::txnSuccess;
+uint32_t UFStats::txnFail;
+uint32_t UFStats::txnReject;
+uint32_t UFStats::bytesRead;
+uint32_t UFStats::bytesWritten;
+
+namespace UFStats
+{
+    void registerStats(bool lock_needed)
+    {
+        UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed);
+        UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed);
+        UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed);
+        UFStatSystem::registerStat("txn.reject", &txnReject, lock_needed);
+        UFStatSystem::registerStat("bytes.read", &bytesRead, lock_needed);
+        UFStatSystem::registerStat("bytes.written", &bytesWritten, lock_needed);
+    }
+}

Added: trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H Mon Jun 14 15:55:08 2010
@@ -0,0 +1,31 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#ifndef _UFSWAPCONTEXT_H_
+#define _UFSWAPCONTEXT_H_
+
+extern "C" {
+  int uf_swapcontext(ucontext *old_ctx, ucontext *new_ctx);
+}
+
+#endif

Added: trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S Mon Jun 14 15:55:08 2010
@@ -0,0 +1,59 @@
+#if __WORDSIZE == 64
+.text
+	.align 4
+.globl uf_swapcontext
+	.type	 uf_swapcontext,@function
+
+uf_swapcontext:
+        // store
+	movq    %r15, 0x60(%rdi)
+	movq    %r14, 0x58(%rdi)
+	movq    %r13, 0x50(%rdi)
+	movq    %r12, 0x48(%rdi)
+	movq    %r9, 0x30(%rdi)
+	movq    %r8, 0x28(%rdi)
+	movq    %rdi, 0x68(%rdi)
+	movq    %rsi, 0x70(%rdi)
+        movq    %rbp, 0x78(%rdi)
+        movq    %rbx, 0x80(%rdi)
+	movq    %rdx, 0x88(%rdi)
+	movq    %rcx, 0x98(%rdi)
+        
+	movq    (%rsp), %rcx
+	movq    %rcx, 0xa8(%rdi)
+	leaq    0x8(%rsp), %rcx
+	movq    %rcx, 0xa0(%rdi)
+        
+	leaq    0x1a8(%rdi), %rcx
+	movq    %rcx, 0xe0(%rdi)
+        
+	fnstenv (%rcx)
+	stmxcsr 0x1c0(%rdi)
+        
+        // restore
+        movq    %rsi, %rdi
+        movq    0xe0(%rdi), %rcx
+        fldenv  (%rcx)
+        ldmxcsr 0x1c0(%rdi)
+
+        movq    0x60(%rdi), %r15
+        movq    0x58(%rdi), %r14
+        movq    0x50(%rdi), %r13
+        movq    0x48(%rdi), %r12
+        movq    0x30(%rdi), %r9
+        movq    0x28(%rdi), %r8
+        movq    0x70(%rdi), %rsi
+        movq    0x78(%rdi), %rbp
+        movq    0x80(%rdi), %rbx
+        movq    0x88(%rdi), %rdx
+        movq    0x98(%rdi), %rcx
+        
+        movq    0xa0(%rdi), %rsp
+        movq    0xa8(%rdi), %rcx
+        pushq   %rcx
+        
+        movq    0x68(%rdi), %rdi
+        
+        xorl    %eax, %eax
+        ret
+#endif

Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S
------------------------------------------------------------------------------
    svn:executable = *