You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/30 04:07:06 UTC
svn commit: r949458 [1/2] - in /trafficserver/traffic/branches/UserFiber: ./
core/ core/ext/ core/include/ core/lib/ core/src/ include/ protocol/
protocol/http/ protocol/http/include/ protocol/http/lib/ protocol/http/src/
samples/ src/
Author: akundu
Date: Sun May 30 02:07:05 2010
New Revision: 949458
URL: http://svn.apache.org/viewvc?rev=949458&view=rev
Log:
Build changes
- setup the core and protocol directories
- setup src + include under core
- setup http under protocol
Code changes
- complete support for Producer and Consumer modules that make tunnel creations trivial (UFPC)
- Vijaya Mamidi's support for incorporating asynchronous dns resolving using the c-ares library
- Raghav J.'s support for condTimedWait
- lock system changes
- upgrade sleep system to avoid having to go into the sleep list unless previous sleep iterations indicate the time may have expired
- support for rpoll (will get replaced by a generic poll call later)
Test cases
- testSignal.C - user lock + signal test case
- testSleep.C - sleep test
- testProducer.C - producer and consumer test
Sample Code
- httpProxy.C - a simple proxy server
- echoServer.C - a simple echo server
Added:
trafficserver/traffic/branches/UserFiber/core/
trafficserver/traffic/branches/UserFiber/core/ext/
trafficserver/traffic/branches/UserFiber/core/include/
trafficserver/traffic/branches/UserFiber/core/include/UF.H
- copied, changed from r944753, trafficserver/traffic/branches/UserFiber/include/UF.H
trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H (props changed)
- copied unchanged from r944753, trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H
trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
- copied, changed from r944753, trafficserver/traffic/branches/UserFiber/include/UFIO.H
trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
- copied unchanged from r944753, trafficserver/traffic/branches/UserFiber/include/UFServer.H
trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
- copied, changed from r944753, trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H
trafficserver/traffic/branches/UserFiber/core/lib/
trafficserver/traffic/branches/UserFiber/core/src/ (props changed)
- copied from r944753, trafficserver/traffic/branches/UserFiber/src/
trafficserver/traffic/branches/UserFiber/core/src/UF.C
- copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UF.C
trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
- copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
- copied unchanged from r944754, trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H
trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
- copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UFIO.C
trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
- copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UFServer.C
trafficserver/traffic/branches/UserFiber/protocol/ (with props)
trafficserver/traffic/branches/UserFiber/protocol/http/
trafficserver/traffic/branches/UserFiber/protocol/http/include/
trafficserver/traffic/branches/UserFiber/protocol/http/lib/
trafficserver/traffic/branches/UserFiber/protocol/http/src/
trafficserver/traffic/branches/UserFiber/protocol/http/ufHTTPServer.C
trafficserver/traffic/branches/UserFiber/samples/UFCondTimedWaitTest.C
trafficserver/traffic/branches/UserFiber/samples/testProducer.C
trafficserver/traffic/branches/UserFiber/samples/testSignal.C
trafficserver/traffic/branches/UserFiber/samples/testSleep.C
Removed:
trafficserver/traffic/branches/UserFiber/include/
trafficserver/traffic/branches/UserFiber/src/
trafficserver/traffic/branches/UserFiber/ufHTTPServer.C
Modified:
trafficserver/traffic/branches/UserFiber/Makefile
trafficserver/traffic/branches/UserFiber/core/src/Makefile
trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
trafficserver/traffic/branches/UserFiber/samples/Makefile
trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
trafficserver/traffic/branches/UserFiber/samples/echoServer.C
Modified: trafficserver/traffic/branches/UserFiber/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/Makefile?rev=949458&r1=949457&r2=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/Makefile Sun May 30 02:07:05 2010
@@ -1,18 +1,14 @@
-CXXFLAGS=-g -O3 -Wall -Wno-deprecated -march=x86-64
+CXXFLAGS=-g -O3 -Wall -Werror -Wno-deprecated -march=x86-64
+#CXXFLAGS=-g -Wall -Werror -Wno-deprecated -march=x86-64 -fno-inline
CPPFLAGS=-DPIPE_NOT_EFD -I./include
-LDFLAGS=-L./lib -lUF -lpthread
+LDFLAGS=-L./core/lib -lUF -lpthread
-.PHONY: all clean lib
+.PHONY: all clean core/lib
-all: ufHTTPServer
+all: core/lib
-lib:
- $(MAKE) -C src all
-
-ufHTTPServer: lib ufHTTPServer.C
- $(CXX) -o $@ $(CPPFLAGS) $(CXXFLAGS) ufHTTPServer.C $(LDFLAGS)
+core/lib:
+ $(MAKE) -C core/src all
clean:
- $(MAKE) -C src clean
- $(RM) *.o ufHTTPServer
-
+ $(MAKE) -C core/src clean
Copied: trafficserver/traffic/branches/UserFiber/core/include/UF.H (from r944753, trafficserver/traffic/branches/UserFiber/include/UF.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UF.H?p2=trafficserver/traffic/branches/UserFiber/core/include/UF.H&p1=trafficserver/traffic/branches/UserFiber/include/UF.H&r1=944753&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UF.H Sun May 30 02:07:05 2010
@@ -12,6 +12,7 @@
#include <list>
#include <ucontext.h>
#include <pthread.h>
+#include <errno.h>
using namespace std;
namespace std { using namespace __gnu_cxx; }
@@ -28,6 +29,7 @@ enum UFStatus
//create the type of UF you'd like to pass into the accept handler
typedef unsigned long long int UFId;
struct UFScheduler;
+struct UFMutex;
struct UF
{
friend class UFScheduler;
@@ -52,6 +54,7 @@ struct UF
static void gusleep(unsigned long long int sleepAmtInUs);
///simply block the fiber
void block();
+ UFStatus getStatus() const;
UFStatus _status;
@@ -67,6 +70,7 @@ protected:
private:
void waitOnLock();
};
+inline UFStatus UF::getStatus() const { return _status; }
struct UFFactory
{
@@ -82,25 +86,20 @@ protected:
size_t _capacity;
size_t _size;
};
-inline UFFactory* UFFactory::getInstance()
-{
- return (_instance ? _instance : (_instance = new UFFactory()));
-}
-inline UF* UFFactory::selectUF(unsigned int location)
-{
- return _objMapping[location];
-}
-
-
+inline UFFactory* UFFactory::getInstance() { return (_instance ? _instance : (_instance = new UFFactory())); }
+inline UF* UFFactory::selectUF(unsigned int location) { return _objMapping[location]; }
+struct UFWaitInfo;
+typedef std::list<UFWaitInfo*> UFWaitList;
+typedef map<UF*, UFWaitInfo*> UFWLHash;
typedef std::list<UF*> UFList;
struct UFMutex
{
UFMutex()
{
_lockCurrentlyOwned = false;
- _pendingLockNotification = 0;
_lockActive = 0;
+ _mustRunUF = 0;
}
bool lock(UF* uf);
@@ -116,26 +115,38 @@ struct UFMutex
//THE CALLER MUST get the lock before calling this fxn
//THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
void signal();
+ //THE CALLER MUST get the lock before calling this fxn
+ //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms)
+ int condTimedWait(UF *uf, unsigned long long int sleepAmtInUs);
+ void releaseSpinLock(bool spinCPU = false);
+ void getSpinLock(bool spinCPU = false);
protected:
- int _lockActive;
- UFList _listOfClientsWaitingOnLock;
- UFList _listOfClientsWaitingOnCond;
- bool _lockCurrentlyOwned;
- UF* _pendingLockNotification;
-
- void releaseLocalLock();
- void getLocalLock();
+ int _lockActive;
+ UFList _listOfClientsWaitingOnLock;
+ //UFWaitList _listOfClientsWaitingOnCond;
+ UFWLHash _listOfClientsWaitingOnCond;
+ bool _lockCurrentlyOwned;
+ UF* _mustRunUF;
};
+struct UFWaitInfo
+{
+ UFWaitInfo() { reset(); }
+ void reset();
+ UF* _uf;
+ bool _sleeping;
+ bool _waiting;
+ UFMutex _ctrl;
+};
+inline void UFWaitInfo::reset() { _uf = 0; _sleeping = false; _waiting = false; }
-//per thread scheduler
-typedef std::multimap<unsigned long long int, UF*> MapTimeUF;
+typedef std::multimap<unsigned long long int, UFWaitInfo*> MapTimeUF;
//typedef std::map<pthread_t,UFScheduler*> ThreadUFSchedulerMap;
+//per thread scheduler
typedef hash_map<pthread_t, UFScheduler*, hash<uintptr_t> > ThreadUFSchedulerMap;
-struct UFConnectionPool;
struct UFScheduler
{
friend class UF;
@@ -157,7 +168,7 @@ struct UFScheduler
static ThreadUFSchedulerMap _threadUFSchedulerMap;
- static pthread_mutex_t _mutexToCheckFiberScheduerMap;
+ static pthread_mutex_t _mutexToCheckFiberSchedulerMap;
//returns the fiber scheduler on this thread or other threads;
static UFScheduler* getUFScheduler(pthread_t tid = 0);
@@ -173,7 +184,14 @@ struct UFScheduler
void* getSpecific() const;
unsigned long long int getAmtToSleep() const;
static void setExit(bool exit = true);
+ bool shouldExit() const;
void setExitJustMe(bool exit = true);
+ size_t getActiveRunningListSize() const;
+
+ //stats for thread
+ std::vector<long long> _stats;
+ UFMutex _stats_lock;
+
///the variable that says whether the scheduler should be handling the sleep or
//if its handled w/in the UserFabrics
@@ -194,14 +212,16 @@ protected:
//no lock for active running list - cause only the running
//thread can add to it
UFList _activeRunningList;
+ size_t _activeRunningListSize;
//nominate to add to a thread's running list
UFList _nominateToAddToActiveRunningList;
pthread_mutex_t _mutexToNominateToActiveList;
pthread_cond_t _condToNominateToActiveList;
-
+
//the sleep tree
MapTimeUF _sleepList;
+ unsigned long long int _earliestWakeUpFromSleep;
//store the shortest sleep interval
unsigned long long int _amtToSleep;
@@ -211,16 +231,17 @@ protected:
pthread_t _tid;
void notifyUF();
+
+ list<UFWaitInfo*> _availableWaitInfo;
+ UFWaitInfo* getWaitInfo();
+ void releaseWaitInfo(UFWaitInfo& ufsi);
public:
- //stats for thread
- std::vector<long long> _stats;
- UFMutex _stats_lock;
-
- // connection pool
- UFConnectionPool *_conn_pool;
+ UFMutex testingCondTimedWait;
};
+inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningListSize; }
+inline bool UFScheduler::shouldExit() const { return (_exitJustMe || _exit) ? true : false; }
inline unsigned long long int UFScheduler::getAmtToSleep() const { return _amtToSleep; }
inline UF* UFScheduler::getRunningFiberOnThisThread(){ return _currentFiber; }
inline const ucontext_t& UFScheduler::getMainContext() const { return _mainContext; };
@@ -229,6 +250,24 @@ inline void* UFScheduler::getSpecific()
inline void UFScheduler::setExit(bool exit) { _exit = exit; }
inline void UFScheduler::setExitJustMe(bool exit) { _exitJustMe = exit; }
+inline UFWaitInfo* UFScheduler::getWaitInfo()
+{
+ if(!_availableWaitInfo.empty())
+ {
+ UFWaitInfo* ufwi = _availableWaitInfo.front();
+ _availableWaitInfo.pop_front();
+ ufwi->reset();
+ return ufwi;
+ }
+
+ return new UFWaitInfo();
+}
+
+inline void UFScheduler::releaseWaitInfo(UFWaitInfo& ufwi)
+{
+ _availableWaitInfo.push_back(&ufwi);
+}
+
inline UFScheduler* UF::getParentScheduler() const { return _parentScheduler; }
inline void UF::waitOnLock() { block(); }
@@ -238,6 +277,11 @@ inline void UF::gusleep(unsigned long lo
UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
}
+inline unsigned long long int timeInUS(timeval& t)
+{
+ return ((unsigned long long int)(((unsigned long long int) t.tv_sec)*1000000)+(unsigned long long int) t.tv_usec);
+}
+
inline void UF::usleep(unsigned long long int sleepAmtInUs)
{
if(!sleepAmtInUs)
@@ -248,8 +292,17 @@ inline void UF::usleep(unsigned long lon
struct timeval now;
gettimeofday(&now, 0);
- unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
- _parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), this));
+
+ unsigned long long int timeToWakeUp = timeInUS(now) + sleepAmtInUs;
+ if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
+ !_parentScheduler->_earliestWakeUpFromSleep)
+ _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
+
+ UFWaitInfo *ufwi = _parentScheduler->getWaitInfo();
+ ufwi->_uf = this;
+ ufwi->_sleeping = true;
+
+ _parentScheduler->_sleepList.insert(std::make_pair(timeToWakeUp, ufwi));
block();
}
@@ -265,179 +318,14 @@ inline void UF::yield()
swapcontext(&_UFContext, &(_parentScheduler->getMainContext()));
}
-inline void UFMutex::releaseLocalLock()
-{
- while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { sched_yield(); }
- //while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { }
-}
-
-inline void UFMutex::getLocalLock()
-{
- while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { sched_yield(); }
- //while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { }
-}
-
-inline bool UFMutex::lock(UF* uf)
-{
- if(!uf || !uf->_parentScheduler)
- return false;
-
- getLocalLock();
- if(_listOfClientsWaitingOnLock.empty()) //probably the most common case (no UF has the lock)
- {
- _listOfClientsWaitingOnLock.push_back(uf);
- _lockCurrentlyOwned = true;
- releaseLocalLock();
- return true;
- }
-
- //see if any UF is holding the lock right now - if not get the lock
- //this is the case where between the time that an UF is woken up
- //(after another UF releases the lock)
- //and it actually runs this requesting UF might be able to procure the lock
- if(!_lockCurrentlyOwned)
- {
- _listOfClientsWaitingOnLock.push_front(uf);
- _lockCurrentlyOwned = true;
- releaseLocalLock();
- return true;
- }
-
- //for the rest of the UFs that didnt meet the above criteria
- //and didnt get the lock they have to wait
- _listOfClientsWaitingOnLock.push_back(uf);
- releaseLocalLock();
-
- while(1) //try to get the lock
- {
- //simply yield - since the uf will be woken up once it gets the lock
- uf->waitOnLock();
-
- //since this uf got woken up - check if it can get the lock now
- getLocalLock();
-
- if(_pendingLockNotification != uf) //there can only be one pending notification out at any time - since this lock was woken up - this must have gotten the pending notification
- {
- releaseLocalLock();
- continue;
- }
- _pendingLockNotification = 0;
-
- //check if any other UF has gotten the lock between the time that this UF
- //got the notification and actually acted on it
- if(!_lockCurrentlyOwned && (_listOfClientsWaitingOnLock.front() == uf))
- {
- _lockCurrentlyOwned = true;
- releaseLocalLock();
- return true;
- }
-
- releaseLocalLock();
- }
-
- return true;
-}
-
-inline bool UFMutex::unlock(UF* uf)
+inline void UFMutex::releaseSpinLock(bool spinCPU)
{
- if(!uf)
- return false;
-
- UFList::iterator beg;
- getLocalLock();
-
- beg = _listOfClientsWaitingOnLock.begin();
- if(uf == *beg) //check if this uf is the current owner of this lock
- {
- _lockCurrentlyOwned = false;
- beg = _listOfClientsWaitingOnLock.erase(beg);
-
- //notify the next UF in line
- if(!_listOfClientsWaitingOnLock.empty() && !_pendingLockNotification)
- {
- _pendingLockNotification = (*beg);
- releaseLocalLock();
- (*beg)->_parentScheduler->addFiberToScheduler((*beg), (*beg)->_parentScheduler->_tid);
- }
- else
- releaseLocalLock();
-
- return true;
- }
-
- releaseLocalLock();
- return false;
-}
-
-inline bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
-{
- while(1)
- {
- getLocalLock();
- if(_listOfClientsWaitingOnLock.empty())
- {
- _listOfClientsWaitingOnLock.push_back(uf);
- _lockCurrentlyOwned = true;
- releaseLocalLock();
- return true;
- }
-
- releaseLocalLock();
-
- if(!autoRetryIntervalInUS)
- break;
-
- usleep(autoRetryIntervalInUS);
- }
-
- return false;
-}
-
-
-
-inline bool UFMutex::condWait(UF* uf)
-{
- if(!uf)
- return false;
- _listOfClientsWaitingOnCond.push_back(uf) ;
- unlock(uf);
- uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal or broadcast has occurred
- lock(uf);
-
- return true;
-}
-
-inline void UFMutex::broadcast()
-{
- if(_listOfClientsWaitingOnCond.empty())
- return;
-
- //notify all the waiters to wake up
- UFList::iterator beg = _listOfClientsWaitingOnCond.begin();
- for(; beg != _listOfClientsWaitingOnCond.end(); )
- {
- (*beg)->_parentScheduler->addFiberToScheduler(*beg, (*beg)->_parentScheduler->_tid);
- beg = _listOfClientsWaitingOnCond.erase(beg);
- }
-}
-
-inline void UFMutex::signal()
-{
- if(_listOfClientsWaitingOnCond.empty())
- return;
-
- UFList::iterator beg = _listOfClientsWaitingOnCond.begin();
- (*beg)->_parentScheduler->addFiberToScheduler(*beg, (*beg)->_parentScheduler->_tid);
- _listOfClientsWaitingOnCond.erase(beg);
+ while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { if(!spinCPU) sched_yield(); }
}
-//TODO: later
-/*
-inline bool UFMutex::condTimedWait(UF* uf)
+inline void UFMutex::getSpinLock(bool spinCPU)
{
- if(!uf)
- return false;
+ while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { if(!spinCPU) sched_yield(); }
}
-*/
#endif
Added: trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFAres.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFAres.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFAres.H Sun May 30 02:07:05 2010
@@ -0,0 +1,121 @@
+#ifndef UF_ARES_H
+#define UF_ARES_H
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <ares.h>
+#include "UFDNS.H"
+#include "UFHostEnt.H"
+
+// for ares less than version 1.7.0
+//#ifndef ares_addrttl
+//#define ares_addrttl addrttl
+//#define ares_addr6ttl addr6ttl
+//#endif
+
+typedef void (*my_sock_state_cb)(void *data, int s, int read, int write) ;
+
+
+typedef hash_map< const char *, UFHostEnt * , hash<const char*> > HostentHashMap;
+
+class UFAresUFIO : public UFIO
+{
+ public:
+ // Binds this object to the fiber returned by UFScheduler::getUF() and puts every
+ // member into a known state. op_ is value-initialised (zeroed) because destroy()
+ // hands it to ares_destroy_options(), while Init() only fills in the two
+ // sock_state_cb fields; complete_ starts out false so get_complete() never
+ // reads an indeterminate value. myhostent_ keeps its 0xBAD "not set" sentinel.
+ UFAresUFIO() : ch_(0), op_(), myhostent_((UFHostEnt*)0x00000BAD), complete_(false)
+ {
+ _uf = UFScheduler::getUF();
+ };
+
+ ~UFAresUFIO()
+ {
+ cerr<<"aresio destructor called"<<endl;
+ };
+
+ void set_myhostent(UFHostEnt *h)
+ {
+ myhostent_ = h;
+ };
+
+ void Init(my_sock_state_cb callback, UFAresUFIO *aresio)
+ {
+ op_.sock_state_cb =callback ;
+ op_.sock_state_cb_data = aresio;
+ ares_init_options(&ch_, &op_,ARES_OPT_SOCK_STATE_CB);
+ };
+
+ UFHostEnt *get_myhostent()
+ {
+ return myhostent_;
+ };
+
+ ares_channel get_channel()
+ {
+ return ch_;
+ };
+
+ void set_complete(bool flag)
+ {
+ complete_ = flag;
+ }
+
+ bool get_complete()
+ {
+ return complete_;
+ }
+
+ void destroy()
+ {
+ ares_destroy_options( &op_);
+ ares_destroy(ch_);
+ }
+
+ private:
+ ares_channel ch_;
+ struct ares_options op_;
+// struct hostent *hostent_;
+// struct ares_addrttl *aresttl_;;
+// struct nttl_;
+ UFHostEnt *myhostent_;
+ bool complete_;
+
+};
+
+
+
+class UFAres : public UFDNS
+{
+ public:
+ UFAres() { };
+ ~UFAres() { };
+ unsigned long int GetHostByName(const char *name, uint32_t timeout = 0);
+ hostent *GetHostByNameDebug(const char *name, uint32_t timeout = 0);
+ hostent *GetHostByAddr(uint32_t ip , uint32_t timeout = 0, uint32_t family = AF_INET) { return NULL; };
+ hostent *GetSrvByName(const char *name, uint32_t timeout = 0) { return NULL; };
+ UFAresUFIO* GetNewAresUFIO();
+ void ReleaseAresUFIO( UFAresUFIO* ares);
+ UFHostEnt* GetCachedHostent(const char* name)
+ {
+ //cerr<<"size in get"<<host_map_.size()<<endl;
+ HostentHashMap::const_iterator index = host_map_.find(name);
+ if(index == host_map_.end())
+ return NULL;
+ else
+ return const_cast<UFHostEnt*>(index->second);
+ }
+ void InsertToCache(const char* name,UFHostEnt *host)
+ {
+ //cerr<<"size in inset"<<host_map_.size()<<endl;
+ host_map_[name] = host;
+ //cerr<<" after size in inset"<<host_map_.size()<<endl;
+ }
+
+
+ private:
+ list<UFAresUFIO*> list_ares_ufio_;
+ HostentHashMap host_map_;
+};
+
+#endif
Propchange: trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
------------------------------------------------------------------------------
svn:mergeinfo =
Added: trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H Sun May 30 02:07:05 2010
@@ -0,0 +1,35 @@
+#ifndef UF_DNS_H
+#define UF_DNS_H
+
+#include "UF.H"
+#include "UFIO.H"
+
+class UFDNS : public UFIO
+{
+ public:
+ UFDNS()
+ {
+ _uf = UFScheduler::getUF();
+ timeout_msec_ = 5000;
+ tries_ = 4;
+ port_ = 53;
+ tcp_=false;
+
+ }
+ virtual ~UFDNS() { };
+
+ virtual struct hostent *GetHostByNameDebug(const char *name, uint32_t timeout = 0) = 0 ;
+ virtual unsigned long int GetHostByName(const char *name, uint32_t timeout = 0) = 0;
+ virtual struct hostent *GetHostByAddr(uint32_t ip , uint32_t timeout = 0, uint32_t family = AF_INET) = 0 ;
+ virtual struct hostent *GetSrvByName(const char *name, uint32_t = 0) = 0 ;
+
+ protected:
+ uint32_t timeout_msec_;
+ uint32_t tries_;
+ uint32_t port_;
+ bool tcp_;
+
+
+};
+
+#endif
Added: trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H Sun May 30 02:07:05 2010
@@ -0,0 +1,113 @@
+#ifndef MY_HOSTENT
+#define MY_HOSTENT
+
+#include<ares.h>
+
+// for ares less than version 1.7.0
+//#ifndef ares_addrttl
+//#define ares_addrttl addrttl
+//#define ares_addr6ttl addr6ttl
+//#endif
+
+class UFHostEnt
+{
+ public :
+ UFHostEnt() { aresttl_ = NULL; hostent_ = NULL ; nttl_ = 0 ; timestamp_ = 0; };
+
+ ~UFHostEnt () { ReleaseEntries(); };
+
+ hostent* get_hostent()
+ {
+ return hostent_;
+ }
+
+ void set_hostent(hostent* host)
+ {
+ hostent_ = host;
+ }
+
+ ares_addrttl* get_aresttl()
+ {
+ return aresttl_;
+ }
+
+ void set_aresttl(ares_addrttl* aresttl)
+ {
+ aresttl_ = aresttl;
+ }
+
+ uint32_t get_nttl()
+ {
+ return nttl_;
+ }
+
+ void set_nttl(uint32_t nttl)
+ {
+ nttl_ = nttl;
+ }
+
+ double get_timestamp()
+ {
+ return timestamp_;
+ }
+
+ void set_timestamp(double timestamp)
+ {
+ timestamp_ = timestamp;
+ }
+
+ bool IsEntryExpired( double current_timestamp, uint32_t ttl)
+ {
+ if(((timestamp_ + ttl) - current_timestamp) <= 0)
+ return true ;
+
+ return false;
+ }
+
+ unsigned long int GetUnExpiredEntry(double current_timestamp)
+ {
+ for(unsigned int i = 0 ; i < nttl_ ; i ++)
+ {
+ if(IsEntryExpired(current_timestamp,aresttl_[i].ttl) == false)
+ //if(IsEntryExpired(current_timestamp,1) == false)
+ return aresttl_[i].ipaddr.s_addr;
+ }
+ return 0;
+ }
+
+ unsigned long int GetFirstIP()
+ {
+ if(aresttl_ !=NULL)
+ return aresttl_[0].ipaddr.s_addr;
+ else
+ return 0;
+ }
+ // Releases the hostent and the address/TTL array held by this entry and
+ // clears the corresponding fields. The destructor also calls this, so the
+ // pointers are reset to NULL to make a second call a no-op instead of a
+ // double free.
+ void ReleaseEntries()
+ {
+ if(hostent_ != NULL)
+ {
+ ares_free_hostent(hostent_);
+ hostent_ = NULL;
+ }
+ if(aresttl_ != NULL)
+ {
+ free(aresttl_);
+ aresttl_ = NULL;
+ }
+ nttl_ = 0; // GetUnExpiredEntry() indexes aresttl_ for i < nttl_
+ }
+
+ void lock(UF * uf)
+ {
+ mutex_.lock(uf);
+ }
+
+ void unlock(UF *uf)
+ {
+ mutex_.unlock(uf);
+ }
+
+
+ private:
+
+ UFMutex mutex_;
+ struct hostent *hostent_;
+ struct ares_addrttl *aresttl_;
+ uint32_t nttl_;
+ double timestamp_;
+};
+
+#endif
Copied: trafficserver/traffic/branches/UserFiber/core/include/UFIO.H (from r944753, trafficserver/traffic/branches/UserFiber/include/UFIO.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFIO.H?p2=trafficserver/traffic/branches/UserFiber/core/include/UFIO.H&p1=trafficserver/traffic/branches/UserFiber/include/UFIO.H&r1=944753&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFIO.H Sun May 30 02:07:05 2010
@@ -99,22 +99,20 @@ struct UFIO
bool close();
bool setFd(int fd, bool makeNonBlocking = true);
-
+ void setUF(UF* uf);
+ void setUFIOScheduler(UFIOScheduler* ufios);
unsigned int getErrno() const;
int getFd() const;
UF* getUF() const;
-
const std::string& getRemoteIP() const;
unsigned int getRemotePort() const;
-
UFIOScheduler* getUFIOScheduler() const;
- void setUFIOScheduler(UFIOScheduler* ufios);
-
- UFSleepInfo* _sleepInfo;
static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
- static UFIOScheduler* getUFIOS();
+
+ UFSleepInfo* _sleepInfo;
+ bool _markedActive;
protected:
int _fd;
@@ -129,11 +127,11 @@ protected:
unsigned int _remotePort;
int _lastEpollFlag;
- bool _amtReadLastTimeEqualToAskedAmt;
};
inline unsigned int UFIO::getErrno() const { return _errno; }
inline int UFIO::getFd() const { return _fd; }
inline UF* UFIO::getUF() const { return _uf; }
+inline void UFIO::setUF(UF* uf) { _uf = uf; }
inline UFIOScheduler* UFIO::getUFIOScheduler() const { return _ufios; }
inline void UFIO::setUFIOScheduler(UFIOScheduler* ufios) { _ufios = ufios; }
inline const std::string& UFIO::getRemoteIP() const { return _remoteIP; };
@@ -144,25 +142,38 @@ inline unsigned int UFIO::getRemotePort(
struct UFIOScheduler;
//typedef map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
typedef hash_map<pthread_t, UFIOScheduler*, hash<uintptr_t> > ThreadFiberIOSchedulerMap;
+
+struct UFConnectionPool;
struct UFIOScheduler
{
- UFIOScheduler() {}
- virtual ~UFIOScheduler() {}
+ UFIOScheduler();
+ virtual ~UFIOScheduler();
virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0) = 0;
virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0) = 0;
virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = 0) = 0;
virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0) = 0;
virtual bool closeConnection(UFIO* ufio) = 0;
+ //TODO: support regular poll behavior
+ virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0) = 0;
virtual bool isSetup() { return false; }
virtual void waitForEvents(TIME_IN_US timeToWait) = 0;
- static ThreadFiberIOSchedulerMap _tfiosscheduler;
+ static UFIOScheduler* getUFIOS(pthread_t tid = 0);
+ static ThreadFiberIOSchedulerMap _tfiosscheduler;
+ static pthread_key_t _keyToIdentifySchedulerOnThread;
+
+ // connection pool
+ UFConnectionPool* getConnPool() const;
+
+protected:
+ UFConnectionPool* _connPool;
};
+inline UFConnectionPool* UFIOScheduler::getConnPool() const { return _connPool; }
-#define MAX_FDS_FOR_EPOLL 64*1024-1
+#define MAX_FDS_FOR_EPOLL 128*1024-1
//typedef map<int, UFIO*> IntUFIOMap;
typedef hash_map<int, UFIO*, hash<int> > IntUFIOMap;
typedef std::multimap<TIME_IN_US, UFSleepInfo*> MapTimeUFIO;
@@ -180,6 +191,7 @@ struct EpollUFIOScheduler : public UFIOS
bool setupForRead(UFIO* ufio, TIME_IN_US to = 0);
bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0);
bool closeConnection(UFIO* ufio);
+ bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0);
void waitForEvents(TIME_IN_US timeToWait = -1);
@@ -187,19 +199,23 @@ struct EpollUFIOScheduler : public UFIOS
bool _interruptedByEventFd;
protected:
- UF* _uf;
- int _epollFd;
- unsigned int _maxFds;
- struct epoll_event* _epollEventStruct;
- IntUFIOMap _intUFIOMap;
- bool _alreadySetup;
+ UF* _uf;
+ UFScheduler* _ufs;
+ int _epollFd;
+ unsigned int _maxFds;
+ struct epoll_event* _epollEventStruct;
+ IntUFIOMap _intUFIOMap;
+ bool _alreadySetup;
- MapTimeUFIO _sleepList;
+ MapTimeUFIO _sleepList;
+ unsigned long long int _earliestWakeUpFromSleep;
bool addToScheduler(UFIO* ufio,
void* inputInfo /*flags to identify how ot add*/,
- TIME_IN_US to = 0);
+ TIME_IN_US to = 0,
+ bool wait = true,
+ bool runEpollCtl = false);
list<UFSleepInfo*> _availableSleepInfo;
Added: trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFPC.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFPC.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFPC.H Sun May 30 02:07:05 2010
@@ -0,0 +1,132 @@
+#ifndef UFPC_H
+#define UFPC_H
+
+#include <set>
+#include "UF.H"
+
+struct UFMutex;
+struct UFProducerData;
+struct UFConsumer;
+struct UFProducer;
+
+enum UFProducerDataCode
+{
+ ADD = 1,
+ END = 2
+};
+
+struct UFConsumer
+{
+ UFConsumer(bool shouldLockForInternalMods = true);
+ virtual ~UFConsumer();
+ //user needs to call input->releaseMe after consuming the data
+ UFProducerData* waitForData(UF* uf = 0);
+ bool hasData(UF* uf = 0);
+
+ bool joinProducer(UFProducer* ufp);
+ bool removeProducer(UFProducer* ufp);
+
+ std::list<UFProducerData*> _queueOfDataToConsume;
+ UFMutex _queueOfDataToConsumeLock;
+
+protected:
+ std::set<UFProducer*> _consumersProducerSet;
+ UFMutex _consumersProducerSetLock;
+ bool _shouldLockForInternalMods;
+};
+
+struct UFProducer
+{
+ friend class UFConsumer;
+ UFProducer();
+ ~UFProducer();
+ bool removeConsumer(UFConsumer* ufc);
+ bool produceData(void* data, size_t size, bool freeDataOnExit = true, UF* uf = 0);
+
+protected:
+ std::set<UFConsumer*> _producersConsumerSet;
+ UFMutex _producersConsumerSetLock; //needed when the consumers are adding or removing themselves from the consumerList
+
+ /*TODO: producer can keep a collection of the data
+ std::list<UFProducerData*> _producerData;
+ UFMutex _producerDataMutex; //this mutex is needed to add to the producerdata
+ */
+ bool addConsumer(UFConsumer* ufc);
+};
+
+// Reference-counted record describing one buffer (_data/_size) together with
+// the producer that inserted it and a UFProducerDataCode. Instances are
+// created through getObj() and given back through releaseObj(); the
+// constructor and destructor are protected.
+struct UFProducerData
+{
+ void* _data;
+ size_t _size;
+ UFProducer* _producerWhichInserted;
+ UFProducerDataCode _ufpcCode;
+ bool _freeDataOnExit; // when true the destructor free()s _data
+
+ void addRef(size_t numToAdd = 1);
+ void reduceRef();
+
+ static UFProducerData* getObj();
+ static void releaseObj(UFProducerData* obj);
+
+
+protected:
+ ~UFProducerData();
+ // Every field is initialised: the destructor tests and frees _data, so it
+ // must never hold an indeterminate value.
+ UFProducerData() : _data(0), _size(0), _producerWhichInserted(0), _ufpcCode(ADD), _freeDataOnExit(true), _referenceCount(0) { }
+ UFMutex _controlReferenceCount; //control the ref. count of this data
+ size_t _referenceCount;
+};
+
+inline bool UFProducer::addConsumer(UFConsumer* ufc)
+{
+ _producersConsumerSetLock.getSpinLock();
+ _producersConsumerSet.insert(ufc); //check insertion
+ _producersConsumerSetLock.releaseSpinLock();
+ return true;
+}
+
+inline bool UFProducer::removeConsumer(UFConsumer* ufc)
+{
+ _producersConsumerSetLock.getSpinLock();
+ _producersConsumerSet.erase(ufc);
+ _producersConsumerSetLock.releaseSpinLock();
+ return true;
+}
+
+// Adds numToAdd references to this object while holding its spin lock.
+inline void UFProducerData::addRef(size_t numToAdd)
+{
+ _controlReferenceCount.getSpinLock();
+ _referenceCount += numToAdd; // accumulate; plain assignment dropped references already held
+ _controlReferenceCount.releaseSpinLock();
+}
+
+inline void UFProducerData::reduceRef()
+{
+ _controlReferenceCount.getSpinLock();
+ --_referenceCount;
+ _controlReferenceCount.releaseSpinLock();
+}
+
+inline UFProducerData* UFProducerData::getObj()
+{
+ return new UFProducerData();
+}
+
+inline void UFProducerData::releaseObj(UFProducerData* obj)
+{
+ if(!obj)
+ return;
+
+ obj->_controlReferenceCount.getSpinLock();
+ if(!--obj->_referenceCount)
+ delete obj;
+ else
+ obj->_controlReferenceCount.releaseSpinLock();
+}
+
+inline UFProducerData::~UFProducerData()
+{
+ if(_freeDataOnExit && _data)
+ free (_data);
+}
+
+#endif
Copied: trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H (from r944753, trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H?p2=trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H&p1=trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H&r1=944753&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H Sun May 30 02:07:05 2010
@@ -13,6 +13,13 @@
#include <sstream>
#include "UF.H"
+struct Stat // One named statistic; element type of UFStatSystem::global_stats (previously a std::pair<std::string, long long>).
+{
+ Stat(std::string _name, long long _value); // only declared here; presumably initialises name/value from the arguments - confirm in the definition
+ std::string name; // name of the statistic
+ long long value; // value of the statistic
+};
+
class UFIO;
class UFServer;
class UFStatSystem
@@ -45,7 +52,7 @@ private:
static bool getStatNum(const char *stat_name, uint32_t &stat_num);
static UFServer *server;
static std::map<std::string, uint32_t> stat_name_to_num;
- static std::vector< std::pair<std::string, long long> > global_stats;
+ static std::vector< Stat > global_stats;
static uint32_t MAX_STATS_ALLOWED;
static uint32_t NUM_STATS_ESTIMATE;
Propchange: trafficserver/traffic/branches/UserFiber/core/src/
------------------------------------------------------------------------------
svn:mergeinfo =
Modified: trafficserver/traffic/branches/UserFiber/core/src/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/Makefile?rev=949458&r1=944753&r2=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/Makefile Sun May 30 02:07:05 2010
@@ -1,5 +1,16 @@
-CXXFLAGS=-g -O3 -Wall -Wno-deprecated -march=x86-64
-CPPFLAGS=-DPIPE_NOT_EFD -I../include
+ARES_VERSION = 1.7.1
+ARES = c-ares-$(ARES_VERSION)
+ARESDIR=../ext
+ARES_SRC = $(ARESDIR)/$(ARES)/ares.h
+ARES_SRC_FILE = $(ARES).tar.gz
+ARES_GPG = $(ARES).tar.gz.asc
+ARES_LIB = $(ARESDIR)/$(ARES)/.libs/libcares.a
+
+
+CXXFLAGS=-g -O3 -Wall -Werror -Wno-deprecated -march=x86-64
+#CXXFLAGS=-g -Wall -Werror -Wno-deprecated -march=x86-64 -fno-inline
+#CPPFLAGS=-DLOCK_DEBUG -DPIPE_NOT_EFD -I../include -I$(ARESDIR)/$(ARES)
+CPPFLAGS=-DPIPE_NOT_EFD -I../include -I$(ARESDIR)/$(ARES)
ARFLAGS=-rv
RANLIB=ranlib
INCLUDE_DIR=../include
@@ -7,28 +18,47 @@ LIB_DIR=../lib
.PHONY: all clean
-all: libUF.a
+all: $(LIB_DIR)/libUF.a
+
+$(ARES_LIB): $(ARES_SRC)
+ pushd $(ARESDIR)/$(ARES) && ./configure && gmake && popd
-UF.o: UF.C $(INCLUDE_DIR)/UF.H
+$(ARES_SRC):
+ if [ ! -d $(ARESDIR)/$(ARES) ] ; then \
+ pushd $(ARESDIR) && \
+ wget http://c-ares.haxx.se/$(ARES_SRC_FILE) && \
+ tar xzf ./$(ARES_SRC_FILE) && popd; \
+ fi
+
+$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H
$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UF.o UF.C
-UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H
+$(LIB_DIR)/UFPC.o: UFPC.C $(INCLUDE_DIR)/UFPC.H $(LIB_DIR)/UF.o
+ $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFPC.o UFPC.C
+
+$(LIB_DIR)/UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
+ $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
+
+$(LIB_DIR)/UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H $(LIB_DIR)/UF.o
$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFIO.o UFIO.C
-UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H
+$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
+ $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
+
+$(LIB_DIR)/UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o
$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStatSystem.o UFStatSystem.C
-UFServer.o: UFServer.C $(INCLUDE_DIR)/UFServer.H
+$(LIB_DIR)/UFServer.o: UFServer.C $(INCLUDE_DIR)/UFServer.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFConnectionPoolImpl.o
$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFServer.o UFServer.C
-UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H
- $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
-
-libUF.a: UF.o UFIO.o UFStatSystem.o UFServer.o UFConnectionPoolImpl.o
- $(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a \
- $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFConnectionPoolImpl.o
+$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o \
+ $(LIB_DIR)/UFConnectionPoolImpl.o $(LIB_DIR)/UFAres.o $(ARES_LIB)
+ $(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARES_LIB) \
+ $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o \
+ $(LIB_DIR)/UFConnectionPoolImpl.o $(LIB_DIR)/UFAres.o $(LIB_DIR)/UFPC.o
$(RANLIB) $(LIB_DIR)/libUF.a
clean:
+ echo "cleaning ..."
$(RM) $(LIB_DIR)/*.o $(LIB_DIR)/*.a
Copied: trafficserver/traffic/branches/UserFiber/core/src/UF.C (from r944754, trafficserver/traffic/branches/UserFiber/src/UF.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UF.C?p2=trafficserver/traffic/branches/UserFiber/core/src/UF.C&p1=trafficserver/traffic/branches/UserFiber/src/UF.C&r1=944754&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UF.C Sun May 30 02:07:05 2010
@@ -109,11 +109,11 @@ static pthread_key_t getThreadKey()
pthread_key_t UFScheduler::_specific_key = getThreadKey();
UFScheduler::UFScheduler()
{
+ _activeRunningListSize = 0;
_earliestWakeUpFromSleep = 0;
_exitJustMe = false;
_specific = 0;
_currentFiber = 0;
- _conn_pool = new UFConnectionPool;
if(_inThreadedMode)
{
@@ -159,7 +159,6 @@ UFScheduler::UFScheduler()
UFScheduler::~UFScheduler()
{
//pthread_key_delete(_specific_key);
- delete _conn_pool;
}
@@ -181,6 +180,7 @@ bool UFScheduler::addFibersToScheduler(c
if(ufList.empty())
return true;
+ UF* uf = 0;
list<UF*>::const_iterator beg = ufList.begin();
list<UF*>::const_iterator ending = ufList.end();
//adding to the same scheduler and as a result thread as the current job
@@ -188,17 +188,23 @@ bool UFScheduler::addFibersToScheduler(c
{
for(; beg != ending; ++beg)
{
- UF* uf = *beg;
+ uf = *beg;
+ if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
+ continue;
uf->_status = WAITING_TO_RUN;
if(uf->_parentScheduler) //probably putting back an existing uf into the active list
{
- if(uf->_parentScheduler != this) //cant schedule for some other thread
+ if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
+ {
+ _activeRunningList.push_back(uf); ++_activeRunningListSize;
+ continue;
+ }
+ else
{
- cerr<<"uf is not part of this scheduler"<<endl;
+ cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
+ abort(); //TODO: remove the abort
return false;
}
- _activeRunningList.push_back(uf);
- continue;
}
//create a new context
@@ -213,7 +219,7 @@ bool UFScheduler::addFibersToScheduler(c
cerr<<"error while trying to run makecontext"<<endl;
return false;
}
- _activeRunningList.push_back(uf);
+ _activeRunningList.push_back(uf); ++_activeRunningListSize;
}
}
else //adding to some other threads' scheduler
@@ -232,7 +238,10 @@ bool UFScheduler::addFibersToScheduler(c
UFScheduler* ufs = index->second;
pthread_mutex_lock(&(ufs->_mutexToNominateToActiveList));
for(; beg != ending; ++beg)
- ufs->_nominateToAddToActiveRunningList.push_back(*beg);
+ {
+ uf = *beg;
+ ufs->_nominateToAddToActiveRunningList.push_back(uf);
+ }
pthread_cond_signal(&(ufs->_condToNominateToActiveList));
pthread_mutex_unlock(&(ufs->_mutexToNominateToActiveList));
ufs->notifyUF();
@@ -263,12 +272,15 @@ void UFScheduler::runScheduler()
gettimeofday(&start, 0);
unsigned long long int timeNow = 0;
- while(!_exitJustMe && !_exit)
+ UFList::iterator ufBeg;
+ UFList::iterator nBeg;
+ MapTimeUF::iterator slBeg;
+ bool waiting = false;
+ while(!shouldExit())
{
- UFList::iterator beg = _activeRunningList.begin();
- for(; beg != _activeRunningList.end(); )
+ for(ufBeg = _activeRunningList.begin(); ufBeg != _activeRunningList.end(); )
{
- UF* uf = *beg;
+ UF* uf = *ufBeg;
_currentFiber = uf;
uf->_status = RUNNING;
swapcontext(&_mainContext, &(uf->_UFContext));
@@ -277,18 +289,18 @@ void UFScheduler::runScheduler()
if(uf->_status == RUNNING) { }
else if(uf->_status == BLOCKED)
{
- beg = _activeRunningList.erase(beg);
+ ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
continue;
}
else if(uf->_status == COMPLETED)
{
delete uf;
- beg = _activeRunningList.erase(beg);
+ ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
continue;
}
uf->_status = WAITING_TO_RUN;
- ++beg;
+ ++ufBeg;
}
@@ -309,15 +321,18 @@ void UFScheduler::runScheduler()
//TODO: do atomic comparison to see if there is anything in
//_nominateToAddToActiveRunningList before getting the lock
pthread_mutex_lock(&_mutexToNominateToActiveList);
- UFList::iterator beg = _nominateToAddToActiveRunningList.begin();
- for(; beg != _nominateToAddToActiveRunningList.end(); )
+ for(nBeg = _nominateToAddToActiveRunningList.begin();
+ nBeg != _nominateToAddToActiveRunningList.end(); )
{
- UF* uf = *beg;
+ UF* uf = *nBeg;
if(uf->_parentScheduler)
- _activeRunningList.push_back(uf);
+ {
+ uf->_status = WAITING_TO_RUN;
+ _activeRunningList.push_front(uf); ++_activeRunningListSize;
+ }
else //adding a new fiber
addFiberToScheduler(uf, 0);
- beg = _nominateToAddToActiveRunningList.erase(beg);
+ nBeg = _nominateToAddToActiveRunningList.erase(nBeg);
}
pthread_mutex_unlock(&_mutexToNominateToActiveList);
@@ -326,38 +341,52 @@ void UFScheduler::runScheduler()
//pick up the fibers that may have completed sleeping
//look into the sleep list;
+ //printf("%u %u tnc = %llu %llu\n", (unsigned int)pthread_self(), _sleepList.size(), _earliestWakeUpFromSleep, _earliestWakeUpFromSleep-timeNow);
if(!_sleepList.empty())
{
gettimeofday(&now, 0);
ranGetTimeOfDay = true;
- timeNow = (now.tv_sec*1000000)+now.tv_usec;
+ timeNow = timeInUS(now);
if(timeNow >= _earliestWakeUpFromSleep) //dont go into this queue unless the time seen the last time has passed
{
- for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+ for(slBeg = _sleepList.begin(); slBeg != _sleepList.end(); )
{
- //TODO: has to be cleaned up
//1. see if anyone has crossed the sleep timer - add them to the active list
- if(beg->first <= timeNow) //sleep time is over
+ if(slBeg->first <= timeNow) //sleep time is over
{
- _activeRunningList.push_back(beg->second);
- _sleepList.erase(beg);
- beg = _sleepList.begin();
+ UFWaitInfo *ufwi = slBeg->second;
+ ufwi->_ctrl.getSpinLock();
+ ufwi->_sleeping = false;
+ if(ufwi->_uf)
+ {
+ ufwi->_uf->_status = WAITING_TO_RUN;
+ _activeRunningList.push_front(ufwi->_uf); ++_activeRunningListSize;
+ ufwi->_uf = NULL;
+ }
+ waiting = ufwi->_waiting;
+ ufwi->_ctrl.releaseSpinLock();
+ if(!waiting) //since the uf is not being waited upon release it (the sleeping part has already been done)
+ releaseWaitInfo(*ufwi);
+
+ _sleepList.erase(slBeg);
+ slBeg = _sleepList.begin();
+
continue;
}
else
{
if(_amtToSleep) //since the nominate system might have turned off the sleep - we dont activate it again
- _amtToSleep = beg->first-timeNow;
- _earliestWakeUpFromSleep = beg->first;
+ _amtToSleep = slBeg->first-timeNow;
+ _earliestWakeUpFromSleep = slBeg->first;
break;
}
- ++beg;
+ ++slBeg;
}
}
}
//see if there is anything to do or is it just sleeping time now
- if(!_notifyFunc && _activeRunningList.empty() && !_exit)
+ if(!_notifyFunc && !_activeRunningListSize && !shouldExit())
{
if(_inThreadedMode) //go to conditional wait (in threaded mode)
{
@@ -436,6 +465,305 @@ int UFFactory::registerFunc(UF* uf)
return _size++;
}
+const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 3;
+bool UFMutex::lock(UF* uf) // Acquires this mutex for fiber uf, yielding via waitOnLock() until it succeeds; returns false only when uf or its parent scheduler is null, true otherwise.
+{
+ if(!uf || !uf->_parentScheduler)
+ return false;
+
+ getSpinLock();
+ if(_listOfClientsWaitingOnLock.empty()) //probably the most common case (no UF has the lock)
+ {
+#ifdef LOCK_DEBUG
+ printf("%lu l1\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+ _listOfClientsWaitingOnLock.push_back(uf); // list was empty, so uf is now at the front (unlock() treats the front entry as the owner)
+ _lockCurrentlyOwned = true;
+ releaseSpinLock();
+ return true;
+ }
+
+ //see if any UF is holding the lock right now - if not get the lock
+ //this is the case where between the time that an UF is woken up
+ //(after another UF releases the lock)
+ //and it actually runs this requesting UF might be able to procure the lock
+ //if there is a mustRunUF - that UF has to run first - and this UF has to go to the end of the line
+ if(!_lockCurrentlyOwned && !_mustRunUF)
+ {
+#ifdef LOCK_DEBUG
+ printf("%lu l2\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+ _listOfClientsWaitingOnLock.push_front(uf); // jump ahead of the already-queued waiters and take ownership
+ _lockCurrentlyOwned = true;
+ releaseSpinLock();
+ return true;
+ }
+
+ //for the rest of the UFs that didnt meet the above criteria
+ //and didnt get the lock they have to wait
+ _listOfClientsWaitingOnLock.push_back(uf);
+ releaseSpinLock();
+
+ unsigned short int counter = 0; // number of wake-ups after which uf still could not take the lock
+ while(1) //try to get the lock
+ {
+#ifdef LOCK_DEBUG
+ printf("%lu wt\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+ //simply yield - since the uf will be woken up once it gets the lock
+ uf->waitOnLock();
+
+ //since this uf got woken up - check if it can get the lock now
+ getSpinLock();
+
+ //check if any other UF has gotten the lock between the time that this UF
+ //got the notification and actually acted on it
+ if(!_lockCurrentlyOwned && (_listOfClientsWaitingOnLock.front() == uf))
+ {
+#ifdef LOCK_DEBUG
+ printf("%lu l3\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+ _lockCurrentlyOwned = true;
+ _mustRunUF = 0; // clear the must-run marker now that a queued waiter has the lock
+ releaseSpinLock();
+ return true;
+ }
+
+ if(++counter >= CONSECUTIVE_LOCK_FAILURES_ALLOWED) //dont let a UF fail to get the lock more than CONSECUTIVE_LOCK_FAILURES_ALLOWED times
+ _mustRunUF = uf; // while set, the queue-jumping fast path above is disabled for other UFs
+ releaseSpinLock();
+ }
+
+ return true; // not reached: the loop above only exits through its return
+}
+
+// Releases the mutex held by fiber uf and wakes the next valid waiter, if any.
+//
+// The owner is the UF at the front of _listOfClientsWaitingOnLock. After the
+// owner is removed, the new front entry is handed back to its scheduler so it
+// can retry the lock; entries that are null or have no parent scheduler are
+// discarded because they cannot be woken up.
+//
+// Returns false when uf is null and true once the lock has been released.
+// Aborts when uf is not the current owner. An empty waiter list is treated
+// the same way; without the empty() check, *begin() on an empty list would be
+// undefined behaviour.
+bool UFMutex::unlock(UF* uf)
+{
+    if(!uf)
+        return false;
+
+    getSpinLock();
+
+    //check if this uf is the current owner of this lock
+    if(_listOfClientsWaitingOnLock.empty() || (_listOfClientsWaitingOnLock.front() != uf))
+    {
+        cerr<<uf<<" tried to unlock but was not in top of list"<<endl;
+        abort();
+
+        //only reached if the abort above is ever removed
+        releaseSpinLock();
+        return false;
+    }
+
+    _lockCurrentlyOwned = false;
+    UFList::iterator beg = _listOfClientsWaitingOnLock.erase(_listOfClientsWaitingOnLock.begin());
+#ifdef LOCK_DEBUG
+    printf("%lu u %d\n", (unsigned long int) ((uintptr_t)(void*)uf), (int)_listOfClientsWaitingOnLock.size());
+#endif
+
+    //notify the next UF in line
+    while(beg != _listOfClientsWaitingOnLock.end())
+    {
+        UF* tmpUf = *beg;
+        if(!tmpUf || !tmpUf->_parentScheduler) //invalid tmpuf - cant wake it up
+        {
+#ifdef LOCK_DEBUG
+            printf("%lu nf1\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+            beg = _listOfClientsWaitingOnLock.erase(beg);
+            continue;
+        }
+
+#ifdef LOCK_DEBUG
+        printf("%lu wk %lu\n",
+               (unsigned long int) ((uintptr_t)(void*)uf),
+               (unsigned long int) ((uintptr_t)(void*)tmpUf));
+#endif
+
+        //drop the spin lock before handing the waiter to its scheduler
+        releaseSpinLock();
+        uf->_parentScheduler->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
+        return true;
+    }
+
+    //nobody left to wake up
+    releaseSpinLock();
+    return true;
+}
+
+// Attempts to take the mutex for fiber uf without queueing behind other waiters.
+//
+// The lock is taken only when no UF is registered on it at all. With
+// autoRetryIntervalInUS == 0 a single attempt is made; otherwise the attempt
+// is repeated every autoRetryIntervalInUS microseconds until it succeeds.
+//
+// Returns true when the lock was acquired, false when uf is null or the
+// single attempt failed.
+bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
+{
+    //a null owner could never release the lock again: unlock() rejects a null uf
+    if(!uf)
+        return false;
+
+    while(1)
+    {
+        getSpinLock();
+        if(_listOfClientsWaitingOnLock.empty())
+        {
+            _listOfClientsWaitingOnLock.push_back(uf);
+            _lockCurrentlyOwned = true;
+            releaseSpinLock();
+            return true;
+        }
+
+        releaseSpinLock();
+
+        if(!autoRetryIntervalInUS)
+            break;
+
+        //NOTE(review): this looks like the libc usleep, which would block the whole thread
+        //rather than yield this fiber - confirm a holder on the same thread can still run
+        usleep(autoRetryIntervalInUS);
+    }
+
+    return false;
+}
+
+
+// Waits on this mutex's condition: registers uf as a condition waiter,
+// releases the lock, yields until woken, then takes the lock again.
+// Returns false for a null uf, true otherwise.
+bool UFMutex::condWait(UF* uf)
+{
+    if(!uf)
+        return false;
+
+    //only register a wait-info record when uf is not tracked in the hash yet
+    const bool alreadyRegistered =
+        (_listOfClientsWaitingOnCond.find(uf) != _listOfClientsWaitingOnCond.end());
+    if(!alreadyRegistered)
+    {
+        UFWaitInfo* waitInfo = uf->_parentScheduler->getWaitInfo();
+        waitInfo->_uf = uf;
+        waitInfo->_waiting = true;
+
+        _listOfClientsWaitingOnCond[uf] = waitInfo;
+    }
+
+    unlock(uf);
+    uf->waitOnLock(); //yields until a signal or broadcast puts this uf back on a run list
+    lock(uf);
+
+    return true;
+}
+
+// Wakes every UF currently registered as a waiter on this mutex's condition
+// and empties the waiter hash. Does nothing when there are no waiters or
+// when the calling thread has no scheduler.
+void UFMutex::broadcast()
+{
+    if(_listOfClientsWaitingOnCond.empty())
+        return;
+
+    UFScheduler* scheduler = UFScheduler::getUFScheduler();
+    if(!scheduler)
+    {
+        cerr<<"couldnt get scheduler on thread "<<pthread_self()<<endl;
+        return;
+    }
+
+    UFWLHash::iterator it = _listOfClientsWaitingOnCond.begin();
+    while(it != _listOfClientsWaitingOnCond.end())
+    {
+        UFWaitInfo* waitInfo = it->second;
+
+        waitInfo->_ctrl.getSpinLock();
+        //mark the record as no longer being on the condition queue
+        waitInfo->_waiting = false;
+
+        //schedule the uf (if it is still attached) and detach it so nobody schedules it twice
+        if(UF* waiter = waitInfo->_uf)
+        {
+            scheduler->addFiberToScheduler(waiter, waiter->_parentScheduler->_tid);
+            waitInfo->_uf = NULL;
+        }
+
+        const bool stillSleeping = waitInfo->_sleeping;
+        waitInfo->_ctrl.releaseSpinLock();
+
+        //the sleep list is done with this record (or never had it), so it can be released
+        if(!stillSleeping)
+            scheduler->releaseWaitInfo(*waitInfo);
+
+        ++it;
+    }
+
+    _listOfClientsWaitingOnCond.clear();
+}
+
+// Wakes at most one UF waiting on this mutex's condition.
+// Waiter records are removed from the front of the hash until one with a UF
+// still attached is found; that UF is then handed to its scheduler.
+void UFMutex::signal()
+{
+    if(_listOfClientsWaitingOnCond.empty())
+        return;
+
+    UFScheduler* scheduler = UFScheduler::getUFScheduler();
+    if(!scheduler)
+    {
+        cerr<<"couldnt get scheduler"<<endl;
+        return;
+    }
+
+    UF* chosenUf = NULL;
+    while(!_listOfClientsWaitingOnCond.empty())
+    {
+        //always work on the first remaining record
+        UFWLHash::iterator first = _listOfClientsWaitingOnCond.begin();
+        UFWaitInfo* waitInfo = first->second;
+
+        waitInfo->_ctrl.getSpinLock();
+        //mark the record as no longer being on the condition queue
+        waitInfo->_waiting = false;
+
+        if(waitInfo->_uf)
+        {
+            chosenUf = waitInfo->_uf;
+            waitInfo->_uf = NULL; //detach so that no one else can schedule this uf
+        }
+
+        const bool stillSleeping = waitInfo->_sleeping;
+        waitInfo->_ctrl.releaseSpinLock();
+
+        //the sleep list is done with this record (or never had it), so it can be released
+        if(!stillSleeping)
+            scheduler->releaseWaitInfo(*waitInfo);
+
+        _listOfClientsWaitingOnCond.erase(first);
+
+        //stop as soon as a uf to wake has been found
+        if(chosenUf)
+            break;
+    }
+
+    if(chosenUf)
+        scheduler->addFiberToScheduler(chosenUf, chosenUf->_parentScheduler->_tid);
+}
+
+int UFMutex::condTimedWait(UF* uf, unsigned long long int sleepAmtInUs) // Waits on the condition for at most sleepAmtInUs; returns 1 if the wait info is still marked sleeping on wake-up (the sleep list has not fired), 0 otherwise or when uf is null.
+{
+ bool result = false;
+ if(!uf)
+ return result;
+
+ // Wrap uf in UFWait structure before pushing to wait and sleep queues
+
+ UFWaitInfo *ufwi = UFScheduler::getUFScheduler()->getWaitInfo(); // NOTE(review): the scheduler pointer is not null-checked here, unlike in signal()/broadcast()
+ ufwi->_uf = uf;
+ ufwi->_waiting = true;
+ ufwi->_sleeping = true;
+
+ // Add to waiting queue
+ _listOfClientsWaitingOnCond[uf] = ufwi; // NOTE(review): replaces any record already stored for uf without releasing it - confirm this cannot happen
+ unlock(uf);
+
+ // Add to sleep queue
+ struct timeval now;
+ gettimeofday(&now, 0);
+ unsigned long long int timeNow = timeInUS(now);
+ ufwi->_sleeping = true; // redundant: already set above
+ uf->_parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi)); // keyed by the absolute wake-up time
+
+ uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal, broadcast or timeout has occurred
+
+ ufwi->_ctrl.getSpinLock(); // NOTE(review): assumes ufwi has not been handed to releaseWaitInfo() yet - verify the lifetime
+ result = ufwi->_sleeping;
+ ufwi->_ctrl.releaseSpinLock();
+
+ lock(uf);
+ return (result) ? true : false;//if result (ufwi->_sleeping) is not true, it must be that the sleep list activated this uf
+}
+
void* setupThread(void* args)
{
if(!args)
Added: trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFAres.C?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFAres.C (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFAres.C Sun May 30 02:07:05 2010
@@ -0,0 +1,217 @@
+#include "UFAres.H"
+#include <stdio.h>
+
+// Debug helper that dumps a resolved host: IPv4 addresses, aliases and the
+// optional address/TTL records. Currently disabled by the unconditional
+// return on the first line; delete that return to turn the output back on.
+static void printHost(struct hostent* host , struct ares_addrttl *ttls = 0, int nttl = 0)
+{
+    return;
+
+    //addresses, printed as dotted quads
+    for(char** addr = host->h_addr_list; *addr != NULL; ++addr)
+    {
+        const unsigned char* octets = (const unsigned char*)(*addr);
+        printf("%d.%d.%d.%d\n", octets[0], octets[1], octets[2], octets[3]);
+    }
+
+    //aliases
+    for(char** alias = host->h_aliases; *alias != NULL; ++alias)
+        printf("%s\n", *alias);
+
+    //address + ttl records
+    for(int idx = 0; idx < nttl; ++idx)
+    {
+        char* ipText = inet_ntoa(ttls[idx].ipaddr);
+        printf("IP is %s, TTL is %d\n", ipText, ttls[idx].ttl);
+    }
+    printf("----------------------------------------------------------------------------------\n");
+}
+
+// Returns the current wall-clock time from gettimeofday() as seconds,
+// with the microsecond part carried in the fraction.
+static double GetCurrentTime()
+{
+    timeval now;
+    gettimeofday(&now, NULL);
+
+    const double fractionOfSecond = now.tv_usec/1000000.0;
+    return (double)(now.tv_sec + fractionOfSecond);
+}
+// Host callback passed to ares_gethostbyname(); arg is the UFAresUFIO that issued the query.
+// With no host the UFAresUFIO's host entry is set to NULL. Otherwise the
+// host is stored in the existing entry, with no TTL data and a zero timestamp.
+// status and timeouts are not used.
+static void mycallback(void *arg, int status,
+        int timeouts, struct hostent *host)
+{
+    UFAresUFIO* aresio = (UFAresUFIO*) arg;
+    if(host == NULL)
+    {
+        aresio->set_myhostent(NULL);
+        return;
+    }
+
+    printHost(host);
+
+    UFHostEnt* entry = aresio->get_myhostent();
+    entry->set_hostent(host);
+    entry->set_nttl(0);
+    entry->set_aresttl(NULL);
+    entry->set_timestamp(0);
+    aresio->set_myhostent(entry);
+}
+
+// Callback passed to ares_search(); arg is the UFAresUFIO that issued the query.
+//
+// On ARES_SUCCESS the raw answer (abuf/alen) is parsed as an A reply. The
+// host and at most one address/TTL record are stored in the UFAresUFIO's
+// host entry, which is stamped with the current time, and the query is
+// marked complete. If parsing yields no host, the host entry is set to NULL.
+// Any other status only logs "failure". timeouts is not used.
+//
+// The TTL buffer is allocated only once the status is known to be good and
+// is freed on the path that does not hand it to the host entry, so it does
+// not leak on failures.
+static void arescallback(void *arg, int status, int timeouts, unsigned char *abuf, int alen)
+{
+    UFAresUFIO *ares = (UFAresUFIO*) arg;
+    if(status != ARES_SUCCESS)
+    {
+        cerr<<"failure"<<endl;
+        return;
+    }
+
+    struct hostent *host = NULL;
+    int nttl = 1; //in: capacity of ttls, out: number of records filled in
+    struct ares_addrttl *ttls = (ares_addrttl*) malloc(sizeof(ares_addrttl));
+    if(!ttls) //out of memory: parse without collecting ttl records
+        nttl = 0;
+
+    status = ares_parse_a_reply(abuf, alen, &host, ttls, &nttl);
+    if(!host)
+    {
+        free(ttls); //nobody takes the buffer on this path
+        ares->set_myhostent(NULL);
+        return;
+    }
+
+    printHost(host, ttls, nttl);
+    UFHostEnt *myhostent = ares->get_myhostent();
+    myhostent->set_hostent(host);
+    if(nttl>0)
+        myhostent->set_nttl(nttl);
+    else
+        myhostent->set_nttl(0);
+    myhostent->set_aresttl(ttls); //the buffer is handed to the host entry from here on
+    myhostent->set_timestamp(GetCurrentTime());
+    ares->set_complete(true);
+}
+// Socket-state callback handed to UFAresUFIO::Init(); data is the UFAresUFIO.
+// Records fd on the UFAresUFIO, or -1 when neither read nor write is set,
+// and calls setupForWrite() on the I/O scheduler whenever read is set.
+static void mycall(void *data, int fd, int read, int write)
+{
+    UFAresUFIO* ufio = (UFAresUFIO*) data;
+
+    const bool neitherReadNorWrite = (read == 0 && write == 0);
+    ufio->setFd(neitherReadNorWrite ? -1 : fd);
+
+    if(read)
+        UFIOScheduler::getUFIOS()->setupForWrite(ufio);
+}
+
+// Hands out a UFAresUFIO: a recycled one from list_ares_ufio_ when available,
+// otherwise a newly constructed one. A recycled object is reset first: fd -1,
+// host entry set to the 0x00000BAD "no answer yet" marker, and bound to the
+// calling fiber.
+UFAresUFIO* UFAres::GetNewAresUFIO()
+{
+    if(list_ares_ufio_.empty())
+        return new UFAresUFIO();
+
+    UFAresUFIO* recycled = list_ares_ufio_.front();
+    recycled->setFd(-1);
+    recycled->set_myhostent((UFHostEnt*)0x00000BAD);
+    list_ares_ufio_.pop_front();
+    recycled->setUF(UFScheduler::getUF());
+    return recycled;
+}
+
+// Returns aresio to the recycle list: clears its completion flag, calls
+// destroy() on it and appends it to list_ares_ufio_.
+// A null pointer is only reported on stderr.
+void UFAres::ReleaseAresUFIO(UFAresUFIO *aresio)
+{
+    if(!aresio)
+    {
+        cerr<<"did not get valid UFAresUFIO"<<endl;
+        return;
+    }
+
+    aresio->set_complete(false);
+    aresio->destroy();
+    list_ares_ufio_.push_back(aresio);
+}
+
+
+// Resolves name to an address, using the per-name cache entry when possible.
+//
+// The cache entry for name is created on first use and locked for the whole
+// lookup. If it holds an unexpired address, that address is returned.
+// Otherwise its entries are released and an ares_search() is issued; the
+// calling fiber then alternates between waiting for the query's fd to become
+// readable and ares_process_fd() until the query completes or the fd is -1.
+//
+// Returns the first address of the resolved entry. Returns 0 when the
+// callback left no entry behind; in that case arescallback has replaced the
+// UFAresUFIO's host entry with NULL.
+// timeout is not used by this function.
+unsigned long int UFAres::GetHostByName(const char *name, uint32_t timeout)
+{
+    UFHostEnt* cachedEntry = GetCachedHostent(name);
+    if(cachedEntry == NULL)
+    {
+        cachedEntry = new UFHostEnt;
+        InsertToCache(name,cachedEntry);
+    }
+
+    UF* self = UFScheduler::getUF();
+    cachedEntry->lock(self);
+
+    unsigned long int ip = cachedEntry->GetUnExpiredEntry(GetCurrentTime());
+    if(ip != 0)
+    {
+        cachedEntry->unlock(self);
+        return ip;
+    }
+
+    //nothing usable in the cache - drop the stale entries and ask the resolver
+    cachedEntry->ReleaseEntries();
+
+    UFAresUFIO *aresio = GetNewAresUFIO();
+    aresio->Init(mycall,aresio);
+    aresio->set_myhostent(cachedEntry);
+
+    ares_search(aresio->get_channel(),name,1, 1,arescallback,aresio);
+
+    //the callback has not completed the query yet
+    while ((aresio->get_complete() == false) && (aresio->getFd()!= -1))
+    {
+        UFIOScheduler::getUFIOS()->setupForRead(aresio);
+        // woken up by epoll again, process them
+        ares_process_fd(aresio->get_channel(), aresio->getFd(), aresio->getFd());
+    }
+
+    // out of loop , either we got hostent or timed out
+    //the callback may have replaced the entry with NULL, so the result is read through
+    //a separate pointer and the unlock always goes to the entry that was locked above
+    UFHostEnt* resolved = aresio->get_myhostent();
+    ReleaseAresUFIO(aresio);
+
+    ip = resolved ? resolved->GetFirstIP() : 0; //read while the entry is still locked
+    cachedEntry->unlock(self);
+    return ip;
+}
+
+// Debug variant of the lookup: resolves name with ares_gethostbyname() and
+// returns the raw hostent, bypassing the cache.
+//
+// The UFAresUFIO's host entry holds the 0x00000BAD marker until mycallback
+// has run; while it does, and the query still has an fd, the calling fiber
+// waits for the fd to become readable and lets c-ares process it.
+//
+// Returns NULL when no host entry is available, i.e. the callback reported
+// no host or the query never got an fd. The UFAresUFIO is released exactly
+// once, after its last use, on every path.
+// timeout is not used by this function.
+//
+// NOTE(review): the marker is only set on recycled UFAresUFIO objects in
+// GetNewAresUFIO(); confirm that a newly constructed one starts out the same way.
+struct hostent* UFAres::GetHostByNameDebug(const char *name, uint32_t timeout )
+{
+    UFHostEnt* const notAnsweredYet = (UFHostEnt*)0x00000BAD;
+
+    UFAresUFIO *aresio = GetNewAresUFIO();
+    aresio->Init(mycall,aresio);
+    ares_gethostbyname(aresio->get_channel(),name,AF_INET,mycallback,aresio);
+
+    //the callback may already have run from within ares_gethostbyname
+    while ((aresio->get_myhostent() == notAnsweredYet) && (aresio->getFd() != -1))
+    {
+        UFIOScheduler::getUFIOS()->setupForRead(aresio);
+        // woken up by epoll again
+        ares_process_fd(aresio->get_channel(), aresio->getFd(), aresio->getFd());
+    }
+
+    // out of loop , either we got hostent or timed out
+    UFHostEnt *myhostent_ = aresio->get_myhostent();
+    ReleaseAresUFIO(aresio);
+
+    if(!myhostent_ || myhostent_ == notAnsweredYet)
+        return NULL;
+    return myhostent_->get_hostent();
+}
+
+
+
Copied: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C (from r944754, trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C?p2=trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C&p1=trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C&r1=944754&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C Sun May 30 02:07:05 2010
@@ -9,11 +9,13 @@
#include <stdio.h>
#include <errno.h>
+#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
+#include <string.h>
const unsigned short int PERCENT_LOGGING_SAMPLING = 5;
@@ -182,10 +184,7 @@ UFConnectionGroupInfo* UFConnectionPoolI
{
GroupIPMap::iterator foundItr = _groupIpMap.find(name);
if(foundItr == _groupIpMap.end())
- {
- cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"didnt find group with name "<<name <<" to remove"<<endl;
return NULL;
- }
UFConnectionGroupInfo* removedObj = (*foundItr).second;
_groupIpMap.erase(foundItr);
@@ -230,7 +229,6 @@ UFIO* UFConnectionPoolImpl::getConnectio
UFConnectionGroupInfo* groupInfo = NULL;
if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
{
- cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<groupName<<endl;
groupInfo = addGroupImplicit(groupName);
if(!groupInfo)
return NULL;
@@ -315,7 +313,8 @@ UFIO* UFConnectionPoolImpl::getConnectio
}
else {
// if _maxSimultaneousConns is hit, wait for a connection to become available
- if(ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns) {
+ if(ipInfo->_maxSimultaneousConns &&
+ (ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns)) {
// wait for a connection to be released
ipInfo->_someConnectionAvailable.lock(this_user_fiber);
ipInfo->_someConnectionAvailable.condWait(this_user_fiber);
@@ -401,7 +400,6 @@ void UFConnectionPoolImpl::clearBadConne
void UFConnectionPoolImpl::releaseConnection(UFIO* ufIO, bool connOk)
{
- cerr << "UFConnectionPoolImpl::releaseConnection" << endl;
if(!ufIO)
return;
@@ -538,10 +536,7 @@ double UFConnectionPoolImpl::getGroupAva
GroupIPMap::const_iterator foundItr = _groupIpMap.find(name);
if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
- {
- cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<((*foundItr).second ? (*foundItr).second->_name : "")<<endl;
return result;
- }
return (*foundItr).second->getAvailability();
}
@@ -555,16 +550,21 @@ UFConnectionPoolImpl::~UFConnectionPoolI
void UFConnectionPoolCleaner::run()
{
- UFScheduler *this_thread_scheduler = UFScheduler::getUFScheduler();
- UF* this_uf = this_thread_scheduler->getRunningFiberOnThisThread();
+ UF* this_uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+ UFIOScheduler* ufios = UFIOScheduler::getUFIOS();
+ UFConnectionPool* ufcp = ufios->getConnPool();
+
+ if(!ufcp)
+ {
+ cerr<<"couldnt get conn pool on thread "<<pthread_self()<<endl;
+ exit(1);
+ }
+
while(1)
{
this_uf->usleep(300*1000*1000);
- if(!this_thread_scheduler->_conn_pool)
- break;
- this_thread_scheduler->_conn_pool->clearBadConnections();
+ ufcp->clearBadConnections();
}
- this_thread_scheduler->setExit();
}
UFConnectionPoolImpl::UFConnectionPoolImpl()
@@ -582,7 +582,7 @@ void UFConnectionPoolImpl::init()
}
}
-int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 5;
+int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 0;
int UFConnectionPool::TIMEOUT_PER_REQUEST = 10;
UFConnectionPool::UFConnectionPool()
Copied: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C (from r944754, trafficserver/traffic/branches/UserFiber/src/UFIO.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFIO.C?p2=trafficserver/traffic/branches/UserFiber/core/src/UFIO.C&p1=trafficserver/traffic/branches/UserFiber/src/UFIO.C&r1=944754&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFIO.C Sun May 30 02:07:05 2010
@@ -37,13 +37,15 @@ void UFIO::reset()
_errno = 0;
_ufios = 0;
_lastEpollFlag = 0;
- _amtReadLastTimeEqualToAskedAmt = false;
_sleepInfo = 0;
+ _markedActive = false;
}
UFIO::~UFIO()
{
close();
+ if(_sleepInfo)
+ _sleepInfo->_ufio = 0;
}
UFIO::UFIO(UF* uf, int fd)
@@ -113,7 +115,7 @@ int UFIO::setupConnectionToAccept(const
return -1;
}
- char* interface_addr = ((i_a) && strlen(i_a)) ? const_cast<char*>(i_a) : const_cast<char*>(string("0.0.0.0").c_str());
+ const char* interface_addr = ((i_a) && strlen(i_a)) ? i_a : "0.0.0.0";
int n = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0)
@@ -264,7 +266,7 @@ void UFIO::accept(UFIOAcceptThreadChoose
connectedArgs->ufio->_remotePort = cli_addr.sin_port;
uf->_startingArgs = connectedArgs;
- listOfUFsToAdd.push_front(uf);
+ listOfUFsToAdd.push_back(uf);
listOfUFsToAddSize++;
if(listOfUFsToAddSize == 100)
@@ -302,10 +304,14 @@ void UFIO::accept(UFIOAcceptThreadChoose
}
}
-UFIOScheduler* UFIO::getUFIOS()
+UFIOScheduler* UFIOScheduler::getUFIOS(pthread_t tid)
{
UFIOScheduler* tmpUfios = 0;
- //
+
+ if(!tid || tid == pthread_self())
+ return (UFIOScheduler*)pthread_getspecific(_keyToIdentifySchedulerOnThread);
+
+
//find the ufios for this thread - this map operation should only be done once
ThreadFiberIOSchedulerMap::iterator index = UFIOScheduler::_tfiosscheduler.find(pthread_self());
if(index != UFIOScheduler::_tfiosscheduler.end())
@@ -321,41 +327,27 @@ UFIOScheduler* UFIO::getUFIOS()
ssize_t UFIO::read(void *buf, size_t nbyte, TIME_IN_US timeout)
{
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- //we read everything there was to be read the last time, so this time wait to read
- if(!_amtReadLastTimeEqualToAskedAmt)
- {
- //wait for something to read first
- if(!tmpUfios->setupForRead(this, timeout))
- {
- _errno = errno;
- return -1;
- }
- if(_errno == ETIMEDOUT) //setupForRead will return w/ success however it will set the errno to ETIMEDOUT if a timeout occurred
- return -1;
- }
-
- _amtReadLastTimeEqualToAskedAmt = false;
ssize_t n = 0;;
while(1)
{
n = ::read(_fd, buf, nbyte);
if(n > 0)
- {
- _amtReadLastTimeEqualToAskedAmt = ((unsigned int) n != nbyte) ? false : true;
return n;
- }
else if(n < 0)
{
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
_errno = 0;
- if(!tmpUfios->setupForRead(this, timeout))
+ _markedActive = false;
+ while(!_markedActive)
{
- _errno = errno;
- return -1;
+ //wait for something to read first
+ if(!tmpUfios->setupForRead(this, timeout))
+ return -1;
}
+ continue;
}
else if(errno == EINTR)
continue;
@@ -373,7 +365,7 @@ ssize_t UFIO::read(void *buf, size_t nby
ssize_t UFIO::write(const void *buf, size_t nbyte, TIME_IN_US timeout)
{
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
ssize_t n = 0;;
unsigned int amtWritten = 0;
@@ -393,17 +385,21 @@ ssize_t UFIO::write(const void *buf, siz
_errno = errno;
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
+ _markedActive = false;
_errno = 0;
- if(!tmpUfios->setupForWrite(this, timeout))
+ while(!_markedActive)
{
- _errno = errno;
- return -1;
+ if(!tmpUfios->setupForWrite(this, timeout))
+ return -1;
}
}
else if(errno == EINTR)
continue;
else
+ {
+ _errno = errno;
break;
+ }
}
else if(n == 0)
break;
@@ -420,7 +416,7 @@ int UFIO::connect(const struct sockaddr
//find the scheduler for this request
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
while(::connect(_fd, addr, addrlen) < 0)
{
@@ -444,7 +440,7 @@ int UFIO::connect(const struct sockaddr
int UFIO::sendto(const char *buf, int len, const struct sockaddr *to, int tolen, TIME_IN_US timeout)
{
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
ssize_t n = 0;;
unsigned int amtWritten = 0;
@@ -464,17 +460,21 @@ int UFIO::sendto(const char *buf, int le
_errno = errno;
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
+ _markedActive = false;
_errno = 0;
- if(!tmpUfios->setupForWrite(this, timeout))
+ while(!_markedActive)
{
- _errno = errno;
- return -1;
+ if(!tmpUfios->setupForWrite(this, timeout))
+ return -1;
}
}
else if(errno == EINTR)
continue;
else
+ {
+ _errno = errno;
break;
+ }
}
else if(n == 0)
break;
@@ -486,7 +486,7 @@ int UFIO::sendmsg(const struct msghdr *m
int flags,
TIME_IN_US timeout)
{
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
ssize_t n = 0;;
while(1)
@@ -499,17 +499,21 @@ int UFIO::sendmsg(const struct msghdr *m
_errno = errno;
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
+ _markedActive = false;
_errno = 0;
- if(!tmpUfios->setupForWrite(this, timeout))
+ while(!_markedActive)
{
- _errno = errno;
- return -1;
+ if(!tmpUfios->setupForWrite(this, timeout))
+ return -1;
}
}
else if(errno == EINTR)
continue;
else
+ {
+ _errno = errno;
break;
+ }
}
else if(n == 0)
break;
@@ -523,41 +527,26 @@ int UFIO::recvfrom(char *buf,
int *fromlen,
TIME_IN_US timeout)
{
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
-
- //we read everything there was to be read the last time, so this time wait to read
- if(!_amtReadLastTimeEqualToAskedAmt)
- {
- //wait for something to read first
- if(!tmpUfios->setupForRead(this, timeout))
- {
- _errno = errno;
- return -1;
- }
- if(_errno == ETIMEDOUT) //setupForRead will return w/ success however it will set the errno to ETIMEDOUT if a timeout occurred
- return -1;
- }
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- _amtReadLastTimeEqualToAskedAmt = false;
ssize_t n = 0;;
while(1)
{
n = ::recvfrom(_fd, buf, len, 0, from, (socklen_t *)fromlen);
if(n > 0)
- {
- _amtReadLastTimeEqualToAskedAmt = (n != len) ? false : true;
return n;
- }
else if(n < 0)
{
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
_errno = 0;
- if(!tmpUfios->setupForRead(this, timeout))
+ _markedActive = false;
+ while(!_markedActive)
{
- _errno = errno;
- return -1;
+ if(!tmpUfios->setupForRead(this, timeout))
+ return -1;
}
+ continue;
}
else if(errno == EINTR)
continue;
@@ -577,7 +566,7 @@ int UFIO::recvmsg(struct msghdr *msg,
int flags,
TIME_IN_US timeout)
{
- UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+ UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
ssize_t n = 0;
while(1)
@@ -590,11 +579,13 @@ int UFIO::recvmsg(struct msghdr *msg,
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
_errno = 0;
- if(!tmpUfios->setupForRead(this, timeout))
+ _markedActive = false;
+ while(!_markedActive)
{
- _errno = errno;
- return -1;
+ if(!tmpUfios->setupForRead(this, timeout))
+ return -1;
}
+ continue;
}
else if(errno == EINTR)
continue;
@@ -611,6 +602,16 @@ int UFIO::recvmsg(struct msghdr *msg,
}
+//Creates the pthread key under which a thread's UFIOScheduler is stored
+//(set via pthread_setspecific, read back via pthread_getspecific).
+//Used as the static initializer of UFIOScheduler::_keyToIdentifySchedulerOnThread.
+//Returns the created key; terminates the process if the key cannot be created.
+static pthread_key_t getThreadKey()
+{
+    //pthread_key_create reports failure through its return value and does not set errno,
+    //so the returned code (not errno) has to be handed to strerror
+    int rc = pthread_key_create(&UFIOScheduler::_keyToIdentifySchedulerOnThread, 0);
+    if(rc != 0)
+    {
+        cerr<<"couldnt create ufios thread specific key "<<strerror(rc)<<endl;
+        exit(1);
+    }
+    return UFIOScheduler::_keyToIdentifySchedulerOnThread;
+}
+pthread_key_t UFIOScheduler::_keyToIdentifySchedulerOnThread = getThreadKey();
ThreadFiberIOSchedulerMap UFIOScheduler::_tfiosscheduler;
EpollUFIOScheduler::EpollUFIOScheduler(UF* uf, unsigned int maxFds)
@@ -623,6 +624,15 @@ EpollUFIOScheduler::EpollUFIOScheduler(U
_earliestWakeUpFromSleep = 0;
}
+//Allocates the connection pool held in _connPool; the destructor deletes it.
+UFIOScheduler::UFIOScheduler()
+    : _connPool(new UFConnectionPool)
+{
+}
+//Frees the connection pool that the constructor allocated.
+UFIOScheduler::~UFIOScheduler()
+{
+    delete this->_connPool;
+}
+
EpollUFIOScheduler::~EpollUFIOScheduler()
{
if(_epollFd != -1)
@@ -662,15 +672,19 @@ bool EpollUFIOScheduler::isSetup()
_epollFd = -1;
}
+ pthread_setspecific(_keyToIdentifySchedulerOnThread, this);
return (_alreadySetup = true);
}
-bool EpollUFIOScheduler::addToScheduler(UFIO* ufio, void* inputInfo, TIME_IN_US to)
+bool EpollUFIOScheduler::addToScheduler(UFIO* ufio, void* inputInfo, TIME_IN_US to, bool wait, bool runEpollCtl)
{
if(!ufio || !inputInfo || !isSetup())
+ {
+ ufio->_errno = EINVAL;
return false;
+ }
- if(ufio->_lastEpollFlag != *((int*)inputInfo)) //dont do anything if the flags are same as last time
+ if(runEpollCtl || ufio->_lastEpollFlag != *((int*)inputInfo)) //dont do anything if the flags are same as last time
{
struct epoll_event ev;
ev.data.fd = ufio->getFd();
@@ -689,6 +703,7 @@ bool EpollUFIOScheduler::addToScheduler(
{
cerr<<"couldnt add/modify fd to epoll queue "<<strerror(errno)<<" trying to add "<<ufio->getFd()<<" to "<<_epollFd<<endl;
exit(1);
+ ufio->_errno = EINVAL;
return false;
}
ufio->_lastEpollFlag = ev.events;
@@ -704,6 +719,8 @@ bool EpollUFIOScheduler::addToScheduler(
{
cerr<<"couldnt create sleep info"<<endl;
exit(1);
+ ufio->_errno = EINVAL;
+ return false;
}
ufsi->_ufio = ufio;
ufio->_sleepInfo = ufsi;
@@ -714,14 +731,24 @@ bool EpollUFIOScheduler::addToScheduler(
_sleepList.insert(std::make_pair(timeToWakeUp, ufsi));
}
+ ufio->_errno = 0;
+ ufio->setUF(_ufs->getRunningFiberOnThisThread());
+ ufio->_markedActive = false;
+ if(!wait)
+ return true;
ufio->getUF()->block(); //switch context till someone wakes me up
+
+ if(!to) //nothing to do w/ no timeout
+ return true;
+
if(ufio->_sleepInfo)
{
- ufio->_sleepInfo->_ufio = 0; //set the sleep indicator to not have a dependency w/ this ufio
- ufio->_sleepInfo = 0; //remove the sleep indicator
+ ufio->_sleepInfo->_ufio = 0;
+ ufio->_sleepInfo = 0;
+ return true;
}
-
- return true;
+ //ufio->_errno = ETIMEDOUT
+ return false;
}
bool EpollUFIOScheduler::setupForConnect(UFIO* ufio, TIME_IN_US to)
@@ -759,7 +786,6 @@ bool EpollUFIOScheduler::closeConnection
_intUFIOMap.erase(index);
return true;
-
/*
struct epoll_event ev;
ev.data.fd = ufio->getFd();
@@ -768,6 +794,25 @@ bool EpollUFIOScheduler::closeConnection
*/
}
+//Registers every UFIO in ufioList with this scheduler for read-side events
+//(EPOLLIN|EPOLLET|EPOLLPRI|EPOLLERR|EPOLLHUP) and then blocks the running fiber.
+// ufioList - the connections to watch; null entries are skipped
+// to       - currently ignored: no timeout is applied (see the disabled code below)
+//Returns false, without blocking, if not a single entry could be registered;
+//returns true once the fiber has been resumed.
+bool EpollUFIOScheduler::rpoll(list<UFIO*>& ufioList, TIME_IN_US to)
+{
+    int flags = EPOLLIN|EPOLLET|EPOLLPRI|EPOLLERR|EPOLLHUP;
+    unsigned int numRegistered = 0;
+    for(list<UFIO*>::iterator beg = ufioList.begin();
+        beg != ufioList.end();
+        ++beg)
+    {
+        //addToScheduler writes ufio->_errno on its invalid-argument path, so a null must never reach it
+        if(!*beg)
+            continue;
+        //wait = false: only register, dont block once per ufio
+        //runEpollCtl = true: redo the registration even if the flags match the last ones used
+        if(addToScheduler(*beg, (void*)&flags, 0, false, true))
+            ++numRegistered;
+    }
+
+    //nothing was registered on behalf of this fiber, so dont park it waiting for events
+    if(!numRegistered)
+        return false;
+
+    _ufs->getRunningFiberOnThisThread()->block();
+    /*
+    if(!to)
+        _ufs->getRunningFiberOnThisThread()->block();
+    else
+        _ufs->getRunningFiberOnThisThread()->usleep(to);
+    */
+
+    return true;
+}
+
#ifndef PIPE_NOT_EFD
#include <sys/eventfd.h>
@@ -853,8 +898,8 @@ static void* notifyEpollFunc(void* args)
void EpollUFIOScheduler::waitForEvents(TIME_IN_US timeToWait)
{
- UFScheduler* myScheduler = UFScheduler::getUFScheduler();
- if(!myScheduler)
+ _ufs = UFScheduler::getUFScheduler();
+ if(!_ufs)
{
cerr<<"have to be able to find my scheduler"<<endl;
return;
@@ -872,18 +917,18 @@ void EpollUFIOScheduler::waitForEvents(T
}
makeSocketNonBlocking(pfd[0]);
//makeSocketNonBlocking(pfd[1]); - dont make the write socket non-blocking
- myScheduler->_notifyArgs = (void*)(&pfd[1]);
+ _ufs->_notifyArgs = (void*)(&pfd[1]);
ens->_efd = pfd[0];
#else
int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC); //TODO: check the error code of the eventfd creation
- myScheduler->_notifyArgs = (void*)&efd;
+ _ufs->_notifyArgs = (void*)&efd;
ens->_efd = efd;
#endif
- myScheduler->_notifyFunc = notifyEpollFunc;
+ _ufs->_notifyFunc = notifyEpollFunc;
//add the UF to handle the efds calls
UF* eventFdFiber = new ReadNotificationUF();
eventFdFiber->_startingArgs = ens;
- myScheduler->addFiberToScheduler(eventFdFiber, 0);
+ _ufs->addFiberToScheduler(eventFdFiber, 0);
@@ -906,7 +951,6 @@ void EpollUFIOScheduler::waitForEvents(T
UFIO* ufio = 0;
UF* uf = 0;
unsigned long long int amtToSleep = timeToWait;
- unsigned long long int ufsAmtToSleep = 0;
int i = 0;
_interruptedByEventFd = false;
UFScheduler* ufs = _uf->getParentScheduler();
@@ -916,6 +960,7 @@ void EpollUFIOScheduler::waitForEvents(T
cerr<<"epoll scheduler has to be connected to some scheduler"<<endl;
return;
}
+ unsigned long long int amtToSleepFromUFS = 0;
while(1)
{
if(_interruptedByEventFd) //this is so that the last interruption gets handled right away
@@ -924,10 +969,15 @@ void EpollUFIOScheduler::waitForEvents(T
_uf->yield();
}
- if(amtToSleep > (ufsAmtToSleep = ufs->getAmtToSleep()))
- amtToSleep = (int)(ufsAmtToSleep/1000);
+ if(ufs->getActiveRunningListSize() < 2) //means that epoll is the only fiber thats currently active
+ {
+ if(amtToSleep > (amtToSleepFromUFS = ufs->getAmtToSleep()))
+ amtToSleep = (int)(amtToSleepFromUFS/1000);
+ else
+ amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+ }
else
- amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+ amtToSleep = 0; //dont wait on epoll - since there are other ufs waiting to run
nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, amtToSleep);
if(nfds > 0)
{
@@ -943,6 +993,7 @@ void EpollUFIOScheduler::waitForEvents(T
cerr<<"invalid user fiber io found for fd, "<<_epollEventStruct[i].data.fd<<endl;
exit(1);
}
+ ufio->_markedActive = true;
//activate the fiber
ufs->addFiberToScheduler(uf, 0);
}
@@ -1014,8 +1065,8 @@ void EpollUFIOScheduler::waitForEvents(T
_uf->yield();
}
- myScheduler->_notifyArgs = 0;
- myScheduler->_notifyFunc = 0;
+ _ufs->_notifyArgs = 0;
+ _ufs->_notifyFunc = 0;
}
int IORunner::_myLoc = -1;