You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/16 04:10:55 UTC

svn commit: r944754 - in /trafficserver/traffic/branches/UserFiber: include/UF.H include/UFConnectionPool.H include/UFIO.H include/UFServer.H src/UF.C src/UFConnectionPoolImpl.C src/UFConnectionPoolImpl.H src/UFIO.C src/UFServer.C

Author: akundu
Date: Sun May 16 02:10:55 2010
New Revision: 944754

URL: http://svn.apache.org/viewvc?rev=944754&view=rev
Log:
- Raghav's completed working version of UFConnectionPool
- UFServer now users ufCreateThreadWithIO (instead of creating its own accept fxn)
- cleanups


Modified:
    trafficserver/traffic/branches/UserFiber/include/UF.H
    trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
    trafficserver/traffic/branches/UserFiber/include/UFIO.H
    trafficserver/traffic/branches/UserFiber/include/UFServer.H
    trafficserver/traffic/branches/UserFiber/src/UF.C
    trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
    trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H
    trafficserver/traffic/branches/UserFiber/src/UFIO.C
    trafficserver/traffic/branches/UserFiber/src/UFServer.C

Modified: trafficserver/traffic/branches/UserFiber/include/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UF.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UF.H Sun May 16 02:10:55 2010
@@ -157,7 +157,7 @@ struct UFScheduler
 
 
     static ThreadUFSchedulerMap  _threadUFSchedulerMap;
-    static pthread_mutex_t       _mutexToCheckFiberScheduerMap;
+    static pthread_mutex_t       _mutexToCheckFiberSchedulerMap;
             
     //returns the fiber scheduler on this thread or other threads;
     static UFScheduler*          getUFScheduler(pthread_t tid = 0); 
@@ -202,6 +202,7 @@ protected:
 
     //the sleep tree
     MapTimeUF                   _sleepList;
+    unsigned long long int      _earliestWakeUpFromSleep;
     //store the shortest sleep interval
     unsigned long long int      _amtToSleep;
 
@@ -248,8 +249,12 @@ inline void UF::usleep(unsigned long lon
 
     struct timeval now;
     gettimeofday(&now, 0);
-    unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
-    _parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), this));
+    unsigned long long int timeToWakeUp = now.tv_sec*1000000+now.tv_usec + sleepAmtInUs;
+    if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
+       !_parentScheduler->_earliestWakeUpFromSleep)
+        _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
+
+    _parentScheduler->_sleepList.insert(std::make_pair(timeToWakeUp, this));
     block();
 }
 

Modified: trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H Sun May 16 02:10:55 2010
@@ -38,6 +38,9 @@ public:
     double getGroupAvailability(const std::string& name) const;
     void clearBadConnections();
 
+    static int MAX_SIMUL_CONNS_PER_HOST;
+    static int TIMEOUT_PER_REQUEST;
+    
     //TODO - get stats & detailed stats info
 protected:
 
@@ -48,14 +51,13 @@ protected:
 struct UFConnectionPoolCleaner : public UF
 {
     void run();
-    UFConnectionPoolCleaner (bool registerMe = false) : _conn_pool(NULL)
+    UFConnectionPoolCleaner (bool registerMe = false)
     {
 //        if(registerMe)
 //            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
     }
 
     UF* createUF() { return new UFConnectionPoolCleaner; }
-    UFConnectionPool* _conn_pool;
 };
 
 namespace StringUtil
@@ -83,6 +85,7 @@ struct UFConnectionIpInfo
     UFIOIntMap          _currentlyAvailableConnections;
     UFIOIntMap          _currentlyUsedConnections;
     unsigned int        _inProcessCount;
+    UFMutex _someConnectionAvailable;
 };
 
 

Modified: trafficserver/traffic/branches/UserFiber/include/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFIO.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFIO.H Sun May 16 02:10:55 2010
@@ -187,15 +187,16 @@ struct EpollUFIOScheduler : public UFIOS
     bool                            _interruptedByEventFd;
 
 protected:
-    UF*                 _uf;
-    int                 _epollFd;
-    unsigned int        _maxFds;
-    struct epoll_event* _epollEventStruct;
-    IntUFIOMap          _intUFIOMap;
-    bool                _alreadySetup;
+    UF*                             _uf;
+    int                             _epollFd;
+    unsigned int                    _maxFds;
+    struct epoll_event*             _epollEventStruct;
+    IntUFIOMap                      _intUFIOMap;
+    bool                            _alreadySetup;
 
 
-    MapTimeUFIO         _sleepList;
+    MapTimeUFIO                     _sleepList;
+    unsigned long long int          _earliestWakeUpFromSleep;
 
     bool addToScheduler(UFIO* ufio, 
                         void* inputInfo /*flags to identify how ot add*/, 

Modified: trafficserver/traffic/branches/UserFiber/include/UFServer.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFServer.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFServer.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFServer.H Sun May 16 02:10:55 2010
@@ -50,7 +50,7 @@ public:
 
     StringThreadMapping* getThreadList() { return &_threadList; };
     std::vector<pthread_t>* getThreadType(const std::string& type);
-    void addThread(const std::string& type, UFScheduler* ufScheduler);
+    void addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid=0);
     void run();
 
 protected:
@@ -76,20 +76,24 @@ private:
     //start processing
 };
 
+
+
 struct UFServerThreadChooser : public UFIOAcceptThreadChooser
 {
     UFServerThreadChooser() { }
 
     std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd);
-    void add(UFScheduler* ufs, pthread_t tid)
-    {
-        _threadList.push_back(make_pair(ufs, tid));
-    }
+    void add(UFScheduler* ufs, pthread_t tid);
 
 protected:
     std::vector<pair<UFScheduler*, pthread_t> > _threadList;
 };
 
+inline void UFServerThreadChooser::add(UFScheduler* ufs, pthread_t tid)
+{
+    _threadList.push_back(make_pair(ufs, tid));
+}
+
 inline std::pair<UFScheduler*, pthread_t> UFServerThreadChooser::pickThread(int listeningFd)
 {
     static unsigned int lastLocUsed = 0;

Modified: trafficserver/traffic/branches/UserFiber/src/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UF.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UF.C Sun May 16 02:10:55 2010
@@ -96,7 +96,7 @@ bool UF::setup(void* stackPtr, size_t st
 
 ///////////////UFScheduler/////////////////////
 ThreadUFSchedulerMap UFScheduler::_threadUFSchedulerMap;
-pthread_mutex_t UFScheduler::_mutexToCheckFiberScheduerMap = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UFScheduler::_mutexToCheckFiberSchedulerMap = PTHREAD_MUTEX_INITIALIZER;
 static pthread_key_t getThreadKey()
 {
     if(pthread_key_create(&UFScheduler::_specific_key, 0) != 0)
@@ -109,6 +109,7 @@ static pthread_key_t getThreadKey()
 pthread_key_t UFScheduler::_specific_key = getThreadKey();
 UFScheduler::UFScheduler()
 {
+    _earliestWakeUpFromSleep = 0;
     _exitJustMe = false;
     _specific = 0;
     _currentFiber = 0;
@@ -126,14 +127,14 @@ UFScheduler::UFScheduler()
     {
         pthread_t currThreadId = pthread_self();
 
-        pthread_mutex_lock(&_mutexToCheckFiberScheduerMap);
+        pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
         if(_threadUFSchedulerMap.find(currThreadId) != _threadUFSchedulerMap.end())
         {
             cerr<<"cannot have more than one scheduler per thread"<<endl;
             exit(1);
         }
         _threadUFSchedulerMap[currThreadId] = this;
-        pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+        pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
     }
     else
     {
@@ -157,7 +158,7 @@ UFScheduler::UFScheduler()
 
 UFScheduler::~UFScheduler()
 {
-    pthread_key_delete(_specific_key);
+    //pthread_key_delete(_specific_key);
     delete _conn_pool;
 }
 
@@ -262,11 +263,6 @@ void UFScheduler::runScheduler()
     gettimeofday(&start, 0);
     unsigned long long int timeNow = 0;
 
-    // Add connection pool cleanup fiber
-    UFConnectionPoolCleaner *conn_pool_cleanup_fiber = new UFConnectionPoolCleaner;
-    conn_pool_cleanup_fiber->_conn_pool = _conn_pool;
-    addFiberToScheduler(conn_pool_cleanup_fiber);
-    
     while(!_exitJustMe && !_exit)
     {
         UFList::iterator beg = _activeRunningList.begin();
@@ -299,33 +295,6 @@ void UFScheduler::runScheduler()
         //check the sleep queue
         ranGetTimeOfDay = false;
         _amtToSleep = DEFAULT_SLEEP_IN_USEC;
-        //pick up the fibers that may have completed sleeping
-        //look into the sleep list;
-        if(!_sleepList.empty())
-        {
-            gettimeofday(&now, 0);
-            ranGetTimeOfDay = true;
-            timeNow = (now.tv_sec*1000000)+now.tv_usec;
-            for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
-            {
-                //TODO: has to be cleaned up
-                //1. see if anyone has crossed the sleep timer - add them to the active list
-                if(beg->first <= timeNow) //sleep time is over
-                {
-                    _activeRunningList.push_front(beg->second); //putting to the front - so that it gets evaluated before anything else
-                    _sleepList.erase(beg);
-                    beg = _sleepList.begin();
-                    continue;
-                }
-                else
-                {
-                    _amtToSleep = beg->first-timeNow;
-                    break;
-                }
-                ++beg;
-            }
-        }
-
 
         //check if some other thread has nominated some user fiber to be
         //added to this thread's list -
@@ -336,7 +305,7 @@ void UFScheduler::runScheduler()
            _inThreadedMode)
 
         {
-            _amtToSleep = 0;
+            _amtToSleep = 0; //since we're adding new ufs to the list we dont need to sleep
             //TODO: do atomic comparison to see if there is anything in 
             //_nominateToAddToActiveRunningList before getting the lock
             pthread_mutex_lock(&_mutexToNominateToActiveList);
@@ -345,7 +314,7 @@ void UFScheduler::runScheduler()
             {
                 UF* uf = *beg;
                 if(uf->_parentScheduler)
-                    _activeRunningList.push_front(uf);
+                    _activeRunningList.push_back(uf);
                 else //adding a new fiber
                     addFiberToScheduler(uf, 0);
                 beg = _nominateToAddToActiveRunningList.erase(beg);
@@ -355,6 +324,38 @@ void UFScheduler::runScheduler()
         }
 
 
+        //pick up the fibers that may have completed sleeping
+        //look into the sleep list;
+        if(!_sleepList.empty())
+        {
+            gettimeofday(&now, 0);
+            ranGetTimeOfDay = true;
+            timeNow = (now.tv_sec*1000000)+now.tv_usec;
+            if(timeNow >= _earliestWakeUpFromSleep) //dont go into this queue unless the time seen the last time has passed
+            {
+                for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+                {
+                    //TODO: has to be cleaned up
+                    //1. see if anyone has crossed the sleep timer - add them to the active list
+                    if(beg->first <= timeNow) //sleep time is over
+                    {
+                        _activeRunningList.push_back(beg->second);
+                        _sleepList.erase(beg);
+                        beg = _sleepList.begin();
+                        continue;
+                    }
+                    else
+                    {
+                        if(_amtToSleep) //since the nominate system might have turned off the sleep - we dont activate it again
+                            _amtToSleep = beg->first-timeNow; 
+                        _earliestWakeUpFromSleep = beg->first;
+                        break;
+                    }
+                    ++beg;
+                }
+            }
+        }
+
         //see if there is anything to do or is it just sleeping time now
         if(!_notifyFunc && _activeRunningList.empty() && !_exit)
         {
@@ -390,14 +391,14 @@ UFScheduler* UFScheduler::getUFScheduler
     if(!tid || tid == pthread_self())
         return (UFScheduler*)pthread_getspecific(_specific_key);
 
-    pthread_mutex_lock(&_mutexToCheckFiberScheduerMap);
+    pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
     ThreadUFSchedulerMap::const_iterator index = _threadUFSchedulerMap.find(tid);
     if(index == _threadUFSchedulerMap.end())
     {
-        pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+        pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
         return 0;
     }
-    pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+    pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
 
     return const_cast<UFScheduler*>(index->second);
 }
@@ -442,10 +443,7 @@ void* setupThread(void* args)
 
     list<UF*>* ufsToStartWith = (list<UF*>*) args;
     UFScheduler ufs;
-    for(list<UF*>::iterator beg = ufsToStartWith->begin();
-        beg != ufsToStartWith->end();
-        ++beg)
-        ufs.addFiberToScheduler(*beg);
+    ufs.addFibersToScheduler(*ufsToStartWith, 0);
     delete ufsToStartWith;
 
     //run the scheduler

Modified: trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C Sun May 16 02:10:55 2010
@@ -197,8 +197,6 @@ UFIO* UFConnectionPoolImpl::getConnectio
     return getConnection(groupName, false);
 }
 
-static int MAX_SIMUL_CONNS_PER_HOST = 5;
-static int TIMEOUT_PER_REQUEST = 10;
 UFConnectionGroupInfo* UFConnectionPoolImpl::addGroupImplicit(const std::string& groupName)
 {
     UFConnectionGroupInfo* group = new UFConnectionGroupInfo(groupName);
@@ -210,8 +208,8 @@ UFConnectionGroupInfo* UFConnectionPoolI
     
     UFConnectionIpInfo* ip = new UFConnectionIpInfo(groupName,
                                                     true, 
-                                                    MAX_SIMUL_CONNS_PER_HOST,
-                                                    TIMEOUT_PER_REQUEST);
+                                                    UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST,
+                                                    UFConnectionPool::TIMEOUT_PER_REQUEST);
     if(!ip)
     {
         cerr<<getpid()<<" "<<time(NULL)<<" couldnt create the ip obj"<<endl;
@@ -234,7 +232,7 @@ UFIO* UFConnectionPoolImpl::getConnectio
     {
         cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<groupName<<endl;
         groupInfo = addGroupImplicit(groupName);
-        if(!groupInfo)
+        if(!groupInfo) 
             return NULL;
     }
     else
@@ -242,6 +240,10 @@ UFIO* UFConnectionPoolImpl::getConnectio
         groupInfo = (*foundItr).second;
     }
 
+    UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
+
+
 
     UFIO* returnConn = NULL;
     map<unsigned int,unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
@@ -283,94 +285,67 @@ UFIO* UFConnectionPoolImpl::getConnectio
             continue;
         ipInfo->_timedOut = 0;
 
-        //3. pick a connection from the currently available conns
-        //   (if there are any available)
-        UFIOIntMap::iterator beg = ipInfo->_currentlyAvailableConnections.begin();
-        for(; beg != ipInfo->_currentlyAvailableConnections.end(); 
-              beg = ipInfo->_currentlyAvailableConnections.begin()  // we're resetting to the beginning to avoid
-                                                                    // the case of two threads ending up getting 
-                                                                    // the same connection
-           )
-        {
-            returnConn = NULL;
-            if(!((*beg).first))
-            {
-                ipInfo->_currentlyAvailableConnections.erase(beg);
-                cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
-                continue;
-            }
-            returnConn = (*beg).first;
-            //take the found connection away from the curentlyAvaliableConnections list
-            //since validConnection now actually checks to see the content thats within the channel to verify the validity of the connection
-            //it may be that the thread gets switched out and some other thread comes into this section
-            ipInfo->_currentlyAvailableConnections.erase(beg);
-            ipInfo->_currentlyUsedConnections[returnConn] = time(NULL);
-            break;
-        }
 
-        //4. if no connections are available then create a new connection if allowed
-        if(!returnConn)
-        {
-            bool getConnection = false;
-
-            if ((ipInfo->_maxSimultaneousConns < 0) || 
-                (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount < (unsigned int) ipInfo->_maxSimultaneousConns)
-               )
-                getConnection = true;
-            else if ( waitForConnection && 
-                      (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns)
+        while(1) {
+            if(ipInfo->_currentlyAvailableConnections.size()) {
+                //3. pick a connection from the currently available conns
+
+                UFIOIntMap::iterator beg = ipInfo->_currentlyAvailableConnections.begin();
+                for(; beg != ipInfo->_currentlyAvailableConnections.end(); 
+                    beg = ipInfo->_currentlyAvailableConnections.begin()  // we're resetting to the beginning to avoid
+                        // the case of two threads ending up getting 
+                        // the same connection
                     )
-            {
-                //wait for the signal to get pinged
-                //unsigned short int counter = 0;
-                //while(counter < MAX_WAIT_FOR_CONNECTION_TO_BE_AVAILABLE) //we only try for 10 times
-                while(true)
                 {
-                    if (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount
-                            >= (unsigned int) ipInfo->_maxSimultaneousConns)
+                    returnConn = NULL;
+                    if(!((*beg).first))
                     {
-                        UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
-                        UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
-                        _someConnectionAvailable.lock(this_user_fiber);
-                        _someConnectionAvailable.condWait(this_user_fiber);
-                        _someConnectionAvailable.unlock(this_user_fiber);
-                    }
-                    else
-                    {
-                        getConnection = true; //this is here just so that if we ever go to the path of having a 
-                                              //max allowed waiting time - this var. will only get set if the 
-                                              //condition is met
-                        break;
+                        ipInfo->_currentlyAvailableConnections.erase(beg);
+                        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
+                        continue;
                     }
+                    returnConn = (*beg).first;
+                    //take the found connection away from the curentlyAvaliableConnections list
+                    //since validConnection now actually checks to see the content thats within the channel to verify the validity of the connection
+                    //it may be that the thread gets switched out and some other thread comes into this section
+                    ipInfo->_currentlyAvailableConnections.erase(beg);
+                    ipInfo->_currentlyUsedConnections[returnConn] = time(NULL);
+                    return returnConn;
                 }
             }
-
-            if(getConnection)
-            {
-                ipInfo->_inProcessCount++;
-                returnConn = createConnection(ipInfo);
-                ipInfo->_inProcessCount--;
-                if(returnConn)
-                {
-                    time_t currTime = time(NULL);
-                    ipInfo->_currentlyUsedConnections[returnConn] = currTime;
-                    _UFConnectionIpInfoMap[returnConn] = make_pair(ipInfo, make_pair(true, currTime));
+            else {
+                // if _maxSimultaneousConns is hit, wait for a connection to become available
+                if(ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns) {
+                    // wait for a connection to be released
+                    ipInfo->_someConnectionAvailable.lock(this_user_fiber);
+                    ipInfo->_someConnectionAvailable.condWait(this_user_fiber);
+                    ipInfo->_someConnectionAvailable.unlock(this_user_fiber);
+                    continue;
                 }
-                else
-                {
-                    if((random() % 100) < PERCENT_LOGGING_SAMPLING)
-                        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<ipInfo->_ip<<" "<<strerror(errno)<<endl;
+                else {
+                    // Create new connection
+                    ipInfo->_inProcessCount++;
+                    returnConn = createConnection(ipInfo);
+                    ipInfo->_inProcessCount--;
+                    if(returnConn)
+                    {
+                        time_t currTime = time(NULL);
+                        ipInfo->_currentlyUsedConnections[returnConn] = currTime;
+                        _UFConnectionIpInfoMap[returnConn] = make_pair(ipInfo, make_pair(true, currTime));
+                    }
+                    else
+                    {
+                        if((random() % 100) < PERCENT_LOGGING_SAMPLING)
+                            cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<ipInfo->_ip<<" "<<strerror(errno)<<endl;
+                    }
+                    
+                    return returnConn;
                 }
             }
         }
-
-        if(returnConn)
-            break;
     }
-
-
-    //5. return the found connection
-    return returnConn;
+    
+    return NULL;
 }
 
 UFIO* UFConnectionPoolImpl::createConnection(UFConnectionIpInfo* ipInfo)
@@ -385,8 +360,12 @@ UFIO* UFConnectionPoolImpl::createConnec
         return NULL;
     }
 
-    ufio->connect((struct sockaddr *) &ipInfo->_sin, sizeof(ipInfo->_sin), 16000);
+    int rc = ufio->connect((struct sockaddr *) &ipInfo->_sin, sizeof(ipInfo->_sin), 1600000);
 
+    if(rc != 0) {
+        delete ufio;
+        return NULL;
+    }
     return ufio;
 }
 
@@ -422,6 +401,7 @@ void UFConnectionPoolImpl::clearBadConne
 
 void UFConnectionPoolImpl::releaseConnection(UFIO* ufIO, bool connOk)
 {
+    cerr << "UFConnectionPoolImpl::releaseConnection" << endl;
     if(!ufIO)
         return;
 
@@ -471,16 +451,15 @@ void UFConnectionPoolImpl::releaseConnec
         _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
     }
 
-    //signal to all the waiting threads that there might have been some change
+    //signal to all the first waiting threads that there might be a connection available
     UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
     UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
                         
-    _someConnectionAvailable.lock(this_user_fiber);
-    _someConnectionAvailable.broadcast();
-    _someConnectionAvailable.unlock(this_user_fiber);
+    ipInfo->_someConnectionAvailable.lock(this_user_fiber);
+    ipInfo->_someConnectionAvailable.signal();
+    ipInfo->_someConnectionAvailable.unlock(this_user_fiber);
 }
 
-
 const unsigned int PRINT_BUFFER_LENGTH = 256*1024;
 string UFConnectionPoolImpl::fillInfo(string& data, bool detailed) const
 {
@@ -576,23 +555,21 @@ UFConnectionPoolImpl::~UFConnectionPoolI
 
 void UFConnectionPoolCleaner::run()
 {
-    UF* uf = UFScheduler::getUF();
-    UFScheduler* ufs = uf->getParentScheduler();
+    UFScheduler *this_thread_scheduler = UFScheduler::getUFScheduler();
+    UF* this_uf = this_thread_scheduler->getRunningFiberOnThisThread();
     while(1)
     {
-        uf->usleep(300*1000*1000);
-        if(!_conn_pool)
+        this_uf->usleep(300*1000*1000);
+        if(!this_thread_scheduler->_conn_pool)
             break;
-        _conn_pool->clearBadConnections();
+        this_thread_scheduler->_conn_pool->clearBadConnections();
     }
-    ufs->setExit();
+    this_thread_scheduler->setExit();
 }
 
 UFConnectionPoolImpl::UFConnectionPoolImpl()
-{ 
-    // add fiber to monitor connections on thread
-    // if (!thread_create(runThreadToMontiorBadConnections, this, 0, 4*1024))
-    //    cerr<<"couldnt create thread to monitor bad connections"<<endl;
+{
+
 }
 
 void UFConnectionPoolImpl::init()
@@ -605,6 +582,9 @@ void UFConnectionPoolImpl::init()
     }
 }
 
+int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 5;
+int UFConnectionPool::TIMEOUT_PER_REQUEST = 10;
+
 UFConnectionPool::UFConnectionPool() 
 { 
     impl = new UFConnectionPoolImpl(); 

Modified: trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H Sun May 16 02:10:55 2010
@@ -39,8 +39,6 @@ public:
 protected:
     GroupIPMap                                  _groupIpMap;
     UFConnectionIpInfoMap                       _UFConnectionIpInfoMap;
-    UFMutex                                     _someConnectionAvailable;
-
 
     UFIO* createConnection(UFConnectionIpInfo* stIpInfo);
 };

Modified: trafficserver/traffic/branches/UserFiber/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFIO.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFIO.C Sun May 16 02:10:55 2010
@@ -1,4 +1,5 @@
 #include "UFIO.H"
+#include "UFConnectionPool.H"
 #include <netdb.h>
 #include <sys/socket.h> 
 #include <sys/time.h> 
@@ -17,7 +18,6 @@
 #include <errno.h>
 #include <string>
 #include <string.h>
-
 using namespace std;
 
 static int makeSocketNonBlocking(int fd)
@@ -620,6 +620,7 @@ EpollUFIOScheduler::EpollUFIOScheduler(U
     _epollFd = -1;
     _epollEventStruct = 0;
     _alreadySetup = false;
+    _earliestWakeUpFromSleep = 0;
 }
 
 EpollUFIOScheduler::~EpollUFIOScheduler()
@@ -698,7 +699,6 @@ bool EpollUFIOScheduler::addToScheduler(
     {
         struct timeval now;
         gettimeofday(&now, 0);
-        unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
         UFSleepInfo* ufsi = getSleepInfo();
         if(!ufsi)
         {
@@ -707,7 +707,11 @@ bool EpollUFIOScheduler::addToScheduler(
         }
         ufsi->_ufio = ufio;
         ufio->_sleepInfo = ufsi;
-        _sleepList.insert(std::make_pair((timeNow+to), ufsi));
+        unsigned long long int timeToWakeUp = now.tv_sec*1000000+now.tv_usec + to;
+        if(_earliestWakeUpFromSleep > timeToWakeUp ||
+           !_earliestWakeUpFromSleep)
+            _earliestWakeUpFromSleep = timeToWakeUp;
+        _sleepList.insert(std::make_pair(timeToWakeUp, ufsi));
     }
 
     ufio->getUF()->block(); //switch context till someone wakes me up
@@ -897,13 +901,16 @@ void EpollUFIOScheduler::waitForEvents(T
 
     int nfds;
     struct timeval now;
+    unsigned long long int timeNow = 0;
     IntUFIOMap::iterator index;
     UFIO* ufio = 0;
     UF* uf = 0;
     unsigned long long int amtToSleep = timeToWait;
+    unsigned long long int ufsAmtToSleep = 0;
     int i = 0;
     _interruptedByEventFd = false;
     UFScheduler* ufs = _uf->getParentScheduler();
+    list<UF*> ufsToAddToScheduler;
     if(!ufs)
     {
         cerr<<"epoll scheduler has to be connected to some scheduler"<<endl;
@@ -917,13 +924,11 @@ void EpollUFIOScheduler::waitForEvents(T
             _uf->yield();
         }
 
-        amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
-        if(amtToSleep > ufs->getAmtToSleep())
-            amtToSleep = ufs->getAmtToSleep();
-        nfds = ::epoll_wait(_epollFd, 
-                            _epollEventStruct, 
-                            _maxFds, 
-                            amtToSleep);
+        if(amtToSleep > (ufsAmtToSleep = ufs->getAmtToSleep()))
+            amtToSleep = (int)(ufsAmtToSleep/1000);
+        else
+            amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+        nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, amtToSleep);
         if(nfds > 0)
         {
             //for each of the fds that had activity activate them
@@ -962,40 +967,46 @@ void EpollUFIOScheduler::waitForEvents(T
         if(!_sleepList.empty())
         {
             gettimeofday(&now, 0);
-            unsigned long long int timeNow = (now.tv_sec*1000000)+now.tv_usec;
-            for( MapTimeUFIO::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+            timeNow = (now.tv_sec*1000000)+now.tv_usec;
+            if(timeNow >= _earliestWakeUpFromSleep) //dont go into this queue unless the time seen the last time has passed
             {
-                //1. see if anyone has crossed the sleep timer - add them to the active list
-                if(beg->first <= timeNow) //sleep time is over
+                ufsToAddToScheduler.clear();
+                for( MapTimeUFIO::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
                 {
-                    UFSleepInfo* ufsi = beg->second;
-                    if(ufsi)
+                    //1. see if anyone has crossed the sleep timer - add them to the active list
+                    if(beg->first <= timeNow) //sleep time is over
                     {
-                        UFIO* ufio = ufsi->_ufio;
-                        if(ufio &&
-                           ufio->_sleepInfo == ufsi &&  //make sure that the ufio is not listening on another sleep counter right now
-                           ufio->_uf->_status == BLOCKED) //make sure that the uf hasnt been unblocked already
+                        UFSleepInfo* ufsi = beg->second;
+                        if(ufsi)
                         {
-                            ufio->_sleepInfo = 0;
-                            ufio->_errno = ETIMEDOUT;
-                            ufio->_uf->getParentScheduler()->addFiberToScheduler(ufio->_uf, 0);
-                            //this is so that we dont have to wait to handle the conn. being woken up
-                            _interruptedByEventFd = true;
-                       }
-
-                       releaseSleepInfo(*ufsi);
+                            UFIO* ufio = ufsi->_ufio;
+                            if(ufio &&
+                                ufio->_sleepInfo == ufsi &&  //make sure that the ufio is not listening on another sleep counter right now
+                                ufio->_uf->_status == BLOCKED) //make sure that the uf hasnt been unblocked already
+                            {
+                                ufio->_sleepInfo = 0;
+                                ufio->_errno = ETIMEDOUT;
+                                ufsToAddToScheduler.push_back(ufio->_uf);
+                                //this is so that we dont have to wait to handle the conn. being woken up
+                                _interruptedByEventFd = true;
+                            }
+
+                            releaseSleepInfo(*ufsi);
+                        }
+
+                        _sleepList.erase(beg);
+                        beg = _sleepList.begin();
+                        continue;
                     }
-
-                    _sleepList.erase(beg);
-                    beg = _sleepList.begin();
-                    continue;
-                }
-                else
-                {
-                    amtToSleep = (amtToSleep > beg->first-timeNow) ? beg->first-timeNow : amtToSleep;
-                    break;
+                    else
+                    {
+                        amtToSleep = (amtToSleep > beg->first-timeNow) ? beg->first-timeNow : amtToSleep;
+                        _earliestWakeUpFromSleep = beg->first;
+                        break;
+                    }
+                    ++beg;
                 }
-                ++beg;
+                ufs->addFibersToScheduler(ufsToAddToScheduler, 0);
             }
         }
 
@@ -1025,5 +1036,8 @@ void IORunner::run()
 void UFIO::ufCreateThreadWithIO(pthread_t* tid, list<UF*>* ufsToStartWith)
 {
     ufsToStartWith->push_front(new IORunner());
+
+    // Add connection pool cleaner
+    //ufsToStartWith->push_back(new UFConnectionPoolCleaner); //TODO: figure out how to deal w/ inactive connections
     UFScheduler::ufCreateThread(tid, ufsToStartWith);
 }

Modified: trafficserver/traffic/branches/UserFiber/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFServer.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFServer.C Sun May 16 02:10:55 2010
@@ -99,42 +99,6 @@ struct AcceptRunner : public UF
 int AcceptRunner::_myLoc = -1;
 AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
 
-static void* acceptThreadStart(void* args)
-{
-    if(!args)
-        return 0;
-
-    UFServer* ufserver = (UFServer*) args;
-
-    UFScheduler ufs;
-    //add the io scheduler
-    ufs.addFiberToScheduler(new IORunner());
-    //add the accept port fiber
-    UF* uf = new AcceptRunner();
-    if(!uf)
-        return 0;
-    uf->_startingArgs = args;
-    ufs.addFiberToScheduler(uf);
-
-    ufserver->addThread("ACCEPT", 0);
-    ufs.runScheduler();
-    return 0;
-}
-
-static void* ioThreadStart(void* args)
-{
-    if(!args)
-        return 0;
-
-    UFScheduler ufs;
-    //add the io scheduler
-    ufs.addFiberToScheduler(new IORunner());
-
-    ((UFServer*) args)->addThread("NETIO", &ufs);
-    ufs.runScheduler();
-    return 0;
-}
-
 void UFServer::startThreads()
 {
     preThreadCreation();
@@ -142,40 +106,43 @@ void UFServer::startThreads()
     MAX_THREADS_ALLOWED = (MAX_THREADS_ALLOWED ? MAX_THREADS_ALLOWED : 1);
     MAX_ACCEPT_THREADS_ALLOWED = (MAX_ACCEPT_THREADS_ALLOWED ? MAX_ACCEPT_THREADS_ALLOWED : 1);
 
+    unsigned int i = 0;
     pthread_t* thread = new pthread_t[MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED];
-    pthread_attr_t attr;
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
     //start the IO threads
-    unsigned int i = 0;
     for(; i<MAX_THREADS_ALLOWED; i++)
     {
-        if(pthread_create(&(thread[i]), &attr, ioThreadStart, this) != 0)
+        UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
+        usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
+        //add the io threads to the thread chooser
+        UFScheduler* ufs = UFScheduler::getUFScheduler(thread[i]);
+        if(!ufs)
         {
-            cerr<<"couldnt create thread "<<strerror(errno)<<endl;
+            cerr<<"didnt get scheduler for tid = "<<thread[i]<<endl;
             exit(1);
         }
-        usleep(500); //TODO: avoid the need for threadChooser to have a mutex
+        addThread("NETIO", ufs, thread[i]);
     }
 
     //start the stats thread
     UFStatSystem::init(this);
 
-    usleep(1000); //wait before starting the accept thread //TODO: change to cond signal later
-
     preAccept();
-
     //start the accept thread
     for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
     {
-        if(pthread_create(&(thread[i]), &attr, acceptThreadStart, this) != 0)
-        {
-            cerr<<"couldnt create accept thread "<<strerror(errno)<<endl;
-            exit(1);
-        }
+        AcceptRunner* ar = new AcceptRunner();
+        ar->_startingArgs = this;
+        list<UF*>* ufsToAdd = new list<UF*>();
+        ufsToAdd->push_back(ar);
+        UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
+
+        usleep(5000); //TODO: let the thread finish initializing 
+        addThread("ACCEPT", 0, thread[i]);
     }
 
+    cerr<<"starting server"<<endl;
+
+
     //wait for the threads to finish
     void* status;
     for(i=0; i<MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED; i++)
@@ -260,9 +227,11 @@ void UFServer::run()
     }
 }
 
-void UFServer::addThread(const std::string& type, UFScheduler* ufScheduler)
+void UFServer::addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid)
 {
-    pthread_t tid = pthread_self();
+    if(!tid)
+        tid = pthread_self();
+
     StringThreadMapping::iterator index = _threadList.find(type);
     if(index == _threadList.end())
     {