You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/13 23:47:45 UTC

svn commit: r944023 - in /trafficserver/traffic/branches/UserFiber: include/UF.H include/UFIO.H src/UF.C src/UFConnectionPoolImpl.C src/UFIO.C

Author: akundu
Date: Thu May 13 21:47:44 2010
New Revision: 944023

URL: http://svn.apache.org/viewvc?rev=944023&view=rev
Log:
make sure that if a new event waking up from sleep or a new event added to the scheduler get processed immediately

Added:
    trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
Modified:
    trafficserver/traffic/branches/UserFiber/include/UF.H
    trafficserver/traffic/branches/UserFiber/include/UFIO.H
    trafficserver/traffic/branches/UserFiber/src/UF.C
    trafficserver/traffic/branches/UserFiber/src/UFIO.C

Modified: trafficserver/traffic/branches/UserFiber/include/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UF.H?rev=944023&r1=944022&r2=944023&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UF.H Thu May 13 21:47:44 2010
@@ -149,9 +149,10 @@ struct UFScheduler
     //call this fxn the first time you're adding a UF 
     //(not after that - currently cant move an existing UF to a different thread)
     bool addFiberToScheduler(UF* uf,      /* the UF to add */
-                                pthread_t tid = 0); /* the thread to add the UF to */
+                             pthread_t tid = 0); /* the thread to add the UF to */
     //add the fxn to add multiple ufs in one shot (if they're on one tid)
-    bool addFibersToScheduler(const std::list<UF*>& ufList, pthread_t tid = 0);
+    bool addFibersToScheduler(const std::list<UF*>& ufList, 
+                              pthread_t tid = 0);
 
 
 

Modified: trafficserver/traffic/branches/UserFiber/include/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFIO.H?rev=944023&r1=944022&r2=944023&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFIO.H Thu May 13 21:47:44 2010
@@ -114,6 +114,7 @@ struct UFIO
     UFSleepInfo*                _sleepInfo;
 
     static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+    static UFIOScheduler* getUFIOS();
 
 protected:
     int                         _fd;
@@ -129,7 +130,6 @@ protected:
 
     int                         _lastEpollFlag;
     bool                        _amtReadLastTimeEqualToAskedAmt;
-    static UFIOScheduler* getUFIOS();
 };
 inline unsigned int UFIO::getErrno() const { return _errno; }
 inline int UFIO::getFd() const { return _fd; }

Modified: trafficserver/traffic/branches/UserFiber/src/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UF.C?rev=944023&r1=944022&r2=944023&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UF.C Thu May 13 21:47:44 2010
@@ -171,7 +171,7 @@ bool UFScheduler::addFiberToScheduler(UF
     }
 
     list<UF*> ufList;
-    ufList.push_front(uf);
+    ufList.push_back(uf);
     return addFibersToScheduler(ufList, tid);
 }
 
@@ -255,7 +255,6 @@ void UFScheduler::runScheduler()
 
     _amtToSleep = DEFAULT_SLEEP_IN_USEC;
     bool ranGetTimeOfDay = false;
-    bool firstRun = true;
 
     UFList::iterator beg;
     struct timeval now;
@@ -297,33 +296,7 @@ void UFScheduler::runScheduler()
         }
 
 
-        //check if some other thread has nominated some user fiber to be
-        //added to this thread's list -
-        //can happen in the foll. situations
-        //1. the main thread is adding a new user fiber
-        //2. some fiber has requested to move to another thread
-        if(!_nominateToAddToActiveRunningList.empty() /*TODO: take this out later w/ the atomic size count*/ &&
-           _inThreadedMode)
-
-        {
-            //TODO: do atomic comparison to see if there is anything in 
-            //_nominateToAddToActiveRunningList before getting the lock
-            pthread_mutex_lock(&_mutexToNominateToActiveList);
-            UFList::iterator beg = _nominateToAddToActiveRunningList.begin();
-            for(; beg != _nominateToAddToActiveRunningList.end(); )
-            {
-                UF* uf = *beg;
-                if(uf->_parentScheduler)
-                    _activeRunningList.push_back(uf);
-                else //adding a new fiber
-                    addFiberToScheduler(uf, 0);
-                beg = _nominateToAddToActiveRunningList.erase(beg);
-            }
-
-            pthread_mutex_unlock(&_mutexToNominateToActiveList);
-        }
-
-
+        //check the sleep queue
         ranGetTimeOfDay = false;
         _amtToSleep = DEFAULT_SLEEP_IN_USEC;
         //pick up the fibers that may have completed sleeping
@@ -333,29 +306,55 @@ void UFScheduler::runScheduler()
             gettimeofday(&now, 0);
             ranGetTimeOfDay = true;
             timeNow = (now.tv_sec*1000000)+now.tv_usec;
-            firstRun = true;
             for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
             {
                 //TODO: has to be cleaned up
                 //1. see if anyone has crossed the sleep timer - add them to the active list
                 if(beg->first <= timeNow) //sleep time is over
                 {
-                    _activeRunningList.push_front(beg->second);
+                    _activeRunningList.push_front(beg->second); //putting to the front - so that it gets evaluated before anything else
                     _sleepList.erase(beg);
                     beg = _sleepList.begin();
                     continue;
                 }
                 else
                 {
-                    if(firstRun)
-                        _amtToSleep = beg->first-timeNow;
+                    _amtToSleep = beg->first-timeNow;
                     break;
                 }
-                firstRun = false;
                 ++beg;
             }
         }
 
+
+        //check if some other thread has nominated some user fiber to be
+        //added to this thread's list -
+        //can happen in the foll. situations
+        //1. the main thread is adding a new user fiber
+        //2. some fiber has requested to move to another thread
+        if(!_nominateToAddToActiveRunningList.empty() /*TODO: take this out later w/ the atomic size count*/ &&
+           _inThreadedMode)
+
+        {
+            _amtToSleep = 0;
+            //TODO: do atomic comparison to see if there is anything in 
+            //_nominateToAddToActiveRunningList before getting the lock
+            pthread_mutex_lock(&_mutexToNominateToActiveList);
+            UFList::iterator beg = _nominateToAddToActiveRunningList.begin();
+            for(; beg != _nominateToAddToActiveRunningList.end(); )
+            {
+                UF* uf = *beg;
+                if(uf->_parentScheduler)
+                    _activeRunningList.push_front(uf);
+                else //adding a new fiber
+                    addFiberToScheduler(uf, 0);
+                beg = _nominateToAddToActiveRunningList.erase(beg);
+            }
+
+            pthread_mutex_unlock(&_mutexToNominateToActiveList);
+        }
+
+
         //see if there is anything to do or is it just sleeping time now
         if(!_notifyFunc && _activeRunningList.empty() && !_exit)
         {

Added: trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C?rev=944023&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C (added)
+++ trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C Thu May 13 21:47:44 2010
@@ -0,0 +1,783 @@
+#include "UFConnectionPool.H"
+#include "UFConnectionPoolImpl.H"
+
+#include "UFIO.H"
+#include <stdlib.h>
+#include <fstream>
+#include <iostream>
+#include <sstream>
+#include <stdio.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+const unsigned short int PERCENT_LOGGING_SAMPLING = 5;
+
+using namespace std;
+
+UFConnectionIpInfo::UFConnectionIpInfo(const string& ip, bool persistent, int maxSimultaneousConns, int timeOutPerTransaction)
+{
+    _ip = ip;
+    _persistent = persistent;
+    _maxSimultaneousConns = maxSimultaneousConns;
+    if(timeOutPerTransaction > 0)
+        _timeOutPerTransaction = timeOutPerTransaction*1000;
+    else
+        _timeOutPerTransaction = -1;
+    _timedOut = 0;
+    _inProcessCount = 0;
+
+    size_t index = _ip.find_last_of(':'); 
+    string ip_to_connect = (index == string::npos ) ? _ip : _ip.substr(0, index);
+    string port = (index == string::npos ) ? "0" : _ip.substr(index+1);
+    
+    memset(&_sin, 0, sizeof(_sin));
+    _sin.sin_family = AF_INET;
+    _sin.sin_addr.s_addr = inet_addr(ip_to_connect.c_str());
+    _sin.sin_port = htons(atoi(port.c_str()));
+}
+
+static void read_address(const char *str, struct sockaddr_in *sin)
+{
+    char host[128], *p;
+    struct hostent *hp;
+    short port;
+
+    strcpy(host, str);
+    if ((p = strchr(host, ':')) == NULL)
+    {
+        cerr<<"invalid host: "<<host<<endl;
+        exit(1);
+    }
+    *p++ = '\0';
+    port = (short) atoi(p);
+    if (port < 1)
+    {
+
+        cerr<<"invalid port: "<<port<<endl;
+        exit(1);
+    }
+
+    memset(sin, 0, sizeof(struct sockaddr_in));
+    sin->sin_family = AF_INET;
+    sin->sin_port = htons(port);
+    if (host[0] == '\0')
+    {
+        sin->sin_addr.s_addr = INADDR_ANY;
+        return;
+    }
+    sin->sin_addr.s_addr = inet_addr(host);
+    if (sin->sin_addr.s_addr == INADDR_NONE)
+    {
+        /* not dotted-decimal */
+        if ((hp = gethostbyname(host)) == NULL)
+        {
+            cerr<<"cant resolve address "<<host<<endl;
+            exit(1);
+        }
+        memcpy(&sin->sin_addr, hp->h_addr, hp->h_length);
+    }
+}
+
+UFConnectionIpInfo* UFConnectionGroupInfo::removeIP(const string& ip)
+{
+    UFConnectionIpInfoList::iterator beg = _ipInfoList.begin();
+    for(; beg != _ipInfoList.end(); ++beg)
+    {
+        if((*beg)->_ip == ip)
+        {
+            UFConnectionIpInfo* info = *beg;
+            _ipInfoList.erase(beg);
+            return info;
+        }
+    }
+    return NULL;
+}
+
+bool UFConnectionGroupInfo::addIP(UFConnectionIpInfo* stIpInfo)
+{
+    if(!stIpInfo)
+    {
+        cerr<<"empty/invalid stIpInfo obj passed in "<<endl;
+        return false;
+    }
+
+    read_address(stIpInfo->_ip.c_str(), &(stIpInfo->_sin));
+    if(stIpInfo->_sin.sin_addr.s_addr == INADDR_ANY)   
+    {
+        cerr<<"couldnt resolve address:port = "<<stIpInfo->_ip<<endl;
+        return false;
+    }
+
+    _ipInfoList.push_back(stIpInfo);
+    return true;
+}
+
+double UFConnectionGroupInfo::getAvailability() const
+{
+    int timed_out_count = 0;
+    int total_count = 0;
+    UFConnectionIpInfoList::const_iterator itr = _ipInfoList.begin();
+    for(;itr != _ipInfoList.end(); ++itr)
+    {
+        if(!(*itr))
+            continue;
+
+        total_count++;
+        if((*itr)->_timedOut)
+        {
+            if ( ((*itr)->_timedOut + UFConnectionPoolImpl::_timeoutIP) < time(0) )
+                timed_out_count++;
+            else
+                (*itr)->_timedOut = 0;
+        }
+    }
+
+    return ((total_count > 0 ) ? ((total_count-timed_out_count)*100)/total_count : 0);
+}
+
+UFConnectionGroupInfo::UFConnectionGroupInfo(const std::string& name)
+{
+    _name = name;
+}
+
+UFConnectionGroupInfo::~UFConnectionGroupInfo()
+{
+    unsigned int ipInfoListSize = _ipInfoList.size();
+    for(unsigned int i = 0; i < ipInfoListSize; ++i)
+        delete _ipInfoList[i];
+}
+
+time_t UFConnectionPoolImpl::_timeoutIP = DEFAULT_TIMEOUT_OF_IP_ON_FAILURE;
+bool UFConnectionPoolImpl::addGroup(UFConnectionGroupInfo* groupInfo)
+{
+    if(!groupInfo)
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"invalid/empty group passed"<<endl;
+        return false;
+    }
+
+    if(!groupInfo->_name.length())
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"empty group name passed"<<endl;
+        return false;
+    }
+
+    if(_groupIpMap.find(groupInfo->_name) != _groupIpMap.end())
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"group with name "<<groupInfo->_name <<" already exists"<<endl;
+        return false;
+    }
+
+    _groupIpMap[groupInfo->_name] = groupInfo;
+    return true;
+}
+
+//TODO: figure out whether we want to delete the group object on the removeGroup and the destructor fxn calls
+UFConnectionGroupInfo* UFConnectionPoolImpl::removeGroup(const std::string& name)
+{
+    GroupIPMap::iterator foundItr = _groupIpMap.find(name);
+    if(foundItr == _groupIpMap.end())
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"didnt find group with name "<<name <<" to remove"<<endl;
+        return NULL;
+    }
+
+    UFConnectionGroupInfo* removedObj = (*foundItr).second;
+    _groupIpMap.erase(foundItr);
+    return removedObj;
+}
+
+UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName)
+{
+    return getConnection(groupName, false);
+}
+
+static int MAX_SIMUL_CONNS_PER_HOST = 5;
+static int TIMEOUT_PER_REQUEST = 10;
+UFConnectionGroupInfo* UFConnectionPoolImpl::addGroupImplicit(const std::string& groupName)
+{
+    UFConnectionGroupInfo* group = new UFConnectionGroupInfo(groupName);
+    if(!group)
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" couldnt allocate memory to create group obj"<<endl;
+        return NULL;
+    }
+    
+    UFConnectionIpInfo* ip = new UFConnectionIpInfo(groupName,
+                                                    true, 
+                                                    MAX_SIMUL_CONNS_PER_HOST,
+                                                    TIMEOUT_PER_REQUEST);
+    if(!ip)
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" couldnt create the ip obj"<<endl;
+        delete group;
+        return NULL;
+    }
+    group->addIP(ip);
+    addGroup(group);
+    return group;
+}
+
+UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName, bool waitForConnection)
+{
+    if(!groupName.length())
+        return NULL;
+
+    GroupIPMap::iterator foundItr = _groupIpMap.find(groupName);
+    UFConnectionGroupInfo* groupInfo = NULL;
+    if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<groupName<<endl;
+        groupInfo = addGroupImplicit(groupName);
+        if(!groupInfo)
+            return NULL;
+    }
+    else
+    {
+        groupInfo = (*foundItr).second;
+    }
+
+
+    UFIO* returnConn = NULL;
+    map<unsigned int,unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
+    unsigned int groupIpSize = groupInfo->_ipInfoList.size();
+    while(alreadySeenIPList.size() < groupIpSize) //bail out if we've seen all the ips already
+    {
+        //1a. first try to find a connection that already might exist - after that we'll try randomly picking an ip
+        int elementNum = -1;
+        for(unsigned int i = 0; i < groupIpSize; i++)
+        {
+            if(alreadySeenIPList.find(i) != alreadySeenIPList.end()) //already seen this IP
+                continue;
+
+            UFConnectionIpInfo* ipInfo = groupInfo->_ipInfoList[i]; 
+            if(ipInfo && ipInfo->_currentlyAvailableConnections.size())
+            {
+                elementNum = i;
+                alreadySeenIPList[elementNum] = 1;
+                break;
+            }
+        }
+
+        //1b. randomly pick a host that is not timedout w/in the list of ips for the group
+        if(elementNum == -1)
+        {
+            elementNum = random() % groupIpSize;
+            if(alreadySeenIPList.find(elementNum) != alreadySeenIPList.end()) //already seen this IP
+                continue;
+            alreadySeenIPList[elementNum] = 1;
+        }
+
+        UFConnectionIpInfo* ipInfo = groupInfo->_ipInfoList[elementNum]; 
+        if(!ipInfo)
+            //TODO: remove this empty ipInfo obj
+            continue;
+
+        //2. while the host is timedout - pick another one (put into the list of already seen ips)
+        if(ipInfo->_timedOut && ((ipInfo->_timedOut + _timeoutIP) > time(NULL)) )
+            continue;
+        ipInfo->_timedOut = 0;
+
+        //3. pick a connection from the currently available conns
+        //   (if there are any available)
+        UFIOIntMap::iterator beg = ipInfo->_currentlyAvailableConnections.begin();
+        for(; beg != ipInfo->_currentlyAvailableConnections.end(); 
+              beg = ipInfo->_currentlyAvailableConnections.begin()  // we're resetting to the beginning to avoid
+                                                                    // the case of two threads ending up getting 
+                                                                    // the same connection
+           )
+        {
+            returnConn = NULL;
+            if(!((*beg).first))
+            {
+                ipInfo->_currentlyAvailableConnections.erase(beg);
+                cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
+                continue;
+            }
+            returnConn = (*beg).first;
+            //take the found connection away from the curentlyAvaliableConnections list
+            //since validConnection now actually checks to see the content thats within the channel to verify the validity of the connection
+            //it may be that the thread gets switched out and some other thread comes into this section
+            ipInfo->_currentlyAvailableConnections.erase(beg);
+            ipInfo->_currentlyUsedConnections[returnConn] = time(NULL);
+            break;
+        }
+
+        //4. if no connections are available then create a new connection if allowed
+        if(!returnConn)
+        {
+            bool getConnection = false;
+
+            if ((ipInfo->_maxSimultaneousConns < 0) || 
+                (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount < (unsigned int) ipInfo->_maxSimultaneousConns)
+               )
+                getConnection = true;
+            else if ( waitForConnection && 
+                      (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns)
+                    )
+            {
+                //wait for the signal to get pinged
+                //unsigned short int counter = 0;
+                //while(counter < MAX_WAIT_FOR_CONNECTION_TO_BE_AVAILABLE) //we only try for 10 times
+                while(true)
+                {
+                    if (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount
+                            >= (unsigned int) ipInfo->_maxSimultaneousConns)
+                    {
+                        UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+                        UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
+                        _someConnectionAvailable.lock(this_user_fiber);
+                        _someConnectionAvailable.condWait(this_user_fiber);
+                        _someConnectionAvailable.unlock(this_user_fiber);
+                    }
+                    else
+                    {
+                        getConnection = true; //this is here just so that if we ever go to the path of having a 
+                                              //max allowed waiting time - this var. will only get set if the 
+                                              //condition is met
+                        break;
+                    }
+                }
+            }
+
+            if(getConnection)
+            {
+                ipInfo->_inProcessCount++;
+                returnConn = createConnection(ipInfo);
+                ipInfo->_inProcessCount--;
+                if(returnConn)
+                {
+                    time_t currTime = time(NULL);
+                    ipInfo->_currentlyUsedConnections[returnConn] = currTime;
+                    _UFConnectionIpInfoMap[returnConn] = make_pair(ipInfo, make_pair(true, currTime));
+                }
+                else
+                {
+                    if((random() % 100) < PERCENT_LOGGING_SAMPLING)
+                        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<ipInfo->_ip<<" "<<strerror(errno)<<endl;
+                }
+            }
+        }
+
+        if(returnConn)
+            break;
+    }
+
+
+    //5. return the found connection
+    return returnConn;
+}
+
+UFIO* UFConnectionPoolImpl::createConnection(UFConnectionIpInfo* ipInfo)
+{
+    if(!ipInfo)
+        return NULL;
+    
+    UFIO* ufio = new UFIO(UFScheduler::getUF());
+    if(!ufio)
+    {
+        cerr<<"couldnt get UFIO object"<<endl;
+        return NULL;
+    }
+
+    ufio->connect((struct sockaddr *) &ipInfo->_sin, sizeof(ipInfo->_sin), 16000);
+
+    return ufio;
+}
+
+//This fxn helps remove conns. that may have been invalidated while being in the waiting to be used state
+const unsigned int LAST_USED_TIME_DIFF = 300;
+void UFConnectionPoolImpl::clearBadConnections()
+{
+    time_t currTime = time(NULL);
+    UFConnectionIpInfoMap::iterator beg = _UFConnectionIpInfoMap.begin();
+    for(; beg != _UFConnectionIpInfoMap.end(); )
+    {
+        UFConnectionIpInfo* ip = beg->second.first;
+        bool currentlyUsed = beg->second.second.first;
+        if(currentlyUsed) //we dont remove conns that are in use
+        {
+            ++beg;
+            continue;
+        }
+
+        time_t lastUsed = beg->second.second.second;
+        UFIO* ufio = beg->first;
+        if(!ip || 
+           !ufio || 
+           ((lastUsed + LAST_USED_TIME_DIFF) < (unsigned int) currTime))
+        {
+            ++beg;
+            releaseConnection(ufio, false);
+            continue;
+        }
+        ++beg;
+    }
+}
+
+void UFConnectionPoolImpl::releaseConnection(UFIO* ufIO, bool connOk)
+{
+    if(!ufIO)
+        return;
+
+    //find the ipinfo associated w/ this connection
+    UFConnectionIpInfoMap::iterator ufIOIpInfoLocItr = _UFConnectionIpInfoMap.find(ufIO);
+    if((ufIOIpInfoLocItr == _UFConnectionIpInfoMap.end()) || !(*ufIOIpInfoLocItr).second.first)
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt find the associated ipinfo object or the object was empty - not good"<<endl;
+        if(ufIOIpInfoLocItr != _UFConnectionIpInfoMap.end())
+            _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
+        ufIO = NULL;
+        return;
+    }
+
+    UFConnectionIpInfo* ipInfo = (*ufIOIpInfoLocItr).second.first;
+    //remove the conn from the ipInfo->_currentlyUsedConnections list 
+    UFIOIntMap::iterator currUsedConnItr = ipInfo->_currentlyUsedConnections.find(ufIO);
+    if(currUsedConnItr != ipInfo->_currentlyUsedConnections.end())
+        ipInfo->_currentlyUsedConnections.erase(currUsedConnItr);
+    else
+    {
+        //see if the conn is in the available connection list
+        currUsedConnItr = ipInfo->_currentlyAvailableConnections.find(ufIO);
+        if(currUsedConnItr != ipInfo->_currentlyAvailableConnections.end())
+            ipInfo->_currentlyAvailableConnections.erase(currUsedConnItr);
+        else
+            cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt find the release connection in either the used or available list - not good"<<endl;
+
+        delete ufIO;
+        ufIO = NULL;
+        _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
+        return;
+    }
+
+
+    if(connOk && ipInfo->_persistent)
+    {
+        time_t currTime = time(NULL);
+        (*ufIOIpInfoLocItr).second.second.first = false;
+        (*ufIOIpInfoLocItr).second.second.second = currTime;
+        ipInfo->_currentlyAvailableConnections[ufIO] = currTime;
+    }
+    else
+    {
+        delete ufIO;
+        ufIO = NULL;
+        _UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
+    }
+
+    //signal to all the waiting threads that there might have been some change
+    UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
+                        
+    _someConnectionAvailable.lock(this_user_fiber);
+    _someConnectionAvailable.broadcast();
+    _someConnectionAvailable.unlock(this_user_fiber);
+}
+
+
+const unsigned int PRINT_BUFFER_LENGTH = 256*1024;
+string UFConnectionPoolImpl::fillInfo(string& data, bool detailed) const
+{
+    char* printBuffer = new char[256*1024];
+    if(!printBuffer)
+        return data;
+
+    if(!detailed)
+        snprintf(printBuffer, 1024, "ConnectionInfo:\n%15s%10s%10s\n", "GroupName", "IpCount", "IP Avail.");
+    else
+        snprintf(printBuffer, 1024,  
+            "ConnectionInfo:\n%15s%10s%10s%35s%10s%10s%10s%10s%10s\n", 
+            "GroupName", 
+            "IpCount", 
+            "IP Avail.", 
+            "IP name", 
+            "isPersis", 
+            "TimedOut", 
+            "#Run", 
+            "#InProc.", 
+            "#Avail.");
+
+
+    //1. list out the current groups 
+    GroupIPMap::const_iterator groupIpMapItr = _groupIpMap.begin();
+    for(; groupIpMapItr != _groupIpMap.end() ; ++groupIpMapItr)
+    {
+        UFConnectionGroupInfo* tmpGroup = (*groupIpMapItr).second;
+        if(!tmpGroup)
+            continue;
+
+        unsigned int amtCopied = strlen(printBuffer);
+        if((amtCopied + 1024) >= PRINT_BUFFER_LENGTH) //we can't add anymore
+            break;
+
+        snprintf(printBuffer+strlen(printBuffer), 
+            1024, 
+            "%15s%10d%10d\n", 
+            (*groupIpMapItr).first.c_str(), 
+            (int) tmpGroup->_ipInfoList.size(), 
+            (int) tmpGroup->getAvailability());
+
+
+        if(detailed)
+        {
+            UFConnectionIpInfoList::const_iterator _ipInfoListItr = tmpGroup->_ipInfoList.begin();
+            for(; _ipInfoListItr != tmpGroup->_ipInfoList.end(); ++_ipInfoListItr)
+            {
+                UFConnectionIpInfo* ipInfo = (*_ipInfoListItr);
+                if(!ipInfo)
+                    continue;
+
+                snprintf(printBuffer+strlen(printBuffer), 
+                         1024, 
+                         "%70s%10d%10d%10d%10d%10d\n", 
+                         ipInfo->_ip.c_str(), 
+                         (int) ipInfo->_persistent, 
+                         (int) ipInfo->_timedOut, 
+                         (int) ipInfo->_currentlyUsedConnections.size(), 
+                         (int) ipInfo->_inProcessCount, 
+                         (int) ipInfo->_currentlyAvailableConnections.size());
+            }
+        }
+    }
+
+    data.append(printBuffer);
+    delete printBuffer;
+    return data;
+}
+
+double UFConnectionPoolImpl::getGroupAvailability(const std::string& name) const
+{
+    double result = 0;
+    if(!name.length())
+        return result;
+
+    GroupIPMap::const_iterator foundItr = _groupIpMap.find(name);
+    if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
+    {
+        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<((*foundItr).second ? (*foundItr).second->_name : "")<<endl;
+        return result;
+    }
+
+    return (*foundItr).second->getAvailability();
+}
+
+UFConnectionPoolImpl::~UFConnectionPoolImpl()
+{ 
+    for(GroupIPMap::iterator beg = _groupIpMap.begin(); beg != _groupIpMap.end(); ++beg)
+        delete beg->second;
+    _groupIpMap.clear();
+}
+
+void UFConnectionPoolCleaner::run()
+{
+    UF* uf = UFScheduler::getUF();
+    UFScheduler* ufs = uf->getParentScheduler();
+    while(1)
+    {
+        uf->usleep(300*1000*1000);
+        if(!_conn_pool)
+            break;
+        _conn_pool->clearBadConnections();
+    }
+    ufs->setExit();
+}
+
+UFConnectionPoolImpl::UFConnectionPoolImpl()
+{ 
+    // add fiber to monitor connections on thread
+    // if (!thread_create(runThreadToMontiorBadConnections, this, 0, 4*1024))
+    //    cerr<<"couldnt create thread to monitor bad connections"<<endl;
+}
+
+void UFConnectionPoolImpl::init()
+{
+    static bool ranSrandom = false;
+    if(!ranSrandom)
+    {
+        srand(getpid());
+        ranSrandom = true;
+    }
+}
+
+UFConnectionPool::UFConnectionPool() 
+{ 
+    impl = new UFConnectionPoolImpl(); 
+}
+
+void UFConnectionPool::init()
+{
+    UFConnectionPoolImpl::init();
+}
+
+UFConnectionGroupInfo* UFConnectionPool::removeGroup(const string& name)
+{
+    if(!impl)
+        return NULL;
+    return impl->removeGroup(name);
+}
+
+bool UFConnectionPool::addGroup(UFConnectionGroupInfo* stGroupInfo)
+{
+    if(!impl)
+        return false;
+    return impl->addGroup(stGroupInfo);
+}
+
+UFIO* UFConnectionPool::getConnection(const std::string& groupName, bool waitForConnection)
+{
+    if(!impl)
+        return NULL;
+    return impl->getConnection(groupName, waitForConnection);
+}
+
+UFIO* UFConnectionPool::getConnection(const string& groupName)
+{
+    if(!impl)
+        return NULL;
+    return impl->getConnection(groupName);
+}
+
+void UFConnectionPool::releaseConnection(UFIO* ufIO, bool connOk)
+{
+    if(!impl)
+        return;
+    return impl->releaseConnection(ufIO, connOk);
+}
+
+void UFConnectionPool::setTimeoutIP(int timeout) 
+{ 
+    if(!impl)
+        return;
+    if(timeout > -1)
+        impl->_timeoutIP = timeout; 
+    else
+        impl->_timeoutIP = 60;
+}
+
+string UFConnectionPool::fillInfo(string& data, bool detailed) const
+{
+    if(!impl)
+        return string("");
+    return impl->fillInfo(data, detailed);
+}
+
+double UFConnectionPool::getGroupAvailability(const std::string& name) const
+{
+    if(!impl)
+        return 0;
+    return impl->getGroupAvailability(name);
+}
+
+void UFConnectionPool::clearBadConnections()
+{
+    if(!impl)
+        return;
+    return impl->clearBadConnections();
+}
+
+string StringUtil::trim_ws(const string& input)
+{
+    if(!input.length())
+        return input;
+
+    size_t beg_position = input.find_first_not_of(" \n\r\t\r");
+    size_t end_position = input.find_last_not_of(" \n\t\r");
+
+    if(beg_position == string::npos)
+        beg_position = 0;
+    if(end_position == string::npos)
+        end_position = input.length();
+
+    return (input.substr(beg_position, (end_position-beg_position+1)));
+}
+
+unsigned int StringUtil::split(const string& input, const string& splitOn, StringVector& output)
+{
+    unsigned int copyStringBegin = 0;
+    output.clear();
+
+    while(copyStringBegin < input.length())
+    {
+        string::size_type findLoc = input.find(splitOn, copyStringBegin);
+        if(copyStringBegin != findLoc)      
+        {
+            string subStr = input.substr(copyStringBegin, (findLoc == string::npos) ? input.length()-copyStringBegin : findLoc - copyStringBegin);
+            if(subStr.length())
+                output.push_back(subStr);
+            if(findLoc == string::npos)
+                break;
+            copyStringBegin += subStr.length();
+        }
+        else
+            copyStringBegin += splitOn.length();
+    }
+
+    return output.size();
+}
+
+unsigned int UFConnectionPool::loadConfigFile(const string& fileName)
+{
+    return loadConfigFile(fileName, -1);
+}
+
+unsigned int UFConnectionPool::loadConfigFile(const string& fileName, int maxSimultaneousConns)
+{
+    ifstream infile(fileName.c_str());
+    if(!infile)
+        return false;
+
+    int num_groups_added = 0;
+    string line;
+    while(getline(infile, line))
+    {
+        if(line.find('#') != string::npos) //bail if we see # in the line
+            continue;
+
+        //split on ':'
+        StringUtil::StringVector compVec;
+        int numFound = StringUtil::split(line, ":", compVec);
+        if(numFound < 3)
+            continue;
+        string farmId = StringUtil::trim_ws(compVec[0]);
+        string timeOut = StringUtil::trim_ws(compVec[1]);
+        string ipList = StringUtil::trim_ws(compVec[2]);
+
+        if(!farmId.length() || !ipList.length() || (!timeOut.length()) || (timeOut == "*"))
+            continue;
+
+        StringUtil::StringVector ipVec;
+        numFound = StringUtil::split(ipList, ",", ipVec);
+        if(!numFound)
+        {
+            cerr<<"couldnt add "<<farmId<<" because the number of ips found was 0"<<endl;
+            continue;
+        }
+
+        //create the group
+        UFConnectionGroupInfo* tmpGroupInfo = new UFConnectionGroupInfo(farmId);
+        //create the ips and add them to the group
+        for(unsigned int i = 0; i < ipVec.size(); i++)
+        {
+            string ip = StringUtil::trim_ws(ipVec[i]);
+            ip.append(":1971");
+            UFConnectionIpInfo* tmpIpInfo = new UFConnectionIpInfo(ip, ((timeOut == "-1") ? true : false), maxSimultaneousConns);
+
+            tmpGroupInfo->addIP(tmpIpInfo);
+        }
+
+        //add the group to the connection pool
+        if(!addGroup(tmpGroupInfo))
+            cerr<<"couldnt add group "<<farmId<<" to the connection pool"<<endl;
+
+        num_groups_added++;
+    }
+
+    return num_groups_added;
+}
+

Modified: trafficserver/traffic/branches/UserFiber/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFIO.C?rev=944023&r1=944022&r2=944023&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFIO.C Thu May 13 21:47:44 2010
@@ -416,7 +416,7 @@ int UFIO::connect(const struct sockaddr 
                TIME_IN_US timeout)
 {
     if(!isSetup()) //create the socket and make the socket non-blocking
-        return false;
+        return -1;
 
 
     //find the scheduler for this request
@@ -917,12 +917,13 @@ void EpollUFIOScheduler::waitForEvents(T
             _uf->yield();
         }
 
+        amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
         if(amtToSleep > ufs->getAmtToSleep())
             amtToSleep = ufs->getAmtToSleep();
         nfds = ::epoll_wait(_epollFd, 
                             _epollEventStruct, 
                             _maxFds, 
-                            (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1)); //sleep for atleast 1ms
+                            amtToSleep);
         if(nfds > 0)
         {
             //for each of the fds that had activity activate them