You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/06/14 17:55:08 UTC
svn commit: r954525 [2/2] - in
/trafficserver/traffic/branches/UserFiber/core: include/ src/
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H Mon Jun 14 15:55:08 2010
@@ -6,41 +6,149 @@
#include "UF.H"
#include "UFConnectionPool.H"
-class UFIO;
+struct UFIO;
+struct UFConnIPInfo;
+struct UFConnGroupInfo;
+
+typedef std::map<std::string, UFConnGroupInfo*> GroupIPMap;
+typedef std::map<std::string, UFConnIPInfo*> IPInfoStore;
+typedef std::map<UFIO*, time_t> UFIOIntMap;
+typedef std::map<UFIO*, UFConnIPInfo*> UFConnIPInfoMap;
-
-typedef std::map<std::string,UFConnectionGroupInfo*> GroupIPMap;
-typedef std::map<UFIO*, std::pair<UFConnectionIpInfo*, std::pair<bool, time_t> > > UFConnectionIpInfoMap;
const time_t DEFAULT_TIMEOUT_OF_IP_ON_FAILURE = 10;
-class UFConnectionPoolImpl
+struct UFConnectionPoolImpl
{
-public:
UFConnectionPoolImpl();
~UFConnectionPoolImpl();
// Call before thread creation
static void init();
- bool addGroup(UFConnectionGroupInfo* stGroupInfo);
- UFConnectionGroupInfo* removeGroup(const std::string& name);
- UFIO* getConnection(const std::string& groupName);
- UFIO* getConnection(const std::string& groupName, bool waitForConnection);
+ bool addGroup(UFConnGroupInfo* stGroupInfo);
+ void removeGroup(const std::string& name);
+ UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
void releaseConnection(UFIO* ufIO, bool connOk = true);
- UFConnectionGroupInfo* addGroupImplicit(const std::string& groupName);
+ UFConnGroupInfo* addGroupImplicit(const std::string& groupName);
+
+ void clearUnusedConnections(TIME_IN_US lastUsedTimeDiff = 300000000 /*300 secs*/, unsigned long long int coverListTime = 60*1000*1000);
+
+ void setMaxSimulConnsPerHost(int input);
+ int getMaxSimulConnsPerHost();
+ void setTimeToTimeoutIPAfterFailure(time_t input);
+ time_t getTimeToTimeoutIPAfterFailure();
+
+protected:
+ GroupIPMap _groupIpMap;
+ IPInfoStore _ipInfoStore;
+ UFConnIPInfoMap _ufConnIPInfoMap;
+ int _maxSimulConnsPerHost;
+ time_t _timeToTimeoutIPAfterFailure; ///How long should we time out an IP if there is a failure
+
+ UFConnIPInfo* getIPInfo(const std::string& name);
+ bool createIPInfo(const std::string& groupName, UFConnGroupInfo* groupInfo);
+};
+inline void UFConnectionPoolImpl::setMaxSimulConnsPerHost(int input) { _maxSimulConnsPerHost = input; }
+inline int UFConnectionPoolImpl::getMaxSimulConnsPerHost() { return _maxSimulConnsPerHost; }
+inline void UFConnectionPoolImpl::setTimeToTimeoutIPAfterFailure(time_t input) { _timeToTimeoutIPAfterFailure = input; }
+inline time_t UFConnectionPoolImpl::getTimeToTimeoutIPAfterFailure() { return _timeToTimeoutIPAfterFailure; }
+
+struct UFConnIPInfo
+{
+ UFConnIPInfo(const std::string& ip,
+ unsigned int port,
+ bool persistent = true,
+ int maxSimultaneousConns = 0,
+ TIME_IN_US connectTimeout = 0,
+ TIME_IN_US timeToFailOutIPAfterFailureInSecs = 10);
+ ~UFConnIPInfo() {} //TODO: remove all the conns w/in currently available and currently used
+
+ const std::string& getIP() const;
+ struct sockaddr_in* getSin();
+ unsigned int getMaxSimultaneousConns() const;
+ unsigned int getInProcessCount() const;
+ bool getPersistent() const;
+ unsigned int getTimeToFailOutIPAfterFailureInSecs() const;
+ unsigned int getTimeToExpire() const;
+ void setTimeToExpire(time_t input);
+ TIME_IN_US getConnectTimeout() const;
+ time_t getTimedOut() const;
+ void setTimedOut(time_t t);
+ UFIOIntMap& getCurrentlyAvailableConnections();
+ UFMutex* getMutexToCheckSomeConnection();
+
+ time_t getLastUsed() const;
+ void incInProcessCount(int numToIncrement = 1);
+ UFIO* getConnection(UFConnIPInfoMap& _UFConnIPInfoMap);
+ UFIO* createConnection();
+
+ UFIOIntMap _currentlyAvailableConnections;
+ unsigned int _currentlyUsedCount;
+
+protected:
+ std::string _ip;
+ unsigned int _port;
+ struct sockaddr_in _sin;
+ unsigned int _maxSimultaneousConns;
+ bool _persistent;
+
+ unsigned int _timeToExpireDNSInfo;
+ unsigned int _timeToFailOutIPAfterFailureInSecs; ///how many s to keep the IP failed out after a failure
+ TIME_IN_US _connectTimeout; ///how many us to try before considering the connect a failure
+ time_t _timedOut;
+
+ unsigned int _inProcessCount;
+ UFMutex _someConnectionAvailable;
+ time_t _lastUsed;
+
+};
+inline time_t UFConnIPInfo::getLastUsed() const { return _lastUsed; }
+inline struct sockaddr_in* UFConnIPInfo::getSin() { return &_sin; }
+inline const string& UFConnIPInfo::getIP() const { return _ip; }
+inline unsigned int UFConnIPInfo::getMaxSimultaneousConns() const { return _maxSimultaneousConns; }
+inline unsigned int UFConnIPInfo::getInProcessCount() const { return _inProcessCount; }
+inline void UFConnIPInfo::incInProcessCount(int numToIncrement) { _inProcessCount += numToIncrement; }
+inline bool UFConnIPInfo::getPersistent() const { return _persistent; }
+inline unsigned int UFConnIPInfo::getTimeToFailOutIPAfterFailureInSecs() const { return _timeToFailOutIPAfterFailureInSecs; }
+inline TIME_IN_US UFConnIPInfo::getConnectTimeout() const { return _connectTimeout; }
+inline unsigned int UFConnIPInfo::getTimeToExpire() const { return _timeToExpireDNSInfo; }
+inline void UFConnIPInfo::setTimeToExpire(time_t input) { _timeToExpireDNSInfo = input; }
+inline time_t UFConnIPInfo::getTimedOut() const { return _timedOut; }
+inline void UFConnIPInfo::setTimedOut(time_t t) { _timedOut = t; }
+inline UFMutex* UFConnIPInfo::getMutexToCheckSomeConnection() { return &_someConnectionAvailable; }
+inline UFIOIntMap& UFConnIPInfo::getCurrentlyAvailableConnections() { return _currentlyAvailableConnections; }
+
- std::string fillInfo(std::string& data, bool detailed = false) const;
- double getGroupAvailability(const std::string& name) const;
- void clearBadConnections();
+typedef std::vector<std::string> UFConnIPInfoList;
+struct UFConnGroupInfo
+{
+ UFConnGroupInfo(const std::string& name);
+ ~UFConnGroupInfo();
- static time_t _timeoutIP; ///How long should we time out an IP if there is a failure
+ UFConnIPInfoList& getIpInfoList();
+ time_t getTimeToExpireAt() const;
+ void setTimeToExpireAt(time_t input);
+ std::string getName() const;
protected:
- GroupIPMap _groupIpMap;
- UFConnectionIpInfoMap _UFConnectionIpInfoMap;
+ std::string _name;
+ UFConnIPInfoList _ipInfoList;
+ time_t _timeToExpireAt;
+};
+inline std::string UFConnGroupInfo::getName() const { return _name; }
+inline time_t UFConnGroupInfo::getTimeToExpireAt() const { return _timeToExpireAt; }
+inline void UFConnGroupInfo::setTimeToExpireAt(time_t input) { _timeToExpireAt = input; }
+inline UFConnIPInfoList& UFConnGroupInfo::getIpInfoList() { return _ipInfoList; }
+
- UFIO* createConnection(UFConnectionIpInfo* stIpInfo);
+struct UFConnectionPoolCleaner : public UF
+{
+ void run();
+ UFConnectionPoolCleaner (bool registerMe = false);
+ UF* createUF() { return new UFConnectionPoolCleaner(); }
};
+inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ }
+
#endif
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFIO.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFIO.C Mon Jun 14 15:55:08 2010
@@ -1,5 +1,7 @@
#include "UFIO.H"
#include "UFConnectionPool.H"
+#include "UFStatSystem.H"
+#include "UFStats.H"
#include <netdb.h>
#include <sys/socket.h>
#include <sys/time.h>
@@ -39,13 +41,12 @@ void UFIO::reset()
_lastEpollFlag = 0;
_sleepInfo = 0;
_markedActive = false;
+ _active = true;
}
UFIO::~UFIO()
{
close();
- if(_sleepInfo)
- _sleepInfo->_ufio = 0;
}
UFIO::UFIO(UF* uf, int fd)
@@ -275,7 +276,7 @@ void UFIO::accept(UFIOAcceptThreadChoose
result = ufiotChooser->pickThread(_fd);
ufs = result.first;
tToAddTo = result.second;
- if(!ufs || !ufs->addFibersToScheduler(listOfUFsToAdd, tToAddTo))
+ if(!ufs || !ufs->addFiberToScheduler(listOfUFsToAdd, tToAddTo))
{
cerr<<"couldnt find thread to assign "<<acceptFd<<" or couldnt add fiber to scheduler"<<endl;
exit(1);
@@ -293,7 +294,7 @@ void UFIO::accept(UFIOAcceptThreadChoose
result = ufiotChooser->pickThread(_fd);
ufs = result.first;
tToAddTo = result.second;
- if(!ufs || !ufs->addFibersToScheduler(listOfUFsToAdd, tToAddTo))
+ if(!ufs || !ufs->addFiberToScheduler(listOfUFsToAdd, tToAddTo))
{
cerr<<"couldnt find thread to assign "<<acceptFd<<" or couldnt add fiber to scheduler"<<endl;
exit(1);
@@ -333,8 +334,11 @@ ssize_t UFIO::read(void *buf, size_t nby
while(1)
{
n = ::read(_fd, buf, nbyte);
- if(n > 0)
+ if(n > 0)
+ {
+ UFStatSystem::increment(UFStats::bytesRead, n);
return n;
+ }
else if(n < 0)
{
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
@@ -360,6 +364,11 @@ ssize_t UFIO::read(void *buf, size_t nby
else if(n == 0)
break;
}
+
+ // Increment stat for bytes read
+ if(n > 0)
+ UFStatSystem::increment(UFStats::bytesRead, n);
+
return n;
}
@@ -376,7 +385,10 @@ ssize_t UFIO::write(const void *buf, siz
{
amtWritten += n;
if(amtWritten == nbyte)
+ {
+ UFStatSystem::increment(UFStats::bytesWritten, n);
return amtWritten;
+ }
else
continue;
}
@@ -404,15 +416,23 @@ ssize_t UFIO::write(const void *buf, siz
else if(n == 0)
break;
}
+
+ // Increment stat for bytes written
+ if(n > 0)
+ UFStatSystem::increment(UFStats::bytesWritten, n);
+
return n;
}
-int UFIO::connect(const struct sockaddr *addr,
+bool UFIO::connect(const struct sockaddr *addr,
int addrlen,
TIME_IN_US timeout)
{
if(!isSetup()) //create the socket and make the socket non-blocking
- return -1;
+ {
+ _errno = EINVAL;
+ return false;
+ }
//find the scheduler for this request
@@ -420,22 +440,24 @@ int UFIO::connect(const struct sockaddr
while(::connect(_fd, addr, addrlen) < 0)
{
- _errno = errno;
if(errno == EINTR)
continue;
else if(errno == EINPROGRESS || errno == EAGAIN)
{
if(!tmpUfios->setupForConnect(this, timeout))
{
- cerr<<"couldnt setup for connect - "<<strerror(errno)<<endl;
- return -1;
+ _errno = errno;
+ return false;
}
}
else
- return -1;
+ {
+ _errno = errno;
+ return false;
+ }
}
- return 0;
+ return true;
}
int UFIO::sendto(const char *buf, int len, const struct sockaddr *to, int tolen, TIME_IN_US timeout)
@@ -994,8 +1016,10 @@ void EpollUFIOScheduler::waitForEvents(T
exit(1);
}
ufio->_markedActive = true;
- //activate the fiber
- ufs->addFiberToScheduler(uf, 0);
+ if(ufio->_active)
+ //activate the fiber
+ ufs->addFiberToScheduler(uf, 0);
+ //else it must be the case that no one is watching this ufio - such as when the conn. pool is holding onto the conn.
}
else
{
@@ -1057,7 +1081,7 @@ void EpollUFIOScheduler::waitForEvents(T
}
++beg;
}
- ufs->addFibersToScheduler(ufsToAddToScheduler, 0);
+ ufs->addFiberToScheduler(ufsToAddToScheduler, 0);
}
}
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFPC.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFPC.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFPC.C Mon Jun 14 15:55:08 2010
@@ -3,10 +3,11 @@
using namespace std;
-bool UFProducer::produceData(void* data, size_t size, bool freeDataOnExit, UF* uf)
+bool UFProducer::produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit, UF* uf)
{
if(!uf)
uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+ _uf = uf;
//create the UFProducerData structure
UFProducerData* ufpd = UFProducerData::getObj();
@@ -14,50 +15,75 @@ bool UFProducer::produceData(void* data,
ufpd->_size = size;
ufpd->_freeDataOnExit = freeDataOnExit;
ufpd->_producerWhichInserted = this;
- ufpd->_ufpcCode = (data && size) ? ADD : END;
+ ufpd->_ufpcCode = ufpcCode;
+ ufpd->_lockToUpdate = _requireLockToUpdateConsumers;
+ //increase the reference count
+ ufpd->addRef(_producersConsumerSetSize);
- /*TODO: add the structure to the queue
- _producerDataMutex.lock(uf);
- _producerData.push_back(ufpd);
- _producerDataMutex.unlock(uf);
- */
+ //the optimized case of no locking + there being only one consumer
+ if(!_requireLockToUpdateConsumers &&
+ _producersConsumerSetSize == 1 &&
+ _mostRecentConsumerAdded)
+ {
+ _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
+ UF* consUF = _mostRecentConsumerAdded->getUF();
+ if(consUF)
+ UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
+ return true;
+ }
//for each of the consumers add it to their queue
- _producersConsumerSetLock.getSpinLock();
- //increase the reference count
- ufpd->addRef(_producersConsumerSet.size());
+ if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
for(set<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
beg != _producersConsumerSet.end(); ++beg)
{
//add to the consumer's queue
- (*beg)->_queueOfDataToConsumeLock.lock(uf);
- (*beg)->_queueOfDataToConsume.push_back(ufpd);
- (*beg)->_queueOfDataToConsumeLock.signal();
- (*beg)->_queueOfDataToConsumeLock.unlock(uf);
+ if(_requireLockToUpdateConsumers)
+ {
+ (*beg)->_queueOfDataToConsumeLock.lock(uf);
+ (*beg)->_queueOfDataToConsume.push_back(ufpd);
+ if(!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0))
+ (*beg)->_queueOfDataToConsumeLock.signal();
+ (*beg)->_queueOfDataToConsumeLock.unlock(uf);
+ }
+ else
+ {
+ (*beg)->_queueOfDataToConsume.push_back(ufpd);
+ UF* consUF = (*beg)->getUF();
+ if(consUF && (!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0)))
+ UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
+ }
}
- //printf("%lu added to %lu\n", (unsigned long int) ((uintptr_t)(void*)uf), (unsigned long int)_producersConsumerSet.size());
- _producersConsumerSetLock.releaseSpinLock();
+ if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
return true;
}
-UFConsumer::UFConsumer(bool shouldLockForInternalMods) : _shouldLockForInternalMods(shouldLockForInternalMods) { }
+UFConsumer::UFConsumer(bool notifyOnExitOnly) : _notifyOnExitOnly(notifyOnExitOnly)
+{
+ _currUF = 0;
+ _requireLockToWaitForUpdate = true;
+}
UFProducerData* UFConsumer::waitForData(UF* uf)
{
if(!uf)
uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- _queueOfDataToConsumeLock.lock(uf);
+ _currUF = uf;
+
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
while(_queueOfDataToConsume.empty())
- _queueOfDataToConsumeLock.condWait(uf); //TODO: change to condTimedWait
+ {
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.condWait(uf); //TODO: change to condTimedWait
+ else uf->block(); //wait for the producer to wake up the consumer
+ }
//read the first element from the queue
UFProducerData* result = _queueOfDataToConsume.front();
_queueOfDataToConsume.pop_front();
- //printf("%lu consumer read 1 out %lu\n", (unsigned long int) ((uintptr_t)(void*)uf), (unsigned long int)_queueOfDataToConsume.size());
- _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
return result;
}
@@ -68,20 +94,18 @@ bool UFConsumer::hasData(UF* uf)
uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
bool result;
- _queueOfDataToConsumeLock.lock(uf);
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
result = !(_queueOfDataToConsume.empty());
- _queueOfDataToConsumeLock.unlock(uf);
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
return result;
}
bool UFConsumer::joinProducer(UFProducer* ufp)
{
- if(_shouldLockForInternalMods)
- _consumersProducerSetLock.getSpinLock();
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
_consumersProducerSet.insert(ufp);
- if(_shouldLockForInternalMods)
- _consumersProducerSetLock.releaseSpinLock();
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
//notify the producer that we're adding this consumer
if(!ufp->addConsumer(this))
@@ -96,21 +120,20 @@ bool UFConsumer::removeProducer(UFProduc
if(!ufp->removeConsumer(this))
return false;
- if(_shouldLockForInternalMods)
- _consumersProducerSetLock.getSpinLock();
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
_consumersProducerSet.erase(ufp);
- if(_shouldLockForInternalMods)
- _consumersProducerSetLock.releaseSpinLock();
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
return true;
}
-UFConsumer::~UFConsumer()
+void UFConsumer::reset()
{
//1. notify all the producers on exit
- for(set<UFProducer*>::iterator beg = _consumersProducerSet.begin();
- beg != _consumersProducerSet.end();
- ++beg)
+ for(set<UFProducer*>::iterator beg = _consumersProducerSet.begin(); beg != _consumersProducerSet.end(); )
+ {
removeProducer(*beg);
+ beg = _consumersProducerSet.begin();
+ }
//2. clear out all the remaining entries in the queue
if(!_queueOfDataToConsume.empty())
@@ -119,25 +142,28 @@ UFConsumer::~UFConsumer()
for(; beg != _queueOfDataToConsume.end(); ++beg)
UFProducerData::releaseObj((*beg));
}
-}
-
-UFProducer::UFProducer()
-{
}
-UFProducer::~UFProducer()
+void UFProducer::reset()
{
- produceData(0, 0, 0); //notify the consumers that the producer is bailing
+ //add the EOF indicator
+ produceData(0, 0, 0/*exit*/, 0/*freeDataOnExit*/); //notify the consumers that the producer is bailing
//have to wait for all the consumers to acknowledge my death
- while(1)
+ UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+ if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
+ _acceptNewConsumers = false;
+ while(!_producersConsumerSet.empty())
{
- //TODO: change to signal based later
- _producersConsumerSetLock.getSpinLock();
- if(!_producersConsumerSet.size())
- break;
- _producersConsumerSetLock.releaseSpinLock();
- usleep(100000);
+ if(_requireLockToUpdateConsumers) _producersConsumerSetLock.condTimedWait(uf, 1000000);
+ else
+ {
+ _uf = uf;
+ _uf->block();
+ }
}
+ if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
}
+stack<UFProducerData*> UFProducerData::_objList;
+UFMutex UFProducerData::_objListMutex;
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=954525&r1=954524&r2=954525&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Mon Jun 14 15:55:08 2010
@@ -7,12 +7,27 @@
#include <sys/types.h>
#include <sys/wait.h>
#include "UFStatSystem.H"
+#include "UFStats.H"
using namespace std;
//TODO: handle signals later
//TODO: create monitoring port later
//
+//
+static string getPrintableTime()
+{
+ char asctimeDate[32];
+ asctimeDate[0] = '\0';
+ time_t now = time(0);
+ asctime_r(localtime(&now), asctimeDate);
+
+ string response = asctimeDate;
+ size_t loc = response.find('\n');
+ if(loc != string::npos)
+ response.replace(loc, 1, "");
+ return response;
+}
void UFServer::reset()
{
@@ -43,6 +58,8 @@ struct NewConnUF : public UF
UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+ // increment connections handled stat
+ UFStatSystem::increment(UFStats::connectionsHandled);
//clear the client connection
delete fiberStartingArgs->ufio;
@@ -77,14 +94,15 @@ struct AcceptRunner : public UF
fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
if(fd < 0)
{
- cerr<<"couldnt setup listen socket"<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl;
exit(1);
}
if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
{
- cerr<<"couldnt setup accept thread"<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl;
return;
}
+
ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
}
AcceptRunner(bool registerMe = false)
@@ -112,12 +130,13 @@ void UFServer::startThreads()
for(; i<MAX_THREADS_ALLOWED; i++)
{
UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
+ cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl;
usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
//add the io threads to the thread chooser
UFScheduler* ufs = UFScheduler::getUFScheduler(thread[i]);
if(!ufs)
{
- cerr<<"didnt get scheduler for tid = "<<thread[i]<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<": didnt get scheduler for tid - "<<thread[i]<<endl;
exit(1);
}
addThread("NETIO", ufs, thread[i]);
@@ -126,6 +145,9 @@ void UFServer::startThreads()
//start the stats thread
UFStatSystem::init(this);
+ // Register server stats
+ UFStats::registerStats();
+
preAccept();
//start the accept thread
for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
@@ -135,13 +157,12 @@ void UFServer::startThreads()
list<UF*>* ufsToAdd = new list<UF*>();
ufsToAdd->push_back(ar);
UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
-
usleep(5000); //TODO: let the thread finish initializing
addThread("ACCEPT", 0, thread[i]);
+ cerr<<getPrintableTime()<<" "<<getpid()<<": created accept thread (with I/O) - "<<thread[i]<<endl;
}
-
//wait for the threads to finish
void* status;
for(i=0; i<MAX_THREADS_ALLOWED+MAX_ACCEPT_THREADS_ALLOWED; i++)
@@ -161,11 +182,11 @@ void UFServer::run()
_listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
if(_listenFd < 0)
{
- cerr<<"couldnt setup listen socket"<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
exit(1);
}
- if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes
+ if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode)
{
preThreadRun();
startThreads();
@@ -181,7 +202,7 @@ void UFServer::run()
unsigned int pid = fork();
if(pid < 0)
{
- cerr<<"("<<getpid()<<")(P): couldnt create child# : "<<strerror(errno)<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<": (P): couldnt create child# : "<<strerror(errno)<<endl;
exit(1);
}
if(!pid) //child listens to conns
@@ -195,6 +216,7 @@ void UFServer::run()
startThreads();
exit(0);
}
+ cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;
_childProcesses[pid] = time(0);
postBetweenFork(pid);
}
@@ -206,19 +228,18 @@ void UFServer::run()
else if(child_pid < 0)
{
if(errno != ECHILD)
- cerr<<"("<<getpid()<<")(P): waitpid error: "<<strerror(errno)<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<": (P): waitpid error:"<<endl;
}
else if(child_pid > 0)
{
- cerr<<"("<<getpid()<<")(P): child_pid "<<child_pid<<" died "<<endl;
+ cerr<<getPrintableTime()<<" "<<getpid()<<")(P): child_pid "<<child_pid<<" died "<<endl;
map<int, time_t>::iterator itr = _childProcesses.find(child_pid);
if(itr != _childProcesses.end())
_childProcesses.erase(itr);
- //parentChildDeathHandler(child_pid); TODO
}
//we've been asked to bail
- if(MAX_PROCESSES_ALLOWED && !_childProcesses.size())
+ if(!MAX_PROCESSES_ALLOWED && !_childProcesses.size())
break;
//let the parent rest
@@ -241,7 +262,7 @@ void UFServer::addThread(const std::stri
}
index->second->push_back(tid);
- if(ufScheduler)
+ if(ufScheduler && type == "NETIO")
_threadChooser->add(ufScheduler, tid);
}
Added: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Mon Jun 14 15:55:08 2010
@@ -0,0 +1,22 @@
+#include "UFStats.H"
+#include "UFStatSystem.H"
+
+uint32_t UFStats::connectionsHandled;
+uint32_t UFStats::txnSuccess;
+uint32_t UFStats::txnFail;
+uint32_t UFStats::txnReject;
+uint32_t UFStats::bytesRead;
+uint32_t UFStats::bytesWritten;
+
+namespace UFStats
+{
+ void registerStats(bool lock_needed)
+ {
+ UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed);
+ UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed);
+ UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed);
+ UFStatSystem::registerStat("txn.reject", &txnReject, lock_needed);
+ UFStatSystem::registerStat("bytes.read", &bytesRead, lock_needed);
+ UFStatSystem::registerStat("bytes.written", &bytesWritten, lock_needed);
+ }
+}
Added: trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H Mon Jun 14 15:55:08 2010
@@ -0,0 +1,31 @@
+/** @file
+
+ Declaration of uf_swapcontext(), the context-switch routine implemented in UFSwapContext.S
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#ifndef _UFSWAPCONTEXT_H_
+#define _UFSWAPCONTEXT_H_
+
+extern "C" {
+ int uf_swapcontext(ucontext *old_ctx, ucontext *new_ctx);
+}
+
+#endif
Added: trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S?rev=954525&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S Mon Jun 14 15:55:08 2010
@@ -0,0 +1,59 @@
+#if __WORDSIZE == 64
+.text
+ .align 4
+.globl uf_swapcontext
+ .type uf_swapcontext,@function
+
+uf_swapcontext:
+ // store
+ movq %r15, 0x60(%rdi)
+ movq %r14, 0x58(%rdi)
+ movq %r13, 0x50(%rdi)
+ movq %r12, 0x48(%rdi)
+ movq %r9, 0x30(%rdi)
+ movq %r8, 0x28(%rdi)
+ movq %rdi, 0x68(%rdi)
+ movq %rsi, 0x70(%rdi)
+ movq %rbp, 0x78(%rdi)
+ movq %rbx, 0x80(%rdi)
+ movq %rdx, 0x88(%rdi)
+ movq %rcx, 0x98(%rdi)
+
+ movq (%rsp), %rcx
+ movq %rcx, 0xa8(%rdi)
+ leaq 0x8(%rsp), %rcx
+ movq %rcx, 0xa0(%rdi)
+
+ leaq 0x1a8(%rdi), %rcx
+ movq %rcx, 0xe0(%rdi)
+
+ fnstenv (%rcx)
+ stmxcsr 0x1c0(%rdi)
+
+ // restore
+ movq %rsi, %rdi
+ movq 0xe0(%rdi), %rcx
+ fldenv (%rcx)
+ ldmxcsr 0x1c0(%rdi)
+
+ movq 0x60(%rdi), %r15
+ movq 0x58(%rdi), %r14
+ movq 0x50(%rdi), %r13
+ movq 0x48(%rdi), %r12
+ movq 0x30(%rdi), %r9
+ movq 0x28(%rdi), %r8
+ movq 0x70(%rdi), %rsi
+ movq 0x78(%rdi), %rbp
+ movq 0x80(%rdi), %rbx
+ movq 0x88(%rdi), %rdx
+ movq 0x98(%rdi), %rcx
+
+ movq 0xa0(%rdi), %rsp
+ movq 0xa8(%rdi), %rcx
+ pushq %rcx
+
+ movq 0x68(%rdi), %rdi
+
+ xorl %eax, %eax
+ ret
+#endif
Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.S
------------------------------------------------------------------------------
svn:executable = *