You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/16 04:10:55 UTC
svn commit: r944754 - in /trafficserver/traffic/branches/UserFiber:
include/UF.H include/UFConnectionPool.H include/UFIO.H include/UFServer.H
src/UF.C src/UFConnectionPoolImpl.C src/UFConnectionPoolImpl.H src/UFIO.C
src/UFServer.C
Author: akundu
Date: Sun May 16 02:10:55 2010
New Revision: 944754
URL: http://svn.apache.org/viewvc?rev=944754&view=rev
Log:
- Raghav's completed working version of UFConnectionPool
- UFServer now uses ufCreateThreadWithIO (instead of creating its own accept function)
- cleanups
Modified:
trafficserver/traffic/branches/UserFiber/include/UF.H
trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
trafficserver/traffic/branches/UserFiber/include/UFIO.H
trafficserver/traffic/branches/UserFiber/include/UFServer.H
trafficserver/traffic/branches/UserFiber/src/UF.C
trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H
trafficserver/traffic/branches/UserFiber/src/UFIO.C
trafficserver/traffic/branches/UserFiber/src/UFServer.C
Modified: trafficserver/traffic/branches/UserFiber/include/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UF.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UF.H Sun May 16 02:10:55 2010
@@ -157,7 +157,7 @@ struct UFScheduler
static ThreadUFSchedulerMap _threadUFSchedulerMap;
- static pthread_mutex_t _mutexToCheckFiberScheduerMap;
+ static pthread_mutex_t _mutexToCheckFiberSchedulerMap;
//returns the fiber scheduler on this thread or other threads;
static UFScheduler* getUFScheduler(pthread_t tid = 0);
@@ -202,6 +202,7 @@ protected:
//the sleep tree
MapTimeUF _sleepList;
+ unsigned long long int _earliestWakeUpFromSleep;
//store the shortest sleep interval
unsigned long long int _amtToSleep;
@@ -248,8 +249,12 @@ inline void UF::usleep(unsigned long lon
struct timeval now;
gettimeofday(&now, 0);
- unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
- _parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), this));
+ unsigned long long int timeToWakeUp = now.tv_sec*1000000+now.tv_usec + sleepAmtInUs;
+ if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
+ !_parentScheduler->_earliestWakeUpFromSleep)
+ _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
+
+ _parentScheduler->_sleepList.insert(std::make_pair(timeToWakeUp, this));
block();
}
Modified: trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H Sun May 16 02:10:55 2010
@@ -38,6 +38,9 @@ public:
double getGroupAvailability(const std::string& name) const;
void clearBadConnections();
+ static int MAX_SIMUL_CONNS_PER_HOST;
+ static int TIMEOUT_PER_REQUEST;
+
//TODO - get stats & detailed stats info
protected:
@@ -48,14 +51,13 @@ protected:
struct UFConnectionPoolCleaner : public UF
{
void run();
- UFConnectionPoolCleaner (bool registerMe = false) : _conn_pool(NULL)
+ UFConnectionPoolCleaner (bool registerMe = false)
{
// if(registerMe)
// _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
}
UF* createUF() { return new UFConnectionPoolCleaner; }
- UFConnectionPool* _conn_pool;
};
namespace StringUtil
@@ -83,6 +85,7 @@ struct UFConnectionIpInfo
UFIOIntMap _currentlyAvailableConnections;
UFIOIntMap _currentlyUsedConnections;
unsigned int _inProcessCount;
+ UFMutex _someConnectionAvailable;
};
Modified: trafficserver/traffic/branches/UserFiber/include/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFIO.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFIO.H Sun May 16 02:10:55 2010
@@ -187,15 +187,16 @@ struct EpollUFIOScheduler : public UFIOS
bool _interruptedByEventFd;
protected:
- UF* _uf;
- int _epollFd;
- unsigned int _maxFds;
- struct epoll_event* _epollEventStruct;
- IntUFIOMap _intUFIOMap;
- bool _alreadySetup;
+ UF* _uf;
+ int _epollFd;
+ unsigned int _maxFds;
+ struct epoll_event* _epollEventStruct;
+ IntUFIOMap _intUFIOMap;
+ bool _alreadySetup;
- MapTimeUFIO _sleepList;
+ MapTimeUFIO _sleepList;
+ unsigned long long int _earliestWakeUpFromSleep;
bool addToScheduler(UFIO* ufio,
void* inputInfo /*flags to identify how ot add*/,
Modified: trafficserver/traffic/branches/UserFiber/include/UFServer.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFServer.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFServer.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFServer.H Sun May 16 02:10:55 2010
@@ -50,7 +50,7 @@ public:
StringThreadMapping* getThreadList() { return &_threadList; };
std::vector<pthread_t>* getThreadType(const std::string& type);
- void addThread(const std::string& type, UFScheduler* ufScheduler);
+ void addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid=0);
void run();
protected:
@@ -76,20 +76,24 @@ private:
//start processing
};
+
+
struct UFServerThreadChooser : public UFIOAcceptThreadChooser
{
UFServerThreadChooser() { }
std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd);
- void add(UFScheduler* ufs, pthread_t tid)
- {
- _threadList.push_back(make_pair(ufs, tid));
- }
+ void add(UFScheduler* ufs, pthread_t tid);
protected:
std::vector<pair<UFScheduler*, pthread_t> > _threadList;
};
+inline void UFServerThreadChooser::add(UFScheduler* ufs, pthread_t tid)
+{
+ _threadList.push_back(make_pair(ufs, tid));
+}
+
inline std::pair<UFScheduler*, pthread_t> UFServerThreadChooser::pickThread(int listeningFd)
{
static unsigned int lastLocUsed = 0;
Modified: trafficserver/traffic/branches/UserFiber/src/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UF.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UF.C Sun May 16 02:10:55 2010
@@ -96,7 +96,7 @@ bool UF::setup(void* stackPtr, size_t st
///////////////UFScheduler/////////////////////
ThreadUFSchedulerMap UFScheduler::_threadUFSchedulerMap;
-pthread_mutex_t UFScheduler::_mutexToCheckFiberScheduerMap = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t UFScheduler::_mutexToCheckFiberSchedulerMap = PTHREAD_MUTEX_INITIALIZER;
static pthread_key_t getThreadKey()
{
if(pthread_key_create(&UFScheduler::_specific_key, 0) != 0)
@@ -109,6 +109,7 @@ static pthread_key_t getThreadKey()
pthread_key_t UFScheduler::_specific_key = getThreadKey();
UFScheduler::UFScheduler()
{
+ _earliestWakeUpFromSleep = 0;
_exitJustMe = false;
_specific = 0;
_currentFiber = 0;
@@ -126,14 +127,14 @@ UFScheduler::UFScheduler()
{
pthread_t currThreadId = pthread_self();
- pthread_mutex_lock(&_mutexToCheckFiberScheduerMap);
+ pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
if(_threadUFSchedulerMap.find(currThreadId) != _threadUFSchedulerMap.end())
{
cerr<<"cannot have more than one scheduler per thread"<<endl;
exit(1);
}
_threadUFSchedulerMap[currThreadId] = this;
- pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+ pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
}
else
{
@@ -157,7 +158,7 @@ UFScheduler::UFScheduler()
UFScheduler::~UFScheduler()
{
- pthread_key_delete(_specific_key);
+ //pthread_key_delete(_specific_key);
delete _conn_pool;
}
@@ -262,11 +263,6 @@ void UFScheduler::runScheduler()
gettimeofday(&start, 0);
unsigned long long int timeNow = 0;
- // Add connection pool cleanup fiber
- UFConnectionPoolCleaner *conn_pool_cleanup_fiber = new UFConnectionPoolCleaner;
- conn_pool_cleanup_fiber->_conn_pool = _conn_pool;
- addFiberToScheduler(conn_pool_cleanup_fiber);
-
while(!_exitJustMe && !_exit)
{
UFList::iterator beg = _activeRunningList.begin();
@@ -299,33 +295,6 @@ void UFScheduler::runScheduler()
//check the sleep queue
ranGetTimeOfDay = false;
_amtToSleep = DEFAULT_SLEEP_IN_USEC;
- //pick up the fibers that may have completed sleeping
- //look into the sleep list;
- if(!_sleepList.empty())
- {
- gettimeofday(&now, 0);
- ranGetTimeOfDay = true;
- timeNow = (now.tv_sec*1000000)+now.tv_usec;
- for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
- {
- //TODO: has to be cleaned up
- //1. see if anyone has crossed the sleep timer - add them to the active list
- if(beg->first <= timeNow) //sleep time is over
- {
- _activeRunningList.push_front(beg->second); //putting to the front - so that it gets evaluated before anything else
- _sleepList.erase(beg);
- beg = _sleepList.begin();
- continue;
- }
- else
- {
- _amtToSleep = beg->first-timeNow;
- break;
- }
- ++beg;
- }
- }
-
//check if some other thread has nominated some user fiber to be
//added to this thread's list -
@@ -336,7 +305,7 @@ void UFScheduler::runScheduler()
_inThreadedMode)
{
- _amtToSleep = 0;
+ _amtToSleep = 0; //since we're adding new ufs to the list we dont need to sleep
//TODO: do atomic comparison to see if there is anything in
//_nominateToAddToActiveRunningList before getting the lock
pthread_mutex_lock(&_mutexToNominateToActiveList);
@@ -345,7 +314,7 @@ void UFScheduler::runScheduler()
{
UF* uf = *beg;
if(uf->_parentScheduler)
- _activeRunningList.push_front(uf);
+ _activeRunningList.push_back(uf);
else //adding a new fiber
addFiberToScheduler(uf, 0);
beg = _nominateToAddToActiveRunningList.erase(beg);
@@ -355,6 +324,38 @@ void UFScheduler::runScheduler()
}
+ //pick up the fibers that may have completed sleeping
+ //look into the sleep list;
+ if(!_sleepList.empty())
+ {
+ gettimeofday(&now, 0);
+ ranGetTimeOfDay = true;
+ timeNow = (now.tv_sec*1000000)+now.tv_usec;
+ if(timeNow >= _earliestWakeUpFromSleep) //dont go into this queue unless the time seen the last time has passed
+ {
+ for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+ {
+ //TODO: has to be cleaned up
+ //1. see if anyone has crossed the sleep timer - add them to the active list
+ if(beg->first <= timeNow) //sleep time is over
+ {
+ _activeRunningList.push_back(beg->second);
+ _sleepList.erase(beg);
+ beg = _sleepList.begin();
+ continue;
+ }
+ else
+ {
+ if(_amtToSleep) //since the nominate system might have turned off the sleep - we dont activate it again
+ _amtToSleep = beg->first-timeNow;
+ _earliestWakeUpFromSleep = beg->first;
+ break;
+ }
+ ++beg;
+ }
+ }
+ }
+
//see if there is anything to do or is it just sleeping time now
if(!_notifyFunc && _activeRunningList.empty() && !_exit)
{
@@ -390,14 +391,14 @@ UFScheduler* UFScheduler::getUFScheduler
if(!tid || tid == pthread_self())
return (UFScheduler*)pthread_getspecific(_specific_key);
- pthread_mutex_lock(&_mutexToCheckFiberScheduerMap);
+ pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
ThreadUFSchedulerMap::const_iterator index = _threadUFSchedulerMap.find(tid);
if(index == _threadUFSchedulerMap.end())
{
- pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+ pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
return 0;
}
- pthread_mutex_unlock(&_mutexToCheckFiberScheduerMap);
+ pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
return const_cast<UFScheduler*>(index->second);
}
@@ -442,10 +443,7 @@ void* setupThread(void* args)
list<UF*>* ufsToStartWith = (list<UF*>*) args;
UFScheduler ufs;
- for(list<UF*>::iterator beg = ufsToStartWith->begin();
- beg != ufsToStartWith->end();
- ++beg)
- ufs.addFiberToScheduler(*beg);
+ ufs.addFibersToScheduler(*ufsToStartWith, 0);
delete ufsToStartWith;
//run the scheduler
Modified: trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C Sun May 16 02:10:55 2010
@@ -197,8 +197,6 @@ UFIO* UFConnectionPoolImpl::getConnectio
return getConnection(groupName, false);
}
-static int MAX_SIMUL_CONNS_PER_HOST = 5;
-static int TIMEOUT_PER_REQUEST = 10;
UFConnectionGroupInfo* UFConnectionPoolImpl::addGroupImplicit(const std::string& groupName)
{
UFConnectionGroupInfo* group = new UFConnectionGroupInfo(groupName);
@@ -210,8 +208,8 @@ UFConnectionGroupInfo* UFConnectionPoolI
UFConnectionIpInfo* ip = new UFConnectionIpInfo(groupName,
true,
- MAX_SIMUL_CONNS_PER_HOST,
- TIMEOUT_PER_REQUEST);
+ UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST,
+ UFConnectionPool::TIMEOUT_PER_REQUEST);
if(!ip)
{
cerr<<getpid()<<" "<<time(NULL)<<" couldnt create the ip obj"<<endl;
@@ -234,7 +232,7 @@ UFIO* UFConnectionPoolImpl::getConnectio
{
cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<groupName<<endl;
groupInfo = addGroupImplicit(groupName);
- if(!groupInfo)
+ if(!groupInfo)
return NULL;
}
else
@@ -242,6 +240,10 @@ UFIO* UFConnectionPoolImpl::getConnectio
groupInfo = (*foundItr).second;
}
+ UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+ UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
+
+
UFIO* returnConn = NULL;
map<unsigned int,unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
@@ -283,94 +285,67 @@ UFIO* UFConnectionPoolImpl::getConnectio
continue;
ipInfo->_timedOut = 0;
- //3. pick a connection from the currently available conns
- // (if there are any available)
- UFIOIntMap::iterator beg = ipInfo->_currentlyAvailableConnections.begin();
- for(; beg != ipInfo->_currentlyAvailableConnections.end();
- beg = ipInfo->_currentlyAvailableConnections.begin() // we're resetting to the beginning to avoid
- // the case of two threads ending up getting
- // the same connection
- )
- {
- returnConn = NULL;
- if(!((*beg).first))
- {
- ipInfo->_currentlyAvailableConnections.erase(beg);
- cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
- continue;
- }
- returnConn = (*beg).first;
- //take the found connection away from the curentlyAvaliableConnections list
- //since validConnection now actually checks to see the content thats within the channel to verify the validity of the connection
- //it may be that the thread gets switched out and some other thread comes into this section
- ipInfo->_currentlyAvailableConnections.erase(beg);
- ipInfo->_currentlyUsedConnections[returnConn] = time(NULL);
- break;
- }
- //4. if no connections are available then create a new connection if allowed
- if(!returnConn)
- {
- bool getConnection = false;
-
- if ((ipInfo->_maxSimultaneousConns < 0) ||
- (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount < (unsigned int) ipInfo->_maxSimultaneousConns)
- )
- getConnection = true;
- else if ( waitForConnection &&
- (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns)
+ while(1) {
+ if(ipInfo->_currentlyAvailableConnections.size()) {
+ //3. pick a connection from the currently available conns
+
+ UFIOIntMap::iterator beg = ipInfo->_currentlyAvailableConnections.begin();
+ for(; beg != ipInfo->_currentlyAvailableConnections.end();
+ beg = ipInfo->_currentlyAvailableConnections.begin() // we're resetting to the beginning to avoid
+ // the case of two threads ending up getting
+ // the same connection
)
- {
- //wait for the signal to get pinged
- //unsigned short int counter = 0;
- //while(counter < MAX_WAIT_FOR_CONNECTION_TO_BE_AVAILABLE) //we only try for 10 times
- while(true)
{
- if (ipInfo->_currentlyAvailableConnections.size() + ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount
- >= (unsigned int) ipInfo->_maxSimultaneousConns)
+ returnConn = NULL;
+ if(!((*beg).first))
{
- UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
- UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
- _someConnectionAvailable.lock(this_user_fiber);
- _someConnectionAvailable.condWait(this_user_fiber);
- _someConnectionAvailable.unlock(this_user_fiber);
- }
- else
- {
- getConnection = true; //this is here just so that if we ever go to the path of having a
- //max allowed waiting time - this var. will only get set if the
- //condition is met
- break;
+ ipInfo->_currentlyAvailableConnections.erase(beg);
+ cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
+ continue;
}
+ returnConn = (*beg).first;
+ //take the found connection away from the curentlyAvaliableConnections list
+ //since validConnection now actually checks to see the content thats within the channel to verify the validity of the connection
+ //it may be that the thread gets switched out and some other thread comes into this section
+ ipInfo->_currentlyAvailableConnections.erase(beg);
+ ipInfo->_currentlyUsedConnections[returnConn] = time(NULL);
+ return returnConn;
}
}
-
- if(getConnection)
- {
- ipInfo->_inProcessCount++;
- returnConn = createConnection(ipInfo);
- ipInfo->_inProcessCount--;
- if(returnConn)
- {
- time_t currTime = time(NULL);
- ipInfo->_currentlyUsedConnections[returnConn] = currTime;
- _UFConnectionIpInfoMap[returnConn] = make_pair(ipInfo, make_pair(true, currTime));
+ else {
+ // if _maxSimultaneousConns is hit, wait for a connection to become available
+ if(ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns) {
+ // wait for a connection to be released
+ ipInfo->_someConnectionAvailable.lock(this_user_fiber);
+ ipInfo->_someConnectionAvailable.condWait(this_user_fiber);
+ ipInfo->_someConnectionAvailable.unlock(this_user_fiber);
+ continue;
}
- else
- {
- if((random() % 100) < PERCENT_LOGGING_SAMPLING)
- cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<ipInfo->_ip<<" "<<strerror(errno)<<endl;
+ else {
+ // Create new connection
+ ipInfo->_inProcessCount++;
+ returnConn = createConnection(ipInfo);
+ ipInfo->_inProcessCount--;
+ if(returnConn)
+ {
+ time_t currTime = time(NULL);
+ ipInfo->_currentlyUsedConnections[returnConn] = currTime;
+ _UFConnectionIpInfoMap[returnConn] = make_pair(ipInfo, make_pair(true, currTime));
+ }
+ else
+ {
+ if((random() % 100) < PERCENT_LOGGING_SAMPLING)
+ cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"couldnt create a connection to "<<ipInfo->_ip<<" "<<strerror(errno)<<endl;
+ }
+
+ return returnConn;
}
}
}
-
- if(returnConn)
- break;
}
-
-
- //5. return the found connection
- return returnConn;
+
+ return NULL;
}
UFIO* UFConnectionPoolImpl::createConnection(UFConnectionIpInfo* ipInfo)
@@ -385,8 +360,12 @@ UFIO* UFConnectionPoolImpl::createConnec
return NULL;
}
- ufio->connect((struct sockaddr *) &ipInfo->_sin, sizeof(ipInfo->_sin), 16000);
+ int rc = ufio->connect((struct sockaddr *) &ipInfo->_sin, sizeof(ipInfo->_sin), 1600000);
+ if(rc != 0) {
+ delete ufio;
+ return NULL;
+ }
return ufio;
}
@@ -422,6 +401,7 @@ void UFConnectionPoolImpl::clearBadConne
void UFConnectionPoolImpl::releaseConnection(UFIO* ufIO, bool connOk)
{
+ cerr << "UFConnectionPoolImpl::releaseConnection" << endl;
if(!ufIO)
return;
@@ -471,16 +451,15 @@ void UFConnectionPoolImpl::releaseConnec
_UFConnectionIpInfoMap.erase(ufIOIpInfoLocItr);
}
- //signal to all the waiting threads that there might have been some change
+ //signal to all the first waiting threads that there might be a connection available
UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
UF* this_user_fiber = this_thread_scheduler->getRunningFiberOnThisThread();
- _someConnectionAvailable.lock(this_user_fiber);
- _someConnectionAvailable.broadcast();
- _someConnectionAvailable.unlock(this_user_fiber);
+ ipInfo->_someConnectionAvailable.lock(this_user_fiber);
+ ipInfo->_someConnectionAvailable.signal();
+ ipInfo->_someConnectionAvailable.unlock(this_user_fiber);
}
-
const unsigned int PRINT_BUFFER_LENGTH = 256*1024;
string UFConnectionPoolImpl::fillInfo(string& data, bool detailed) const
{
@@ -576,23 +555,21 @@ UFConnectionPoolImpl::~UFConnectionPoolI
void UFConnectionPoolCleaner::run()
{
- UF* uf = UFScheduler::getUF();
- UFScheduler* ufs = uf->getParentScheduler();
+ UFScheduler *this_thread_scheduler = UFScheduler::getUFScheduler();
+ UF* this_uf = this_thread_scheduler->getRunningFiberOnThisThread();
while(1)
{
- uf->usleep(300*1000*1000);
- if(!_conn_pool)
+ this_uf->usleep(300*1000*1000);
+ if(!this_thread_scheduler->_conn_pool)
break;
- _conn_pool->clearBadConnections();
+ this_thread_scheduler->_conn_pool->clearBadConnections();
}
- ufs->setExit();
+ this_thread_scheduler->setExit();
}
UFConnectionPoolImpl::UFConnectionPoolImpl()
-{
- // add fiber to monitor connections on thread
- // if (!thread_create(runThreadToMontiorBadConnections, this, 0, 4*1024))
- // cerr<<"couldnt create thread to monitor bad connections"<<endl;
+{
+
}
void UFConnectionPoolImpl::init()
@@ -605,6 +582,9 @@ void UFConnectionPoolImpl::init()
}
}
+int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 5;
+int UFConnectionPool::TIMEOUT_PER_REQUEST = 10;
+
UFConnectionPool::UFConnectionPool()
{
impl = new UFConnectionPoolImpl();
Modified: trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H Sun May 16 02:10:55 2010
@@ -39,8 +39,6 @@ public:
protected:
GroupIPMap _groupIpMap;
UFConnectionIpInfoMap _UFConnectionIpInfoMap;
- UFMutex _someConnectionAvailable;
-
UFIO* createConnection(UFConnectionIpInfo* stIpInfo);
};
Modified: trafficserver/traffic/branches/UserFiber/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFIO.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFIO.C Sun May 16 02:10:55 2010
@@ -1,4 +1,5 @@
#include "UFIO.H"
+#include "UFConnectionPool.H"
#include <netdb.h>
#include <sys/socket.h>
#include <sys/time.h>
@@ -17,7 +18,6 @@
#include <errno.h>
#include <string>
#include <string.h>
-
using namespace std;
static int makeSocketNonBlocking(int fd)
@@ -620,6 +620,7 @@ EpollUFIOScheduler::EpollUFIOScheduler(U
_epollFd = -1;
_epollEventStruct = 0;
_alreadySetup = false;
+ _earliestWakeUpFromSleep = 0;
}
EpollUFIOScheduler::~EpollUFIOScheduler()
@@ -698,7 +699,6 @@ bool EpollUFIOScheduler::addToScheduler(
{
struct timeval now;
gettimeofday(&now, 0);
- unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
UFSleepInfo* ufsi = getSleepInfo();
if(!ufsi)
{
@@ -707,7 +707,11 @@ bool EpollUFIOScheduler::addToScheduler(
}
ufsi->_ufio = ufio;
ufio->_sleepInfo = ufsi;
- _sleepList.insert(std::make_pair((timeNow+to), ufsi));
+ unsigned long long int timeToWakeUp = now.tv_sec*1000000+now.tv_usec + to;
+ if(_earliestWakeUpFromSleep > timeToWakeUp ||
+ !_earliestWakeUpFromSleep)
+ _earliestWakeUpFromSleep = timeToWakeUp;
+ _sleepList.insert(std::make_pair(timeToWakeUp, ufsi));
}
ufio->getUF()->block(); //switch context till someone wakes me up
@@ -897,13 +901,16 @@ void EpollUFIOScheduler::waitForEvents(T
int nfds;
struct timeval now;
+ unsigned long long int timeNow = 0;
IntUFIOMap::iterator index;
UFIO* ufio = 0;
UF* uf = 0;
unsigned long long int amtToSleep = timeToWait;
+ unsigned long long int ufsAmtToSleep = 0;
int i = 0;
_interruptedByEventFd = false;
UFScheduler* ufs = _uf->getParentScheduler();
+ list<UF*> ufsToAddToScheduler;
if(!ufs)
{
cerr<<"epoll scheduler has to be connected to some scheduler"<<endl;
@@ -917,13 +924,11 @@ void EpollUFIOScheduler::waitForEvents(T
_uf->yield();
}
- amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
- if(amtToSleep > ufs->getAmtToSleep())
- amtToSleep = ufs->getAmtToSleep();
- nfds = ::epoll_wait(_epollFd,
- _epollEventStruct,
- _maxFds,
- amtToSleep);
+ if(amtToSleep > (ufsAmtToSleep = ufs->getAmtToSleep()))
+ amtToSleep = (int)(ufsAmtToSleep/1000);
+ else
+ amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+ nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, amtToSleep);
if(nfds > 0)
{
//for each of the fds that had activity activate them
@@ -962,40 +967,46 @@ void EpollUFIOScheduler::waitForEvents(T
if(!_sleepList.empty())
{
gettimeofday(&now, 0);
- unsigned long long int timeNow = (now.tv_sec*1000000)+now.tv_usec;
- for( MapTimeUFIO::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+ timeNow = (now.tv_sec*1000000)+now.tv_usec;
+ if(timeNow >= _earliestWakeUpFromSleep) //dont go into this queue unless the time seen the last time has passed
{
- //1. see if anyone has crossed the sleep timer - add them to the active list
- if(beg->first <= timeNow) //sleep time is over
+ ufsToAddToScheduler.clear();
+ for( MapTimeUFIO::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
{
- UFSleepInfo* ufsi = beg->second;
- if(ufsi)
+ //1. see if anyone has crossed the sleep timer - add them to the active list
+ if(beg->first <= timeNow) //sleep time is over
{
- UFIO* ufio = ufsi->_ufio;
- if(ufio &&
- ufio->_sleepInfo == ufsi && //make sure that the ufio is not listening on another sleep counter right now
- ufio->_uf->_status == BLOCKED) //make sure that the uf hasnt been unblocked already
+ UFSleepInfo* ufsi = beg->second;
+ if(ufsi)
{
- ufio->_sleepInfo = 0;
- ufio->_errno = ETIMEDOUT;
- ufio->_uf->getParentScheduler()->addFiberToScheduler(ufio->_uf, 0);
- //this is so that we dont have to wait to handle the conn. being woken up
- _interruptedByEventFd = true;
- }
-
- releaseSleepInfo(*ufsi);
+ UFIO* ufio = ufsi->_ufio;
+ if(ufio &&
+ ufio->_sleepInfo == ufsi && //make sure that the ufio is not listening on another sleep counter right now
+ ufio->_uf->_status == BLOCKED) //make sure that the uf hasnt been unblocked already
+ {
+ ufio->_sleepInfo = 0;
+ ufio->_errno = ETIMEDOUT;
+ ufsToAddToScheduler.push_back(ufio->_uf);
+ //this is so that we dont have to wait to handle the conn. being woken up
+ _interruptedByEventFd = true;
+ }
+
+ releaseSleepInfo(*ufsi);
+ }
+
+ _sleepList.erase(beg);
+ beg = _sleepList.begin();
+ continue;
}
-
- _sleepList.erase(beg);
- beg = _sleepList.begin();
- continue;
- }
- else
- {
- amtToSleep = (amtToSleep > beg->first-timeNow) ? beg->first-timeNow : amtToSleep;
- break;
+ else
+ {
+ amtToSleep = (amtToSleep > beg->first-timeNow) ? beg->first-timeNow : amtToSleep;
+ _earliestWakeUpFromSleep = beg->first;
+ break;
+ }
+ ++beg;
}
- ++beg;
+ ufs->addFibersToScheduler(ufsToAddToScheduler, 0);
}
}
@@ -1025,5 +1036,8 @@ void IORunner::run()
void UFIO::ufCreateThreadWithIO(pthread_t* tid, list<UF*>* ufsToStartWith)
{
ufsToStartWith->push_front(new IORunner());
+
+ // Add connection pool cleaner
+ //ufsToStartWith->push_back(new UFConnectionPoolCleaner); //TODO: figure out how to deal w/ inactive connections
UFScheduler::ufCreateThread(tid, ufsToStartWith);
}
Modified: trafficserver/traffic/branches/UserFiber/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFServer.C?rev=944754&r1=944753&r2=944754&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFServer.C Sun May 16 02:10:55 2010
@@ -99,42 +99,6 @@ struct AcceptRunner : public UF
int AcceptRunner::_myLoc = -1;
AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
-static void* acceptThreadStart(void* args)
-{
- if(!args)
- return 0;
-
- UFServer* ufserver = (UFServer*) args;
-
- UFScheduler ufs;
- //add the io scheduler
- ufs.addFiberToScheduler(new IORunner());
- //add the accept port fiber
- UF* uf = new AcceptRunner();
- if(!uf)
- return 0;
- uf->_startingArgs = args;
- ufs.addFiberToScheduler(uf);
-
- ufserver->addThread("ACCEPT", 0);
- ufs.runScheduler();
- return 0;
-}
-
-static void* ioThreadStart(void* args)
-{
- if(!args)
- return 0;
-
- UFScheduler ufs;
- //add the io scheduler
- ufs.addFiberToScheduler(new IORunner());
-
- ((UFServer*) args)->addThread("NETIO", &ufs);
- ufs.runScheduler();
- return 0;
-}
-
void UFServer::startThreads()
{
preThreadCreation();
@@ -142,40 +106,43 @@ void UFServer::startThreads()
MAX_THREADS_ALLOWED = (MAX_THREADS_ALLOWED ? MAX_THREADS_ALLOWED : 1);
MAX_ACCEPT_THREADS_ALLOWED = (MAX_ACCEPT_THREADS_ALLOWED ? MAX_ACCEPT_THREADS_ALLOWED : 1);
+ unsigned int i = 0;
pthread_t* thread = new pthread_t[MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED];
- pthread_attr_t attr;
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
//start the IO threads
- unsigned int i = 0;
for(; i<MAX_THREADS_ALLOWED; i++)
{
- if(pthread_create(&(thread[i]), &attr, ioThreadStart, this) != 0)
+ UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
+ usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
+ //add the io threads to the thread chooser
+ UFScheduler* ufs = UFScheduler::getUFScheduler(thread[i]);
+ if(!ufs)
{
- cerr<<"couldnt create thread "<<strerror(errno)<<endl;
+ cerr<<"didnt get scheduler for tid = "<<thread[i]<<endl;
exit(1);
}
- usleep(500); //TODO: avoid the need for threadChooser to have a mutex
+ addThread("NETIO", ufs, thread[i]);
}
//start the stats thread
UFStatSystem::init(this);
- usleep(1000); //wait before starting the accept thread //TODO: change to cond signal later
-
preAccept();
-
//start the accept thread
for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
{
- if(pthread_create(&(thread[i]), &attr, acceptThreadStart, this) != 0)
- {
- cerr<<"couldnt create accept thread "<<strerror(errno)<<endl;
- exit(1);
- }
+ AcceptRunner* ar = new AcceptRunner();
+ ar->_startingArgs = this;
+ list<UF*>* ufsToAdd = new list<UF*>();
+ ufsToAdd->push_back(ar);
+ UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
+
+ usleep(5000); //TODO: let the thread finish initializing
+ addThread("ACCEPT", 0, thread[i]);
}
+ cerr<<"starting server"<<endl;
+
+
//wait for the threads to finish
void* status;
for(i=0; i<MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED; i++)
@@ -260,9 +227,11 @@ void UFServer::run()
}
}
-void UFServer::addThread(const std::string& type, UFScheduler* ufScheduler)
+void UFServer::addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid)
{
- pthread_t tid = pthread_self();
+ if(!tid)
+ tid = pthread_self();
+
StringThreadMapping::iterator index = _threadList.find(type);
if(index == _threadList.end())
{