You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/30 04:07:06 UTC

svn commit: r949458 [1/2] - in /trafficserver/traffic/branches/UserFiber: ./ core/ core/ext/ core/include/ core/lib/ core/src/ include/ protocol/ protocol/http/ protocol/http/include/ protocol/http/lib/ protocol/http/src/ samples/ src/

Author: akundu
Date: Sun May 30 02:07:05 2010
New Revision: 949458

URL: http://svn.apache.org/viewvc?rev=949458&view=rev
Log:
Build changes
- setup the core and protocol directories
    - setup src + include under core
    - setup http under protocol

Code changes
- complete support for Producer and Consumer modules that make tunnel creations trivial (UFPC)
- Vijaya Mamidi's support for incorporating asynchronous dns resolving using the c-ares library
- Raghav J.'s support for condTimedWait
- lock system changes
- upgrade sleep system to avoid having to go into the sleep list unless previous sleep iterations indicate the time may have expired
- support for rpoll (will get replaced by a generic poll call later)

Test cases
    - testSignal.C - user lock + signal test case
    - testSleep.C - sleep test
    - testProducer.C - producer and consumer test

Sample Code
    - httpProxy.C - a simple proxy server
    - echoServer.C - a simple echo server


Added:
    trafficserver/traffic/branches/UserFiber/core/
    trafficserver/traffic/branches/UserFiber/core/ext/
    trafficserver/traffic/branches/UserFiber/core/include/
    trafficserver/traffic/branches/UserFiber/core/include/UF.H
      - copied, changed from r944753, trafficserver/traffic/branches/UserFiber/include/UF.H
    trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
    trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H   (props changed)
      - copied unchanged from r944753, trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
    trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
    trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H
    trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
      - copied, changed from r944753, trafficserver/traffic/branches/UserFiber/include/UFIO.H
    trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
    trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
      - copied unchanged from r944753, trafficserver/traffic/branches/UserFiber/include/UFServer.H
    trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
      - copied, changed from r944753, trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H
    trafficserver/traffic/branches/UserFiber/core/lib/
    trafficserver/traffic/branches/UserFiber/core/src/   (props changed)
      - copied from r944753, trafficserver/traffic/branches/UserFiber/src/
    trafficserver/traffic/branches/UserFiber/core/src/UF.C
      - copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UF.C
    trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
      - copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
      - copied unchanged from r944754, trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.H
    trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
      - copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UFIO.C
    trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
    trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
      - copied, changed from r944754, trafficserver/traffic/branches/UserFiber/src/UFServer.C
    trafficserver/traffic/branches/UserFiber/protocol/   (with props)
    trafficserver/traffic/branches/UserFiber/protocol/http/
    trafficserver/traffic/branches/UserFiber/protocol/http/include/
    trafficserver/traffic/branches/UserFiber/protocol/http/lib/
    trafficserver/traffic/branches/UserFiber/protocol/http/src/
    trafficserver/traffic/branches/UserFiber/protocol/http/ufHTTPServer.C
    trafficserver/traffic/branches/UserFiber/samples/UFCondTimedWaitTest.C
    trafficserver/traffic/branches/UserFiber/samples/testProducer.C
    trafficserver/traffic/branches/UserFiber/samples/testSignal.C
    trafficserver/traffic/branches/UserFiber/samples/testSleep.C
Removed:
    trafficserver/traffic/branches/UserFiber/include/
    trafficserver/traffic/branches/UserFiber/src/
    trafficserver/traffic/branches/UserFiber/ufHTTPServer.C
Modified:
    trafficserver/traffic/branches/UserFiber/Makefile
    trafficserver/traffic/branches/UserFiber/core/src/Makefile
    trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
    trafficserver/traffic/branches/UserFiber/samples/Makefile
    trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
    trafficserver/traffic/branches/UserFiber/samples/echoServer.C

Modified: trafficserver/traffic/branches/UserFiber/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/Makefile?rev=949458&r1=949457&r2=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/Makefile Sun May 30 02:07:05 2010
@@ -1,18 +1,14 @@
-CXXFLAGS=-g -O3 -Wall -Wno-deprecated -march=x86-64
+CXXFLAGS=-g -O3 -Wall -Werror -Wno-deprecated -march=x86-64
+#CXXFLAGS=-g -Wall -Werror -Wno-deprecated -march=x86-64 -fno-inline
 CPPFLAGS=-DPIPE_NOT_EFD -I./include
-LDFLAGS=-L./lib -lUF -lpthread
+LDFLAGS=-L./core/lib -lUF -lpthread
 
-.PHONY: all clean lib
+.PHONY: all clean core/lib
 
-all: ufHTTPServer
+all: core/lib
 
-lib:
-	$(MAKE) -C src all
-
-ufHTTPServer:	lib ufHTTPServer.C
-	$(CXX) -o $@ $(CPPFLAGS) $(CXXFLAGS) ufHTTPServer.C $(LDFLAGS)
+core/lib:
+	$(MAKE) -C core/src all
 
 clean: 
-	$(MAKE) -C src clean
-	$(RM) *.o ufHTTPServer
-
+	$(MAKE) -C core/src clean

Copied: trafficserver/traffic/branches/UserFiber/core/include/UF.H (from r944753, trafficserver/traffic/branches/UserFiber/include/UF.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UF.H?p2=trafficserver/traffic/branches/UserFiber/core/include/UF.H&p1=trafficserver/traffic/branches/UserFiber/include/UF.H&r1=944753&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UF.H Sun May 30 02:07:05 2010
@@ -12,6 +12,7 @@
 #include <list>
 #include <ucontext.h>
 #include <pthread.h>
+#include <errno.h>
 
 using namespace std;
 namespace std { using namespace __gnu_cxx; }
@@ -28,6 +29,7 @@ enum UFStatus
 //create the type of UF you'd like to pass into the accept handler
 typedef unsigned long long int UFId;
 struct UFScheduler;
+struct UFMutex;
 struct UF
 {
     friend class UFScheduler;
@@ -52,6 +54,7 @@ struct UF
     static void          gusleep(unsigned long long int sleepAmtInUs);
     ///simply block the fiber
     void                 block();
+    UFStatus             getStatus() const;
 
 
     UFStatus             _status;
@@ -67,6 +70,7 @@ protected:
 private:
     void waitOnLock();
 };
+inline UFStatus UF::getStatus() const { return _status; }
 
 struct UFFactory
 {
@@ -82,25 +86,20 @@ protected:
     size_t              _capacity;
     size_t              _size;
 };
-inline UFFactory* UFFactory::getInstance()
-{
-    return (_instance ? _instance : (_instance = new UFFactory()));
-}
-inline UF* UFFactory::selectUF(unsigned int location)
-{
-    return _objMapping[location];
-}
-
-
+inline UFFactory* UFFactory::getInstance() { return (_instance ? _instance : (_instance = new UFFactory())); }
+inline UF* UFFactory::selectUF(unsigned int location) { return _objMapping[location]; }
 
+struct UFWaitInfo;
+typedef std::list<UFWaitInfo*> UFWaitList;
+typedef map<UF*, UFWaitInfo*> UFWLHash;
 typedef std::list<UF*>          UFList;
 struct UFMutex
 {
     UFMutex() 
     {
         _lockCurrentlyOwned = false;
-        _pendingLockNotification = 0;
         _lockActive = 0;
+        _mustRunUF = 0;
     }
 
     bool lock(UF* uf);
@@ -116,26 +115,38 @@ struct UFMutex
     //THE CALLER MUST get the lock before calling this fxn
     //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
     void signal();
+    //THE CALLER MUST get the lock before calling this fxn
+    //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
+    int condTimedWait(UF *uf, unsigned long long int sleepAmtInUs);
 
+    void releaseSpinLock(bool spinCPU = false);
+    void getSpinLock(bool spinCPU = false);
 
 protected:
-    int             _lockActive;
-    UFList          _listOfClientsWaitingOnLock;
-    UFList          _listOfClientsWaitingOnCond;
-    bool            _lockCurrentlyOwned;
-    UF*             _pendingLockNotification;
-
-    void releaseLocalLock();
-    void getLocalLock();
+    int                 _lockActive;
+    UFList              _listOfClientsWaitingOnLock;
+    //UFWaitList          _listOfClientsWaitingOnCond;
+    UFWLHash            _listOfClientsWaitingOnCond;
+    bool                _lockCurrentlyOwned;
+    UF*                 _mustRunUF;
 };
+struct UFWaitInfo
+{
+    UFWaitInfo() { reset(); }
+    void reset();
+    UF*                 _uf;
+    bool                _sleeping;
+    bool                _waiting;
+    UFMutex             _ctrl;
+};
+inline void UFWaitInfo::reset() { _uf = 0; _sleeping = false; _waiting = false; }
 
 
-//per thread scheduler
-typedef std::multimap<unsigned long long int, UF*> MapTimeUF;
+typedef std::multimap<unsigned long long int, UFWaitInfo*> MapTimeUF;
 //typedef std::map<pthread_t,UFScheduler*> ThreadUFSchedulerMap;
+//per thread scheduler
 typedef hash_map<pthread_t, UFScheduler*, hash<uintptr_t> > ThreadUFSchedulerMap;
 
-struct UFConnectionPool;
 struct UFScheduler
 {
     friend class UF;
@@ -157,7 +168,7 @@ struct UFScheduler
 
 
     static ThreadUFSchedulerMap  _threadUFSchedulerMap;
-    static pthread_mutex_t       _mutexToCheckFiberScheduerMap;
+    static pthread_mutex_t       _mutexToCheckFiberSchedulerMap;
             
     //returns the fiber scheduler on this thread or other threads;
     static UFScheduler*          getUFScheduler(pthread_t tid = 0); 
@@ -173,7 +184,14 @@ struct UFScheduler
     void* getSpecific() const;
     unsigned long long int getAmtToSleep() const;
     static void setExit(bool exit = true);
+    bool shouldExit() const;
     void setExitJustMe(bool exit = true);
+    size_t getActiveRunningListSize() const;
+
+    //stats for thread
+    std::vector<long long> _stats;
+    UFMutex _stats_lock;
+
 
     ///the variable that says whether the scheduler should be handling the sleep or
     //if its handled w/in the UserFabrics
@@ -194,14 +212,16 @@ protected:
     //no lock for active running list - cause only the running
     //thread can add to it
     UFList                      _activeRunningList;
+    size_t                      _activeRunningListSize;
 
     //nominate to add to a thread's running list
     UFList                      _nominateToAddToActiveRunningList;
     pthread_mutex_t             _mutexToNominateToActiveList;
     pthread_cond_t              _condToNominateToActiveList;
-
+    
     //the sleep tree
     MapTimeUF                   _sleepList;
+    unsigned long long int      _earliestWakeUpFromSleep;
     //store the shortest sleep interval
     unsigned long long int      _amtToSleep;
 
@@ -211,16 +231,17 @@ protected:
     pthread_t                   _tid;
 
     void notifyUF();
+    
+    list<UFWaitInfo*>  _availableWaitInfo;
+    UFWaitInfo* getWaitInfo();
+    void releaseWaitInfo(UFWaitInfo& ufsi);
 
 public:
-    //stats for thread
-    std::vector<long long> _stats;
-    UFMutex _stats_lock;
-
-    // connection pool
-    UFConnectionPool *_conn_pool;
+    UFMutex testingCondTimedWait;
 };
 
+inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningListSize; }
+inline bool UFScheduler::shouldExit() const { return (_exitJustMe || _exit) ? true : false; }
 inline unsigned long long int UFScheduler::getAmtToSleep() const { return _amtToSleep; }
 inline UF* UFScheduler::getRunningFiberOnThisThread(){ return _currentFiber; }
 inline const ucontext_t& UFScheduler::getMainContext() const { return _mainContext; };
@@ -229,6 +250,24 @@ inline void* UFScheduler::getSpecific() 
 inline void UFScheduler::setExit(bool exit) { _exit = exit; }
 inline void UFScheduler::setExitJustMe(bool exit) { _exitJustMe = exit; }
 
+inline UFWaitInfo* UFScheduler::getWaitInfo()
+{
+    if(!_availableWaitInfo.empty())
+    {
+        UFWaitInfo* ufwi = _availableWaitInfo.front();
+        _availableWaitInfo.pop_front();
+        ufwi->reset();
+        return ufwi;
+    }
+
+    return new UFWaitInfo();
+}
+
+inline void UFScheduler::releaseWaitInfo(UFWaitInfo& ufwi)
+{
+    _availableWaitInfo.push_back(&ufwi);
+}
+
 inline UFScheduler* UF::getParentScheduler() const { return _parentScheduler; }
 
 inline void UF::waitOnLock() { block(); }
@@ -238,6 +277,11 @@ inline void UF::gusleep(unsigned long lo
     UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
 }
 
+inline unsigned long long int timeInUS(timeval& t)
+{
+    return ((unsigned long long int)(((unsigned long long int) t.tv_sec)*1000000)+(unsigned long long int) t.tv_usec);
+}
+
 inline void UF::usleep(unsigned long long int sleepAmtInUs)
 {
     if(!sleepAmtInUs)
@@ -248,8 +292,17 @@ inline void UF::usleep(unsigned long lon
 
     struct timeval now;
     gettimeofday(&now, 0);
-    unsigned long long int timeNow = now.tv_sec*1000000+now.tv_usec;
-    _parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), this));
+    
+    unsigned long long int timeToWakeUp = timeInUS(now) + sleepAmtInUs;
+    if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
+       !_parentScheduler->_earliestWakeUpFromSleep)
+        _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
+
+    UFWaitInfo *ufwi = _parentScheduler->getWaitInfo();
+    ufwi->_uf = this;
+    ufwi->_sleeping = true;
+
+    _parentScheduler->_sleepList.insert(std::make_pair(timeToWakeUp, ufwi));
     block();
 }
 
@@ -265,179 +318,14 @@ inline void UF::yield()
     swapcontext(&_UFContext, &(_parentScheduler->getMainContext()));
 }
 
-inline void UFMutex::releaseLocalLock()
-{
-    while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { sched_yield(); }
-    //while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { }
-}
-
-inline void UFMutex::getLocalLock()
-{
-    while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { sched_yield(); }
-    //while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { }
-}
-
-inline bool UFMutex::lock(UF* uf)
-{
-    if(!uf || !uf->_parentScheduler)
-        return false;
-
-    getLocalLock();
-    if(_listOfClientsWaitingOnLock.empty()) //probably the most common case (no UF has the lock)
-    {
-        _listOfClientsWaitingOnLock.push_back(uf);
-        _lockCurrentlyOwned = true;
-        releaseLocalLock();
-        return true;
-    }
-
-    //see if any UF is holding the lock right now - if not get the lock
-    //this is the case where between the time that an UF is woken up
-    //(after another UF releases the lock)
-    //and it actually runs this requesting UF might be able to procure the lock
-    if(!_lockCurrentlyOwned)
-    {
-        _listOfClientsWaitingOnLock.push_front(uf);
-        _lockCurrentlyOwned = true;
-        releaseLocalLock();
-        return true;
-    }
-
-    //for the rest of the UFs that didnt meet the above criteria 
-    //and didnt get the lock they have to wait
-    _listOfClientsWaitingOnLock.push_back(uf);
-    releaseLocalLock();
-
-    while(1) //try to get the lock
-    {
-        //simply yield - since the uf will be woken up once it gets the lock
-        uf->waitOnLock();
-
-        //since this uf got woken up - check if it can get the lock now
-        getLocalLock();
-
-        if(_pendingLockNotification != uf) //there can only be one pending notification out at any time - since this lock was woken up - this must have gotten the pending notification
-        {
-            releaseLocalLock();
-            continue;
-        }
-        _pendingLockNotification = 0; 
-
-        //check if any other UF has gotten the lock between the time that this UF 
-        //got the notification and actually acted on it
-        if(!_lockCurrentlyOwned && (_listOfClientsWaitingOnLock.front() == uf))
-        {
-            _lockCurrentlyOwned = true;
-            releaseLocalLock();
-            return true;
-        }
-
-        releaseLocalLock();
-    }
-
-    return true;
-}
-
-inline bool UFMutex::unlock(UF* uf)
+inline void UFMutex::releaseSpinLock(bool spinCPU)
 {
-    if(!uf)
-        return false;
-
-    UFList::iterator beg;
-    getLocalLock();
-
-    beg = _listOfClientsWaitingOnLock.begin();
-    if(uf == *beg) //check if this uf is the current owner of this lock
-    {
-        _lockCurrentlyOwned = false;
-        beg = _listOfClientsWaitingOnLock.erase(beg);
-
-        //notify the next UF in line
-        if(!_listOfClientsWaitingOnLock.empty() && !_pendingLockNotification)
-        {
-            _pendingLockNotification = (*beg);
-            releaseLocalLock();
-            (*beg)->_parentScheduler->addFiberToScheduler((*beg), (*beg)->_parentScheduler->_tid);
-        }
-        else
-            releaseLocalLock();
-
-        return true;
-    }
-
-    releaseLocalLock();
-    return false;
-}
-
-inline bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
-{
-    while(1)
-    {
-        getLocalLock();
-        if(_listOfClientsWaitingOnLock.empty())
-        {
-            _listOfClientsWaitingOnLock.push_back(uf);
-            _lockCurrentlyOwned = true;
-            releaseLocalLock();
-            return true;
-        }
-
-        releaseLocalLock();
-
-        if(!autoRetryIntervalInUS)
-            break;
-
-        usleep(autoRetryIntervalInUS);
-    }
-
-    return false;
-}
-
-
-
-inline bool UFMutex::condWait(UF* uf)
-{
-    if(!uf)
-        return false;
-    _listOfClientsWaitingOnCond.push_back(uf) ;
-    unlock(uf);
-    uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal or broadcast has occurred
-    lock(uf);
-
-    return true;
-}
-
-inline void UFMutex::broadcast()
-{
-    if(_listOfClientsWaitingOnCond.empty())
-        return;
-
-    //notify all the waiters to wake up
-    UFList::iterator beg = _listOfClientsWaitingOnCond.begin();
-    for(; beg != _listOfClientsWaitingOnCond.end(); )
-    {
-        (*beg)->_parentScheduler->addFiberToScheduler(*beg, (*beg)->_parentScheduler->_tid);
-        beg = _listOfClientsWaitingOnCond.erase(beg);
-    }
-}
-
-inline void UFMutex::signal()
-{
-    if(_listOfClientsWaitingOnCond.empty())
-        return;
-
-    UFList::iterator beg = _listOfClientsWaitingOnCond.begin();
-    (*beg)->_parentScheduler->addFiberToScheduler(*beg, (*beg)->_parentScheduler->_tid);
-    _listOfClientsWaitingOnCond.erase(beg);
+    while(!__sync_bool_compare_and_swap(&_lockActive, 1, 0)) { if(!spinCPU) sched_yield(); }
 }
 
-//TODO: later
-/*
-inline bool UFMutex::condTimedWait(UF* uf)
+inline void UFMutex::getSpinLock(bool spinCPU)
 {
-    if(!uf)
-        return false;
+    while(!__sync_bool_compare_and_swap(&_lockActive, 0, 1)) { if(!spinCPU) sched_yield(); }
 }
-*/
 
 #endif

Added: trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFAres.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFAres.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFAres.H Sun May 30 02:07:05 2010
@@ -0,0 +1,121 @@
+#ifndef UF_ARES_H
+#define UF_ARES_H
+
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <sys/epoll.h>
+#include <ares.h>
+#include "UFDNS.H"
+#include "UFHostEnt.H"
+
+// for ares less then version 1.7.0
+//#ifndef ares_addrttl
+//#define ares_addrttl addrttl
+//#define ares_addr6ttl addr6ttl
+//#endif
+
+typedef void (*my_sock_state_cb)(void *data, int s, int read, int write) ;
+
+
+typedef hash_map< const char *, UFHostEnt * , hash<const char*> > HostentHashMap;
+
+class UFAresUFIO : public UFIO
+{
+    public:
+	UFAresUFIO() : myhostent_((UFHostEnt*)0x00000BAD) 
+	{
+	    _uf = UFScheduler::getUF();
+	};
+	
+	~UFAresUFIO()
+	{
+	    cerr<<"aresio destructor called"<<endl;
+	};
+        
+	void set_myhostent(UFHostEnt *h) 
+	{ 
+	    myhostent_ = h; 
+	};
+	
+	void Init(my_sock_state_cb callback, UFAresUFIO *aresio) 
+	{ 
+	    op_.sock_state_cb =callback ; 
+	    op_.sock_state_cb_data = aresio;
+	    ares_init_options(&ch_, &op_,ARES_OPT_SOCK_STATE_CB); 
+	};
+	
+	UFHostEnt *get_myhostent()
+	{
+	    return myhostent_;
+	};
+
+	ares_channel get_channel()
+	{
+	    return ch_;
+	};
+
+	void set_complete(bool flag)
+	{
+	    complete_ = flag;
+	}
+
+	bool get_complete()
+	{
+	    return complete_;
+	}
+       
+	void destroy()
+	{
+	    ares_destroy_options( &op_);
+	    ares_destroy(ch_);
+        }
+
+    private:	
+	ares_channel ch_;
+	struct ares_options op_;
+//	struct hostent *hostent_;
+//	struct ares_addrttl *aresttl_;;
+//	struct nttl_;
+        UFHostEnt *myhostent_;
+	bool complete_;
+
+};
+
+
+
+class UFAres : public UFDNS
+{
+    public:
+	 UFAres()  { };
+	 ~UFAres() { };
+	 unsigned long int GetHostByName(const char *name, uint32_t timeout = 0);
+	 hostent *GetHostByNameDebug(const char *name, uint32_t timeout = 0);
+	 hostent *GetHostByAddr(uint32_t ip , uint32_t timeout = 0, uint32_t family = AF_INET) { return NULL; };
+	 hostent *GetSrvByName(const char *name, uint32_t timeout = 0) {  return NULL; };
+	 UFAresUFIO* GetNewAresUFIO();
+	 void ReleaseAresUFIO( UFAresUFIO* ares);
+	 UFHostEnt* GetCachedHostent(const char* name)
+	 {
+	     //cerr<<"size in get"<<host_map_.size()<<endl;
+	     HostentHashMap::const_iterator index = host_map_.find(name);
+	     if(index == host_map_.end())
+		 return NULL;
+	     else
+		 return const_cast<UFHostEnt*>(index->second);
+	 }
+	 void InsertToCache(const char* name,UFHostEnt *host)
+	 {
+	     //cerr<<"size in inset"<<host_map_.size()<<endl;
+	     host_map_[name] = host;
+	     //cerr<<" after size in inset"<<host_map_.size()<<endl;
+	 }
+
+
+    private:
+	 list<UFAresUFIO*> list_ares_ufio_;
+	 HostentHashMap host_map_;
+};
+
+#endif

Propchange: trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H Sun May 30 02:07:05 2010
@@ -0,0 +1,35 @@
+#ifndef UF_DNS_H
+#define UF_DNS_H
+
+#include "UF.H"
+#include "UFIO.H"
+
+class UFDNS : public UFIO
+{
+   public:
+       UFDNS()
+       {
+	   _uf = UFScheduler::getUF();
+	   timeout_msec_ = 5000;
+	   tries_ = 4;
+	   port_ = 53;
+	   tcp_=false;
+	  
+       }
+       virtual ~UFDNS() { };
+       
+       virtual struct hostent *GetHostByNameDebug(const char *name, uint32_t timeout = 0) = 0 ;
+       virtual unsigned long int GetHostByName(const char *name, uint32_t timeout = 0) = 0;
+       virtual struct hostent *GetHostByAddr(uint32_t ip , uint32_t timeout = 0, uint32_t family = AF_INET) = 0 ;
+       virtual struct hostent *GetSrvByName(const char *name, uint32_t = 0) = 0 ;
+       
+   protected:
+       uint32_t timeout_msec_;
+       uint32_t tries_;
+       uint32_t port_;
+       bool     tcp_;
+       
+       
+};
+
+#endif

Added: trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFHostEnt.H Sun May 30 02:07:05 2010
@@ -0,0 +1,113 @@
+#ifndef MY_HOSTENT
+#define MY_HOSTENT
+
+#include<ares.h>
+
+// for ares less then version 1.7.0
+//#ifndef ares_addrttl
+//#define ares_addrttl addrttl
+//#define ares_addr6ttl addr6ttl
+//#endif
+
+class  UFHostEnt 
+{
+    public :
+	UFHostEnt()   { aresttl_ = NULL; hostent_ = NULL ; nttl_ = 0 ; timestamp_ = 0; };
+
+	~UFHostEnt () { ReleaseEntries();  };
+
+	hostent* get_hostent()
+	{
+	    return hostent_;
+	}
+
+	void set_hostent(hostent* host)
+	{
+	    hostent_ = host;
+	}
+
+	ares_addrttl* get_aresttl()
+	{
+	    return aresttl_;
+	}
+
+	void set_aresttl(ares_addrttl* aresttl)
+	{
+	    aresttl_ = aresttl;
+	}
+
+	uint32_t get_nttl()
+	{
+	    return nttl_;
+	}
+
+	void set_nttl(uint32_t nttl)
+	{
+	    nttl_ = nttl;
+	}
+
+	double  get_timestamp()
+	{
+	    return timestamp_;
+	}
+
+	void set_timestamp(double  timestamp)
+	{
+	    timestamp_ = timestamp;
+	}
+
+	bool IsEntryExpired( double  current_timestamp, uint32_t ttl)
+	{
+	    if(((timestamp_ + ttl) - current_timestamp) <= 0)
+		return true ; 
+	    
+	    return false;
+	}
+
+	unsigned long int GetUnExpiredEntry(double  current_timestamp)
+	{
+	    for(unsigned int i = 0 ; i < nttl_ ; i ++)
+	    {
+		if(IsEntryExpired(current_timestamp,aresttl_[i].ttl) == false)
+		//if(IsEntryExpired(current_timestamp,1) == false)
+		    return aresttl_[i].ipaddr.s_addr;
+	    }
+	    return 0;
+	}
+
+	unsigned long int GetFirstIP()
+	{
+	   if(aresttl_ !=NULL)
+	   return aresttl_[0].ipaddr.s_addr;
+	   else 
+	       return 0;
+	}
+	void ReleaseEntries()
+	{
+	    if(hostent_ != NULL)
+		ares_free_hostent(hostent_);
+	    if(aresttl_ != NULL)
+		free(aresttl_);
+	}
+
+	void lock(UF * uf)
+	{
+	    mutex_.lock(uf);
+	}
+
+	void unlock(UF *uf)
+	{
+	    mutex_.unlock(uf);
+	}
+
+
+    private:
+        
+        UFMutex mutex_; 
+        struct hostent *hostent_;
+        struct ares_addrttl *aresttl_;
+        uint32_t nttl_;
+        double  timestamp_;
+};
+
+#endif

Copied: trafficserver/traffic/branches/UserFiber/core/include/UFIO.H (from r944753, trafficserver/traffic/branches/UserFiber/include/UFIO.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFIO.H?p2=trafficserver/traffic/branches/UserFiber/core/include/UFIO.H&p1=trafficserver/traffic/branches/UserFiber/include/UFIO.H&r1=944753&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFIO.H Sun May 30 02:07:05 2010
@@ -99,22 +99,20 @@ struct UFIO
     bool close();
 
     bool setFd(int fd, bool makeNonBlocking = true);
-
+    void setUF(UF* uf);
+    void setUFIOScheduler(UFIOScheduler* ufios);
 
     unsigned int getErrno() const;
     int getFd() const;
     UF* getUF() const;
-
     const std::string& getRemoteIP() const;
     unsigned int getRemotePort() const;
-
     UFIOScheduler* getUFIOScheduler() const;
-    void setUFIOScheduler(UFIOScheduler* ufios);
-
-    UFSleepInfo*                _sleepInfo;
 
     static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
-    static UFIOScheduler* getUFIOS();
+
+    UFSleepInfo*                _sleepInfo;
+    bool                        _markedActive;
 
 protected:
     int                         _fd;
@@ -129,11 +127,11 @@ protected:
     unsigned int                _remotePort;
 
     int                         _lastEpollFlag;
-    bool                        _amtReadLastTimeEqualToAskedAmt;
 };
 inline unsigned int UFIO::getErrno() const { return _errno; }
 inline int UFIO::getFd() const { return _fd; }
 inline UF* UFIO::getUF() const { return _uf; }
+inline void UFIO::setUF(UF* uf) { _uf = uf; }
 inline UFIOScheduler* UFIO::getUFIOScheduler() const { return _ufios; }
 inline void UFIO::setUFIOScheduler(UFIOScheduler* ufios) { _ufios = ufios; }
 inline const std::string& UFIO::getRemoteIP() const { return _remoteIP; };
@@ -144,25 +142,38 @@ inline unsigned int UFIO::getRemotePort(
 struct UFIOScheduler;
 //typedef map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
 typedef hash_map<pthread_t, UFIOScheduler*, hash<uintptr_t> > ThreadFiberIOSchedulerMap;
+
+struct UFConnectionPool;
 struct UFIOScheduler
 {
-    UFIOScheduler() {}
-    virtual ~UFIOScheduler() {}
+    UFIOScheduler();
+    virtual ~UFIOScheduler();
 
     virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0) = 0;
     virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0) = 0;
     virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = 0) = 0;
     virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0) = 0;
     virtual bool closeConnection(UFIO* ufio) = 0;
+    //TODO: support regular poll behavior
+    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0) = 0;
 
     virtual bool isSetup() { return false; }
     virtual void waitForEvents(TIME_IN_US timeToWait) = 0;
 
-    static ThreadFiberIOSchedulerMap _tfiosscheduler;
+    static UFIOScheduler* getUFIOS(pthread_t tid = 0);
+    static ThreadFiberIOSchedulerMap    _tfiosscheduler;
+    static pthread_key_t                _keyToIdentifySchedulerOnThread;
+
+    // connection pool
+    UFConnectionPool* getConnPool() const;
+
+protected:
+    UFConnectionPool*           _connPool;
 };
+inline UFConnectionPool* UFIOScheduler::getConnPool() const { return _connPool; }
 
 
-#define MAX_FDS_FOR_EPOLL 64*1024-1
+#define MAX_FDS_FOR_EPOLL 128*1024-1
 //typedef map<int, UFIO*> IntUFIOMap;
 typedef hash_map<int, UFIO*, hash<int> > IntUFIOMap;
 typedef std::multimap<TIME_IN_US, UFSleepInfo*> MapTimeUFIO;
@@ -180,6 +191,7 @@ struct EpollUFIOScheduler : public UFIOS
     bool setupForRead(UFIO* ufio, TIME_IN_US to = 0);
     bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0);
     bool closeConnection(UFIO* ufio);
+    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0);
 
 
     void waitForEvents(TIME_IN_US timeToWait = -1);
@@ -187,19 +199,23 @@ struct EpollUFIOScheduler : public UFIOS
     bool                            _interruptedByEventFd;
 
 protected:
-    UF*                 _uf;
-    int                 _epollFd;
-    unsigned int        _maxFds;
-    struct epoll_event* _epollEventStruct;
-    IntUFIOMap          _intUFIOMap;
-    bool                _alreadySetup;
+    UF*                             _uf;
+    UFScheduler*                    _ufs;
+    int                             _epollFd;
+    unsigned int                    _maxFds;
+    struct epoll_event*             _epollEventStruct;
+    IntUFIOMap                      _intUFIOMap;
+    bool                            _alreadySetup;
 
 
-    MapTimeUFIO         _sleepList;
+    MapTimeUFIO                     _sleepList;
+    unsigned long long int          _earliestWakeUpFromSleep;
 
     bool addToScheduler(UFIO* ufio, 
                         void* inputInfo /*flags to identify how ot add*/, 
-                        TIME_IN_US to = 0);
+                        TIME_IN_US to = 0,
+                        bool wait = true, 
+                        bool runEpollCtl = false);
 
 
     list<UFSleepInfo*>  _availableSleepInfo;

Added: trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFPC.H?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFPC.H (added)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFPC.H Sun May 30 02:07:05 2010
@@ -0,0 +1,132 @@
+#ifndef UFPC_H
+#define UFPC_H
+
+#include <set>
+#include "UF.H"
+
+struct UFMutex;
+struct UFProducerData;
+struct UFConsumer;
+struct UFProducer;
+
+enum UFProducerDataCode
+{
+    ADD = 1,
+    END = 2
+};
+
+struct UFConsumer
+{
+    UFConsumer(bool shouldLockForInternalMods = true);
+    virtual ~UFConsumer();
+    //user needs to call input->releaseMe after consuming the data
+    UFProducerData* waitForData(UF* uf = 0);
+    bool hasData(UF* uf = 0);
+
+    bool joinProducer(UFProducer* ufp);
+    bool removeProducer(UFProducer* ufp);
+
+    std::list<UFProducerData*>      _queueOfDataToConsume;
+    UFMutex                         _queueOfDataToConsumeLock;
+
+protected:
+    std::set<UFProducer*>           _consumersProducerSet;
+    UFMutex                         _consumersProducerSetLock;
+    bool                            _shouldLockForInternalMods;
+};
+
+struct UFProducer
+{
+    friend class UFConsumer;
+    UFProducer();
+    ~UFProducer();
+    bool removeConsumer(UFConsumer* ufc);
+    bool produceData(void* data, size_t size, bool freeDataOnExit = true, UF* uf = 0);
+
+protected:
+    std::set<UFConsumer*>           _producersConsumerSet;
+    UFMutex                         _producersConsumerSetLock; //needed when the consumers are adding or removing themselves from the consumerList
+
+    /*TODO: producer can keep a collection of the data
+    std::list<UFProducerData*>      _producerData;
+    UFMutex                         _producerDataMutex; //this mutex is needed to add to the producerdata
+    */
+    bool addConsumer(UFConsumer* ufc);
+};
+
+struct UFProducerData
+{
+    void*                           _data;
+    size_t                          _size;
+    UFProducer*                     _producerWhichInserted;
+    UFProducerDataCode              _ufpcCode;
+    bool                            _freeDataOnExit;
+
+    void addRef(size_t numToAdd = 1);
+    void reduceRef();
+
+    static UFProducerData* getObj();
+    static void releaseObj(UFProducerData* obj);
+
+
+protected:
+    ~UFProducerData();
+    UFProducerData() { _referenceCount = 0; _freeDataOnExit = true; }
+    UFMutex                         _controlReferenceCount; //control the ref. count of this data
+    size_t                          _referenceCount;
+};
+
+inline bool UFProducer::addConsumer(UFConsumer* ufc)
+{
+    _producersConsumerSetLock.getSpinLock();
+    _producersConsumerSet.insert(ufc); //check insertion
+    _producersConsumerSetLock.releaseSpinLock();
+    return true;
+}
+
+inline bool UFProducer::removeConsumer(UFConsumer* ufc)
+{
+    _producersConsumerSetLock.getSpinLock();
+    _producersConsumerSet.erase(ufc);
+    _producersConsumerSetLock.releaseSpinLock();
+    return true;
+}
+
+inline void UFProducerData::addRef(size_t numToAdd)
+{
+    _controlReferenceCount.getSpinLock();
+    _referenceCount = numToAdd;
+    _controlReferenceCount.releaseSpinLock();
+}
+
+inline void UFProducerData::reduceRef()
+{
+    _controlReferenceCount.getSpinLock();
+    --_referenceCount;
+    _controlReferenceCount.releaseSpinLock();
+}
+
+inline UFProducerData* UFProducerData::getObj()
+{
+    return new UFProducerData();
+}
+
+inline void UFProducerData::releaseObj(UFProducerData* obj)
+{
+    if(!obj)
+        return;
+
+    obj->_controlReferenceCount.getSpinLock();
+    if(!--obj->_referenceCount)
+        delete obj;
+    else
+        obj->_controlReferenceCount.releaseSpinLock();
+}
+
+inline UFProducerData::~UFProducerData()
+{
+    if(_freeDataOnExit && _data)
+        free (_data);
+}
+
+#endif

Copied: trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H (from r944753, trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H?p2=trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H&p1=trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H&r1=944753&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H Sun May 30 02:07:05 2010
@@ -13,6 +13,13 @@
 #include <sstream>
 #include "UF.H"
 
+struct Stat
+{
+    Stat(std::string _name, long long _value);
+    std::string name;
+    long long value;
+};
+
 class UFIO;
 class UFServer;
 class UFStatSystem
@@ -45,7 +52,7 @@ private:
     static bool getStatNum(const char *stat_name, uint32_t &stat_num);
     static UFServer *server;
     static std::map<std::string, uint32_t> stat_name_to_num;
-    static std::vector< std::pair<std::string, long long> > global_stats;
+    static std::vector< Stat > global_stats;
     static uint32_t MAX_STATS_ALLOWED;
     static uint32_t NUM_STATS_ESTIMATE;
 

Propchange: trafficserver/traffic/branches/UserFiber/core/src/
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: trafficserver/traffic/branches/UserFiber/core/src/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/Makefile?rev=949458&r1=944753&r2=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/Makefile Sun May 30 02:07:05 2010
@@ -1,5 +1,16 @@
-CXXFLAGS=-g -O3 -Wall -Wno-deprecated -march=x86-64
-CPPFLAGS=-DPIPE_NOT_EFD -I../include
+ARES_VERSION = 1.7.1
+ARES = c-ares-$(ARES_VERSION)
+ARESDIR=../ext
+ARES_SRC = $(ARESDIR)/$(ARES)/ares.h
+ARES_SRC_FILE = $(ARES).tar.gz
+ARES_GPG = $(ARES).tar.gz.asc
+ARES_LIB = $(ARESDIR)/$(ARES)/.libs/libcares.a 
+
+
+CXXFLAGS=-g -O3 -Wall -Werror -Wno-deprecated -march=x86-64
+#CXXFLAGS=-g -Wall -Werror -Wno-deprecated -march=x86-64 -fno-inline
+#CPPFLAGS=-DLOCK_DEBUG -DPIPE_NOT_EFD -I../include -I$(ARESDIR)/$(ARES)
+CPPFLAGS=-DPIPE_NOT_EFD -I../include -I$(ARESDIR)/$(ARES)
 ARFLAGS=-rv
 RANLIB=ranlib
 INCLUDE_DIR=../include
@@ -7,28 +18,47 @@ LIB_DIR=../lib
 
 .PHONY: all clean
 
-all: libUF.a
+all: $(LIB_DIR)/libUF.a
+
+$(ARES_LIB): $(ARES_SRC)
+	pushd $(ARESDIR)/$(ARES) && ./configure && gmake && popd
 
-UF.o: UF.C $(INCLUDE_DIR)/UF.H
+$(ARES_SRC):
+	if [ ! -d $(ARESDIR)/$(ARES) ] ; then \
+	   pushd $(ARESDIR) && \
+	   wget http://c-ares.haxx.se/$(ARES_SRC_FILE) && \
+	   tar xzf ./$(ARES_SRC_FILE) && popd; \
+	fi
+
+$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UF.o UF.C
 
-UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H
+$(LIB_DIR)/UFPC.o: UFPC.C $(INCLUDE_DIR)/UFPC.H $(LIB_DIR)/UF.o
+	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFPC.o UFPC.C
+
+$(LIB_DIR)/UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
+	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
+
+$(LIB_DIR)/UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFIO.o UFIO.C
 
-UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H
+$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
+	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
+
+$(LIB_DIR)/UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStatSystem.o UFStatSystem.C
 
-UFServer.o: UFServer.C $(INCLUDE_DIR)/UFServer.H
+$(LIB_DIR)/UFServer.o: UFServer.C $(INCLUDE_DIR)/UFServer.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFConnectionPoolImpl.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFServer.o UFServer.C
 
-UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H
-	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
-
-libUF.a: UF.o UFIO.o UFStatSystem.o UFServer.o UFConnectionPoolImpl.o
-	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a \
-    $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFConnectionPoolImpl.o
+$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o \
+	$(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
+	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARES_LIB) \
+	$(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFServer.o \
+	$(LIB_DIR)/UFConnectionPoolImpl.o $(LIB_DIR)/UFAres.o $(LIB_DIR)/UFPC.o
 	$(RANLIB) $(LIB_DIR)/libUF.a
 
 clean: 
+	echo "cleaning ..."
 	$(RM) $(LIB_DIR)/*.o $(LIB_DIR)/*.a
 

Copied: trafficserver/traffic/branches/UserFiber/core/src/UF.C (from r944754, trafficserver/traffic/branches/UserFiber/src/UF.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UF.C?p2=trafficserver/traffic/branches/UserFiber/core/src/UF.C&p1=trafficserver/traffic/branches/UserFiber/src/UF.C&r1=944754&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UF.C Sun May 30 02:07:05 2010
@@ -109,11 +109,11 @@ static pthread_key_t getThreadKey()
 pthread_key_t UFScheduler::_specific_key = getThreadKey();
 UFScheduler::UFScheduler()
 {
+    _activeRunningListSize = 0;
     _earliestWakeUpFromSleep = 0;
     _exitJustMe = false;
     _specific = 0;
     _currentFiber = 0;
-    _conn_pool = new UFConnectionPool;
 
     if(_inThreadedMode)
     {
@@ -159,7 +159,6 @@ UFScheduler::UFScheduler()
 UFScheduler::~UFScheduler()
 {
     //pthread_key_delete(_specific_key);
-    delete _conn_pool;
 }
 
 
@@ -181,6 +180,7 @@ bool UFScheduler::addFibersToScheduler(c
     if(ufList.empty())
         return true;
 
+    UF* uf = 0;
     list<UF*>::const_iterator beg = ufList.begin();
     list<UF*>::const_iterator ending = ufList.end();
     //adding to the same scheduler and as a result thread as the current job
@@ -188,17 +188,23 @@ bool UFScheduler::addFibersToScheduler(c
     {
         for(; beg != ending; ++beg)
         {
-            UF* uf = *beg;
+            uf = *beg;
+            if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
+                continue;
             uf->_status = WAITING_TO_RUN;
             if(uf->_parentScheduler) //probably putting back an existing uf into the active list
             {
-                if(uf->_parentScheduler != this) //cant schedule for some other thread
+                if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
+                {
+                    _activeRunningList.push_back(uf); ++_activeRunningListSize;
+                    continue;
+                }
+                else
                 {
-                    cerr<<"uf is not part of this scheduler"<<endl;
+                    cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
+                    abort(); //TODO: remove the abort
                     return false;
                 }
-                _activeRunningList.push_back(uf);
-                continue;
             }
 
             //create a new context
@@ -213,7 +219,7 @@ bool UFScheduler::addFibersToScheduler(c
                 cerr<<"error while trying to run makecontext"<<endl;
                 return false;
             }
-            _activeRunningList.push_back(uf);
+            _activeRunningList.push_back(uf); ++_activeRunningListSize;
         }
     }
     else //adding to some other threads' scheduler
@@ -232,7 +238,10 @@ bool UFScheduler::addFibersToScheduler(c
         UFScheduler* ufs = index->second;
         pthread_mutex_lock(&(ufs->_mutexToNominateToActiveList));
         for(; beg != ending; ++beg)
-            ufs->_nominateToAddToActiveRunningList.push_back(*beg);
+        {
+            uf = *beg;
+            ufs->_nominateToAddToActiveRunningList.push_back(uf);
+        }
         pthread_cond_signal(&(ufs->_condToNominateToActiveList));
         pthread_mutex_unlock(&(ufs->_mutexToNominateToActiveList));
         ufs->notifyUF();
@@ -263,12 +272,15 @@ void UFScheduler::runScheduler()
     gettimeofday(&start, 0);
     unsigned long long int timeNow = 0;
 
-    while(!_exitJustMe && !_exit)
+    UFList::iterator ufBeg;
+    UFList::iterator nBeg;
+    MapTimeUF::iterator slBeg;
+    bool waiting = false;
+    while(!shouldExit())
     {
-        UFList::iterator beg = _activeRunningList.begin();
-        for(; beg != _activeRunningList.end(); )
+        for(ufBeg = _activeRunningList.begin(); ufBeg != _activeRunningList.end(); )
         {
-            UF* uf = *beg;
+            UF* uf = *ufBeg;
             _currentFiber = uf;
             uf->_status = RUNNING;
             swapcontext(&_mainContext, &(uf->_UFContext));
@@ -277,18 +289,18 @@ void UFScheduler::runScheduler()
             if(uf->_status == RUNNING) { }
             else if(uf->_status == BLOCKED)
             {
-                beg = _activeRunningList.erase(beg);
+                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
                 continue;
             }
             else if(uf->_status == COMPLETED) 
             {
                 delete uf;
-                beg = _activeRunningList.erase(beg);
+                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
                 continue;
             }
 
             uf->_status = WAITING_TO_RUN;
-            ++beg;
+            ++ufBeg;
         }
 
 
@@ -309,15 +321,18 @@ void UFScheduler::runScheduler()
             //TODO: do atomic comparison to see if there is anything in 
             //_nominateToAddToActiveRunningList before getting the lock
             pthread_mutex_lock(&_mutexToNominateToActiveList);
-            UFList::iterator beg = _nominateToAddToActiveRunningList.begin();
-            for(; beg != _nominateToAddToActiveRunningList.end(); )
+            for(nBeg = _nominateToAddToActiveRunningList.begin();
+                nBeg != _nominateToAddToActiveRunningList.end(); )
             {
-                UF* uf = *beg;
+                UF* uf = *nBeg;
                 if(uf->_parentScheduler)
-                    _activeRunningList.push_back(uf);
+                {
+                    uf->_status = WAITING_TO_RUN;
+                    _activeRunningList.push_front(uf); ++_activeRunningListSize;
+                }
                 else //adding a new fiber
                     addFiberToScheduler(uf, 0);
-                beg = _nominateToAddToActiveRunningList.erase(beg);
+                nBeg = _nominateToAddToActiveRunningList.erase(nBeg);
             }
 
             pthread_mutex_unlock(&_mutexToNominateToActiveList);
@@ -326,38 +341,52 @@ void UFScheduler::runScheduler()
 
         //pick up the fibers that may have completed sleeping
         //look into the sleep list;
+        //printf("%u %u tnc = %llu %llu\n", (unsigned int)pthread_self(), _sleepList.size(), _earliestWakeUpFromSleep, _earliestWakeUpFromSleep-timeNow);
         if(!_sleepList.empty())
         {
             gettimeofday(&now, 0);
             ranGetTimeOfDay = true;
-            timeNow = (now.tv_sec*1000000)+now.tv_usec;
+            timeNow = timeInUS(now);
             if(timeNow >= _earliestWakeUpFromSleep) //dont go into this queue unless the time seen the last time has passed
             {
-                for(MapTimeUF::iterator beg = _sleepList.begin(); beg != _sleepList.end(); )
+                for(slBeg = _sleepList.begin(); slBeg != _sleepList.end(); )
                 {
-                    //TODO: has to be cleaned up
                     //1. see if anyone has crossed the sleep timer - add them to the active list
-                    if(beg->first <= timeNow) //sleep time is over
+                    if(slBeg->first <= timeNow) //sleep time is over
                     {
-                        _activeRunningList.push_back(beg->second);
-                        _sleepList.erase(beg);
-                        beg = _sleepList.begin();
+                        UFWaitInfo *ufwi = slBeg->second;
+                        ufwi->_ctrl.getSpinLock();
+                        ufwi->_sleeping = false;
+                        if(ufwi->_uf)
+                        {
+                            ufwi->_uf->_status = WAITING_TO_RUN;
+                            _activeRunningList.push_front(ufwi->_uf); ++_activeRunningListSize;
+                            ufwi->_uf = NULL;
+                        }
+                        waiting = ufwi->_waiting;
+                        ufwi->_ctrl.releaseSpinLock();
+                        if(!waiting) //since the uf is not being waited upon release it (the sleeping part has already been done)
+                            releaseWaitInfo(*ufwi);
+                        
+                        _sleepList.erase(slBeg);
+                        slBeg = _sleepList.begin();
+                        
                         continue;
                     }
                     else
                     {
                         if(_amtToSleep) //since the nominate system might have turned off the sleep - we dont activate it again
-                            _amtToSleep = beg->first-timeNow; 
-                        _earliestWakeUpFromSleep = beg->first;
+                            _amtToSleep = slBeg->first-timeNow; 
+                        _earliestWakeUpFromSleep = slBeg->first;
                         break;
                     }
-                    ++beg;
+                    ++slBeg;
                 }
             }
         }
 
         //see if there is anything to do or is it just sleeping time now
-        if(!_notifyFunc && _activeRunningList.empty() && !_exit)
+        if(!_notifyFunc && !_activeRunningListSize && !shouldExit())
         {
             if(_inThreadedMode) //go to conditional wait (in threaded mode)
             {
@@ -436,6 +465,305 @@ int UFFactory::registerFunc(UF* uf)
     return _size++;
 }
 
+const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 3;
+bool UFMutex::lock(UF* uf)
+{
+    if(!uf || !uf->_parentScheduler)
+        return false;
+
+    getSpinLock();
+    if(_listOfClientsWaitingOnLock.empty()) //probably the most common case (no UF has the lock)
+    {
+#ifdef LOCK_DEBUG
+        printf("%lu l1\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+        _listOfClientsWaitingOnLock.push_back(uf);
+        _lockCurrentlyOwned = true;
+        releaseSpinLock();
+        return true;
+    }
+
+    //see if any UF is holding the lock right now - if not get the lock
+    //this is the case where between the time that an UF is woken up
+    //(after another UF releases the lock)
+    //and it actually runs this requesting UF might be able to procure the lock
+    //if there is a mustRunUF - that UF has to run first - and this UF has to go to the end of the line
+    if(!_lockCurrentlyOwned && !_mustRunUF)
+    {
+#ifdef LOCK_DEBUG
+        printf("%lu l2\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+        _listOfClientsWaitingOnLock.push_front(uf);
+        _lockCurrentlyOwned = true;
+        releaseSpinLock();
+        return true;
+    }
+
+    //for the rest of the UFs that didnt meet the above criteria 
+    //and didnt get the lock they have to wait
+    _listOfClientsWaitingOnLock.push_back(uf);
+    releaseSpinLock();
+
+    unsigned short int counter = 0;
+    while(1) //try to get the lock
+    {
+#ifdef LOCK_DEBUG
+        printf("%lu wt\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+        //simply yield - since the uf will be woken up once it gets the lock
+        uf->waitOnLock();
+
+        //since this uf got woken up - check if it can get the lock now
+        getSpinLock();
+
+        //check if any other UF has gotten the lock between the time that this UF 
+        //got the notification and actually acted on it
+        if(!_lockCurrentlyOwned && (_listOfClientsWaitingOnLock.front() == uf))
+        {
+#ifdef LOCK_DEBUG
+            printf("%lu l3\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+            _lockCurrentlyOwned = true;
+            _mustRunUF = 0;
+            releaseSpinLock();
+            return true;
+        }
+
+        if(++counter >= CONSECUTIVE_LOCK_FAILURES_ALLOWED) //dont let a UF fail to get the lock more than CONSECUTIVE_LOCK_FAILURES_ALLOWED times
+            _mustRunUF = uf;
+        releaseSpinLock();
+    }
+
+    return true;
+}
+
+bool UFMutex::unlock(UF* uf)
+{
+    if(!uf)
+        return false;
+
+    UFList::iterator beg;
+    getSpinLock();
+
+    beg = _listOfClientsWaitingOnLock.begin();
+    if(uf == *beg) //check if this uf is the current owner of this lock
+    {
+        _lockCurrentlyOwned = false;
+        beg = _listOfClientsWaitingOnLock.erase(beg);
+#ifdef LOCK_DEBUG
+        printf("%lu u %d\n", (unsigned long int) ((uintptr_t)(void*)uf), _listOfClientsWaitingOnLock.size());
+#endif
+
+        bool releasedLock = false;
+        //notify the next UF in line
+        while(!_listOfClientsWaitingOnLock.empty())
+        {
+            UF* tmpUf = *beg;
+            if(!tmpUf || !tmpUf->_parentScheduler) //invalid tmpuf - cant wake it up
+            {
+#ifdef LOCK_DEBUG
+                printf("%lu nf1\n", (unsigned long int) ((uintptr_t)(void*)uf));
+#endif
+                beg = _listOfClientsWaitingOnLock.erase(beg);
+                if(beg == _listOfClientsWaitingOnLock.end())
+                    break;
+                continue;
+            }
+            /*
+            if(tmpUf->getStatus() == WAITING_TO_RUN) //this uf has already been put into the waiting to run list
+                break;
+                */
+
+
+#ifdef LOCK_DEBUG
+            printf("%lu wk %lu\n", 
+                   (unsigned long int) ((uintptr_t)(void*)uf), 
+                   (unsigned long int) ((uintptr_t)(void*)tmpUf));
+#endif
+
+            releaseSpinLock();
+            releasedLock = true;
+            uf->_parentScheduler->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
+            break;
+        }
+
+        if(!releasedLock)
+            releaseSpinLock();
+
+        return true;
+    }
+    else
+    {
+        cerr<<uf<<" tried to unlock but was not in top of list"<<endl;
+        abort();
+    }
+
+    releaseSpinLock();
+    return false;
+}
+
+bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
+{
+    while(1)
+    {
+        getSpinLock();
+        if(_listOfClientsWaitingOnLock.empty())
+        {
+            _listOfClientsWaitingOnLock.push_back(uf);
+            _lockCurrentlyOwned = true;
+            releaseSpinLock();
+            return true;
+        }
+
+        releaseSpinLock();
+
+        if(!autoRetryIntervalInUS)
+            break;
+
+        usleep(autoRetryIntervalInUS);
+    }
+
+    return false;
+}
+
+
+bool UFMutex::condWait(UF* uf)
+{
+    if(!uf)
+        return false;
+    
+    //the object is already in the hash
+    if(_listOfClientsWaitingOnCond.find(uf) == _listOfClientsWaitingOnCond.end())
+    {
+        UFWaitInfo *ufwi = uf->_parentScheduler->getWaitInfo();
+        ufwi->_uf = uf;
+        ufwi->_waiting = true;
+    
+        _listOfClientsWaitingOnCond[uf] = ufwi;
+    }
+
+    unlock(uf);
+    uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal or broadcast has occurred
+    lock(uf);
+
+    return true;
+}
+
+void UFMutex::broadcast()
+{
+    if(_listOfClientsWaitingOnCond.empty())
+        return;
+
+    UFScheduler* ufs = UFScheduler::getUFScheduler();
+    if(!ufs)
+    {
+        cerr<<"couldnt get scheduler on thread "<<pthread_self()<<endl;
+        return;
+    }
+
+    //notify all the UFs waiting to wake up
+    bool sleeping = false;
+    for(UFWLHash::iterator beg = _listOfClientsWaitingOnCond.begin();
+        beg != _listOfClientsWaitingOnCond.end(); ++beg)
+    {
+        // Get WaitInfo object
+        UFWaitInfo *ufwi = beg->second;
+
+        ufwi->_ctrl.getSpinLock();
+        ufwi->_waiting = false; // Set _waiting to false, indicating that the UFWI has been removed from the cond queue
+
+        // If uf is not NULL, schedule it and make sure no one else can schedule it again
+        if(ufwi->_uf) 
+        {
+            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->_parentScheduler->_tid);
+            ufwi->_uf = NULL;
+        }
+        
+        sleeping = ufwi->_sleeping;
+        ufwi->_ctrl.releaseSpinLock();
+        if(!sleeping) //sleep list has already run
+            ufs->releaseWaitInfo(*ufwi);
+    }
+    _listOfClientsWaitingOnCond.clear();
+}
+
+void UFMutex::signal()
+{
+    if(_listOfClientsWaitingOnCond.empty())
+        return;
+
+    UFScheduler* ufs = UFScheduler::getUFScheduler();
+    if(!ufs)
+    {
+        cerr<<"couldnt get scheduler"<<endl;
+        return;
+    }
+    UF *uf_to_signal = NULL;
+    bool sleeping = false;
+    for(UFWLHash::iterator beg = _listOfClientsWaitingOnCond.begin(); beg != _listOfClientsWaitingOnCond.end();)
+    {
+        // Take first client off list
+        UFWaitInfo *ufwi = beg->second;
+
+        ufwi->_ctrl.getSpinLock();
+        ufwi->_waiting = false; // Set _waiting to false, indicating that the UFWI has been removed from the cond queue
+        
+        if(ufwi->_uf)
+        {
+            uf_to_signal = ufwi->_uf; // Store UF to signal
+            ufwi->_uf = NULL; // Clear UF. This ensures that no one else can schedule the UF.
+        }
+
+        sleeping = ufwi->_sleeping;
+        ufwi->_ctrl.releaseSpinLock();
+        if(!sleeping) //sleep list has already run
+            ufs->releaseWaitInfo(*ufwi);
+
+        // If a UF was found to signal, break out
+        _listOfClientsWaitingOnCond.erase(beg);
+        if(uf_to_signal)
+            break;
+        beg = _listOfClientsWaitingOnCond.begin();
+    }
+
+    if(uf_to_signal)
+        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->_parentScheduler->_tid);
+}
+
+int UFMutex::condTimedWait(UF* uf, unsigned long long int sleepAmtInUs)
+{
+    bool result = false;
+    if(!uf)
+        return result;
+
+    // Wrap uf in UFWait structure before pushing to wait and sleep queues
+    
+    UFWaitInfo *ufwi = UFScheduler::getUFScheduler()->getWaitInfo();
+    ufwi->_uf = uf;
+    ufwi->_waiting = true;
+    ufwi->_sleeping = true;
+    
+    // Add to waiting queue
+    _listOfClientsWaitingOnCond[uf] = ufwi;
+    unlock(uf);
+    
+    // Add to sleep queue
+    struct timeval now;
+    gettimeofday(&now, 0);
+    unsigned long long int timeNow = timeInUS(now);
+    ufwi->_sleeping = true;
+    uf->_parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
+    
+    uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal, broadcast or timeout has occurred
+
+    ufwi->_ctrl.getSpinLock();
+    result = ufwi->_sleeping;
+    ufwi->_ctrl.releaseSpinLock();
+
+    lock(uf);
+    return (result) ? true : false;//if result (ufwi->_sleeping) is not true, it must be that the sleep list activated this uf
+}
+
 void* setupThread(void* args)
 {
     if(!args)

Added: trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFAres.C?rev=949458&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFAres.C (added)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFAres.C Sun May 30 02:07:05 2010
@@ -0,0 +1,217 @@
+#include "UFAres.H"
+#include <stdio.h>
+
+static void printHost(struct hostent* host , struct ares_addrttl *ttls = 0, int nttl = 0)
+{
+    return;
+        int i;
+	for(i = 0 ;  host->h_addr_list[i] != NULL; i++)
+	{
+	   printf("%d.%d.%d.%d\n",(unsigned char)host->h_addr_list[i][0],(unsigned char)host->h_addr_list[i][1],(unsigned char)host->h_addr_list[i][2],(unsigned char)host->h_addr_list[i][3]);
+	}
+	for(i = 0; host->h_aliases[i]!=NULL;i++)
+	   printf("%s\n",host->h_aliases[i]);
+
+        for(i = 0 ; i < nttl ; i++)
+	{
+	   char *name = inet_ntoa(ttls[i].ipaddr);
+	   printf("IP is %s, TTL is %d\n",name,ttls[i].ttl);
+	}
+	printf("----------------------------------------------------------------------------------\n");
+}
+
+static double GetCurrentTime()
+{
+    timeval now;
+    gettimeofday(&now,NULL);
+    return((double)(now.tv_sec +(now.tv_usec/1000000.0)));
+}
+static void mycallback(void *arg, int status,
+	               int timeouts, struct hostent *host)
+{
+	UFAresUFIO *ares = (UFAresUFIO*) arg;
+        if(!host)
+	{
+	    ares->set_myhostent(NULL);
+	    return;
+	}
+
+	printHost(host);
+        UFHostEnt *myhostent = ares->get_myhostent();
+	myhostent->set_hostent(host);
+	myhostent->set_nttl(0);
+	myhostent->set_aresttl(NULL);
+	myhostent->set_timestamp(0);
+	ares->set_myhostent(myhostent);
+}
+
+static void arescallback(void *arg, int status, int  timeouts, unsigned char *abuf, int alen)
+{
+    UFAresUFIO *ares = (UFAresUFIO*) arg;
+    struct hostent *host = NULL;
+    struct ares_addrttl *ttls = (ares_addrttl*) malloc(sizeof(ares_addrttl) ); ;
+    int nttl = 1;
+    UFHostEnt *myhostent;
+    if(status == ARES_SUCCESS)
+    {
+	 status = ares_parse_a_reply(abuf, alen, &host, ttls,&nttl);
+	if(!host)
+	{
+	    ares->set_myhostent(NULL);
+	    return;
+	}
+	printHost(host, ttls,nttl);
+        myhostent = ares->get_myhostent();	
+	myhostent->set_hostent(host);
+	if(nttl>0)
+	    myhostent->set_nttl(nttl);
+	else
+	    myhostent->set_nttl(0);
+	myhostent->set_aresttl(ttls);
+	myhostent->set_timestamp(GetCurrentTime());
+	ares->set_complete(true);
+    }
+    else
+	cerr<<"failure"<<endl;
+
+
+}
+static void mycall(void *data, int fd, int read, int write)
+{
+	UFAresUFIO *ufio = (UFAresUFIO*) data;
+	if(read ==0  && write == 0)
+	{
+	    ufio->setFd(-1);
+	}
+	else
+	    ufio->setFd(fd);
+	if(read)
+	{  
+	    UFIOScheduler::getUFIOS()->setupForWrite(ufio);
+	}
+}
+
+UFAresUFIO* UFAres::GetNewAresUFIO()
+{
+    UFAresUFIO *aresio;
+    if( list_ares_ufio_.empty() == true)
+    {
+	aresio = new UFAresUFIO();
+    }
+    else
+    {
+	aresio = list_ares_ufio_.front();
+	aresio->setFd(-1);
+        aresio->set_myhostent((UFHostEnt*)0x00000BAD);
+	list_ares_ufio_.pop_front();
+	aresio->setUF(UFScheduler::getUF());
+    }
+    return aresio;
+}
+
+void UFAres::ReleaseAresUFIO(UFAresUFIO *aresio)
+{
+    if(aresio)
+    {
+	aresio->set_complete(false);
+	aresio->destroy();
+	list_ares_ufio_.push_back(aresio);
+    }
+    else
+	cerr<<"did not get valid UFAresUFIO"<<endl;
+
+}
+
+
+unsigned long int  UFAres::GetHostByName(const char *name, uint32_t timeout)
+{
+    UFHostEnt* myhostent_ = GetCachedHostent(name);
+    if(myhostent_ == NULL)
+    {
+	myhostent_ = new UFHostEnt;
+	InsertToCache(name,myhostent_);
+    }
+    myhostent_->lock(UFScheduler::getUF());
+    unsigned long int ip = 0;
+    if(myhostent_ != NULL)
+    {
+	ip = myhostent_->GetUnExpiredEntry(GetCurrentTime());
+    }
+    if(ip != 0)
+    {
+            myhostent_->unlock(UFScheduler::getUF());
+	    return ip;
+    }
+    else
+    {
+	 myhostent_->ReleaseEntries();
+
+	UFAresUFIO *aresio = GetNewAresUFIO();
+	aresio->Init(mycall,aresio);
+	aresio->set_myhostent(myhostent_);
+	
+	ares_search(aresio->get_channel(),name,1, 1,arescallback,aresio);
+	 // process fd did not call mycallback yet 
+
+	while ((aresio->get_complete() == false) && (aresio->getFd()!= -1))
+	{
+	    UFIOScheduler::getUFIOS()->setupForRead(aresio);
+	        // woken up by epoll again, process them 
+            ares_process_fd(aresio->get_channel(), aresio->getFd(), aresio->getFd());
+	}
+
+    
+        // out of loop , either we got hostent or timed out
+        myhostent_ = aresio->get_myhostent(); 
+        ReleaseAresUFIO(aresio);
+        myhostent_->unlock(UFScheduler::getUF());
+        return myhostent_->GetFirstIP();
+	
+    }
+}
+
+struct hostent* UFAres::GetHostByNameDebug(const char *name, uint32_t timeout )
+{
+    UFAresUFIO *aresio = GetNewAresUFIO();
+    aresio->Init(mycall,aresio);
+    ares_gethostbyname(aresio->get_channel(),name,AF_INET,mycallback,aresio);  
+    UFHostEnt *myhostent_ = aresio->get_myhostent(); 
+    if(myhostent_ == (UFHostEnt*)0x00000BAD && aresio->getFd() != -1)
+    {
+	UFIOScheduler::getUFIOS()->setupForRead(aresio);
+    }
+    else
+    {
+	return myhostent_->get_hostent();
+    }
+    // First time no need to block , blocked by setup for read or write
+    ares_process_fd(aresio->get_channel(), aresio->getFd(), aresio->getFd());
+    
+    //process fd called mycallback and hostent is not null
+    myhostent_ = aresio->get_myhostent(); 
+    ReleaseAresUFIO(aresio);
+    if(myhostent_ && myhostent_ != (UFHostEnt*)0x00000BAD) 
+    {
+	return myhostent_->get_hostent();
+    }
+    else 
+    {
+	// process fd did not call mycallback yet 
+	while (aresio->get_myhostent() == (UFHostEnt*)0x00000BAD)
+	{
+            UFIOScheduler::getUFIOS()->setupForRead(aresio);
+	    // woken up by epoll again 
+            ares_process_fd(aresio->get_channel(), aresio->getFd(), aresio->getFd());
+	}
+
+    }
+    
+    // out of loop , either we got hostent or timed out
+    myhostent_ = aresio->get_myhostent(); 
+    ReleaseAresUFIO(aresio);
+    return myhostent_->get_hostent();
+	
+}
+
+   
+

Copied: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C (from r944754, trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C?p2=trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C&p1=trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C&r1=944754&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C Sun May 30 02:07:05 2010
@@ -9,11 +9,13 @@
 #include <stdio.h>
 #include <errno.h>
 
+#include <string.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netdb.h>
+#include <string.h>
 
 const unsigned short int PERCENT_LOGGING_SAMPLING = 5;
 
@@ -182,10 +184,7 @@ UFConnectionGroupInfo* UFConnectionPoolI
 {
     GroupIPMap::iterator foundItr = _groupIpMap.find(name);
     if(foundItr == _groupIpMap.end())
-    {
-        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"didnt find group with name "<<name <<" to remove"<<endl;
         return NULL;
-    }
 
     UFConnectionGroupInfo* removedObj = (*foundItr).second;
     _groupIpMap.erase(foundItr);
@@ -230,7 +229,6 @@ UFIO* UFConnectionPoolImpl::getConnectio
     UFConnectionGroupInfo* groupInfo = NULL;
     if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
     {
-        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<groupName<<endl;
         groupInfo = addGroupImplicit(groupName);
         if(!groupInfo) 
             return NULL;
@@ -315,7 +313,8 @@ UFIO* UFConnectionPoolImpl::getConnectio
             }
             else {
                 // if _maxSimultaneousConns is hit, wait for a connection to become available
-                if(ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns) {
+                if(ipInfo->_maxSimultaneousConns && 
+                   (ipInfo->_currentlyUsedConnections.size() + ipInfo->_inProcessCount >= (unsigned int) ipInfo->_maxSimultaneousConns)) {
                     // wait for a connection to be released
                     ipInfo->_someConnectionAvailable.lock(this_user_fiber);
                     ipInfo->_someConnectionAvailable.condWait(this_user_fiber);
@@ -401,7 +400,6 @@ void UFConnectionPoolImpl::clearBadConne
 
 void UFConnectionPoolImpl::releaseConnection(UFIO* ufIO, bool connOk)
 {
-    cerr << "UFConnectionPoolImpl::releaseConnection" << endl;
     if(!ufIO)
         return;
 
@@ -538,10 +536,7 @@ double UFConnectionPoolImpl::getGroupAva
 
     GroupIPMap::const_iterator foundItr = _groupIpMap.find(name);
     if((foundItr == _groupIpMap.end()) || !((*foundItr).second))
-    {
-        cerr<<getpid()<<" "<<time(NULL)<<" "<<__LINE__<<" "<<"null group or didnt find group with name "<<((*foundItr).second ? (*foundItr).second->_name : "")<<endl;
         return result;
-    }
 
     return (*foundItr).second->getAvailability();
 }
@@ -555,16 +550,21 @@ UFConnectionPoolImpl::~UFConnectionPoolI
 
 void UFConnectionPoolCleaner::run()
 {
-    UFScheduler *this_thread_scheduler = UFScheduler::getUFScheduler();
-    UF* this_uf = this_thread_scheduler->getRunningFiberOnThisThread();
+    UF* this_uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    UFIOScheduler* ufios = UFIOScheduler::getUFIOS();
+    UFConnectionPool* ufcp = ufios->getConnPool();
+
+    if(!ufcp)
+    {
+        cerr<<"couldnt get conn pool on thread "<<pthread_self()<<endl;
+        exit(1);
+    }
+
     while(1)
     {
         this_uf->usleep(300*1000*1000);
-        if(!this_thread_scheduler->_conn_pool)
-            break;
-        this_thread_scheduler->_conn_pool->clearBadConnections();
+        ufcp->clearBadConnections();
     }
-    this_thread_scheduler->setExit();
 }
 
 UFConnectionPoolImpl::UFConnectionPoolImpl()
@@ -582,7 +582,7 @@ void UFConnectionPoolImpl::init()
     }
 }
 
-int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 5;
+int UFConnectionPool::MAX_SIMUL_CONNS_PER_HOST = 0;
 int UFConnectionPool::TIMEOUT_PER_REQUEST = 10;
 
 UFConnectionPool::UFConnectionPool() 

Copied: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C (from r944754, trafficserver/traffic/branches/UserFiber/src/UFIO.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFIO.C?p2=trafficserver/traffic/branches/UserFiber/core/src/UFIO.C&p1=trafficserver/traffic/branches/UserFiber/src/UFIO.C&r1=944754&r2=949458&rev=949458&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFIO.C Sun May 30 02:07:05 2010
@@ -37,13 +37,15 @@ void UFIO::reset()
     _errno = 0; 
     _ufios = 0; 
     _lastEpollFlag = 0;
-    _amtReadLastTimeEqualToAskedAmt = false;
     _sleepInfo = 0;
+    _markedActive = false;
 }
 
 UFIO::~UFIO()
 {
     close();
+    if(_sleepInfo)
+        _sleepInfo->_ufio = 0;
 }
 
 UFIO::UFIO(UF* uf, int fd)
@@ -113,7 +115,7 @@ int UFIO::setupConnectionToAccept(const 
         return -1;
     }
 
-    char* interface_addr = ((i_a) && strlen(i_a)) ? const_cast<char*>(i_a) : const_cast<char*>(string("0.0.0.0").c_str());
+    const char* interface_addr = ((i_a) && strlen(i_a)) ? i_a : "0.0.0.0";
 
     int n = 1;
     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0) 
@@ -264,7 +266,7 @@ void UFIO::accept(UFIOAcceptThreadChoose
             connectedArgs->ufio->_remotePort = cli_addr.sin_port;
             uf->_startingArgs = connectedArgs;
 
-            listOfUFsToAdd.push_front(uf);
+            listOfUFsToAdd.push_back(uf);
             listOfUFsToAddSize++;
 
             if(listOfUFsToAddSize == 100)
@@ -302,10 +304,14 @@ void UFIO::accept(UFIOAcceptThreadChoose
     }
 }
 
-UFIOScheduler* UFIO::getUFIOS()
+UFIOScheduler* UFIOScheduler::getUFIOS(pthread_t tid)
 {
     UFIOScheduler* tmpUfios = 0;
-    //
+
+    if(!tid || tid == pthread_self())
+        return (UFIOScheduler*)pthread_getspecific(_keyToIdentifySchedulerOnThread);
+
+
     //find the ufios for this thread - this map operation should only be done once
     ThreadFiberIOSchedulerMap::iterator index = UFIOScheduler::_tfiosscheduler.find(pthread_self());
     if(index != UFIOScheduler::_tfiosscheduler.end())
@@ -321,41 +327,27 @@ UFIOScheduler* UFIO::getUFIOS()
 
 ssize_t UFIO::read(void *buf, size_t nbyte, TIME_IN_US timeout)
 {
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    //we read everything there was to be read the last time, so this time wait to read
-    if(!_amtReadLastTimeEqualToAskedAmt) 
-    {
-        //wait for something to read first
-        if(!tmpUfios->setupForRead(this, timeout))
-        {
-            _errno = errno;
-            return -1;
-        }
-        if(_errno == ETIMEDOUT) //setupForRead will return w/ success however it will set the errno to ETIMEDOUT if a timeout occurred
-            return -1;
-    }
-
-    _amtReadLastTimeEqualToAskedAmt = false;
     ssize_t n = 0;;
     while(1)
     {
         n = ::read(_fd, buf, nbyte);
         if(n > 0)
-        {
-            _amtReadLastTimeEqualToAskedAmt = ((unsigned int) n != nbyte) ? false : true;
             return n;
-        }
         else if(n < 0)
         {
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
                 _errno = 0;
-                if(!tmpUfios->setupForRead(this, timeout))
+                _markedActive = false;
+                while(!_markedActive) 
                 {
-                    _errno = errno;
-                    return -1;
+                    //wait for something to read first
+                    if(!tmpUfios->setupForRead(this, timeout))
+                        return -1;
                 }
+                continue;
             }
             else if(errno == EINTR)
                 continue;
@@ -373,7 +365,7 @@ ssize_t UFIO::read(void *buf, size_t nby
 
 ssize_t UFIO::write(const void *buf, size_t nbyte, TIME_IN_US timeout)
 {
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
     ssize_t n = 0;;
     unsigned int amtWritten = 0;
@@ -393,17 +385,21 @@ ssize_t UFIO::write(const void *buf, siz
             _errno = errno;
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
+                _markedActive = false;
                 _errno = 0;
-                if(!tmpUfios->setupForWrite(this, timeout))
+                while(!_markedActive)
                 {
-                    _errno = errno;
-                    return -1;
+                    if(!tmpUfios->setupForWrite(this, timeout))
+                        return -1;
                 }
             }
             else if(errno == EINTR)
                 continue;
             else
+            {
+                _errno = errno;
                 break;
+            }
         }
         else if(n == 0)
             break;
@@ -420,7 +416,7 @@ int UFIO::connect(const struct sockaddr 
 
 
     //find the scheduler for this request
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
     while(::connect(_fd, addr, addrlen) < 0)
     {
@@ -444,7 +440,7 @@ int UFIO::connect(const struct sockaddr 
 
 int UFIO::sendto(const char *buf, int len, const struct sockaddr *to, int tolen, TIME_IN_US timeout)
 {
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
     ssize_t n = 0;;
     unsigned int amtWritten = 0;
@@ -464,17 +460,21 @@ int UFIO::sendto(const char *buf, int le
             _errno = errno;
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
+                _markedActive = false;
                 _errno = 0;
-                if(!tmpUfios->setupForWrite(this, timeout))
+                while(!_markedActive)
                 {
-                    _errno = errno;
-                    return -1;
+                    if(!tmpUfios->setupForWrite(this, timeout))
+                        return -1;
                 }
             }
             else if(errno == EINTR)
                 continue;
             else
+            {
+                _errno = errno;
                 break;
+            }
         }
         else if(n == 0)
             break;
@@ -486,7 +486,7 @@ int UFIO::sendmsg(const struct msghdr *m
                   int flags,
 	              TIME_IN_US timeout)
 {
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
     ssize_t n = 0;;
     while(1)
@@ -499,17 +499,21 @@ int UFIO::sendmsg(const struct msghdr *m
             _errno = errno;
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
+                _markedActive = false;
                 _errno = 0;
-                if(!tmpUfios->setupForWrite(this, timeout))
+                while(!_markedActive)
                 {
-                    _errno = errno;
-                    return -1;
+                    if(!tmpUfios->setupForWrite(this, timeout))
+                        return -1;
                 }
             }
             else if(errno == EINTR)
                 continue;
             else
+            {
+                _errno = errno;
                 break;
+            }
         }
         else if(n == 0)
             break;
@@ -523,41 +527,26 @@ int UFIO::recvfrom(char *buf, 
 		           int *fromlen, 
                    TIME_IN_US timeout)
 {
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
-
-    //we read everything there was to be read the last time, so this time wait to read
-    if(!_amtReadLastTimeEqualToAskedAmt) 
-    {
-        //wait for something to read first
-        if(!tmpUfios->setupForRead(this, timeout))
-        {
-            _errno = errno;
-            return -1;
-        }
-        if(_errno == ETIMEDOUT) //setupForRead will return w/ success however it will set the errno to ETIMEDOUT if a timeout occurred
-            return -1;
-    }
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    _amtReadLastTimeEqualToAskedAmt = false;
     ssize_t n = 0;;
     while(1)
     {
         n = ::recvfrom(_fd, buf, len, 0, from, (socklen_t *)fromlen);
         if(n > 0)
-        {
-            _amtReadLastTimeEqualToAskedAmt = (n != len) ? false : true;
             return n;
-        }
         else if(n < 0)
         {
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
                 _errno = 0;
-                if(!tmpUfios->setupForRead(this, timeout))
+                _markedActive = false;
+                while(!_markedActive)
                 {
-                    _errno = errno;
-                    return -1;
+                    if(!tmpUfios->setupForRead(this, timeout))
+                        return -1;
                 }
+                continue;
             }
             else if(errno == EINTR)
                 continue;
@@ -577,7 +566,7 @@ int UFIO::recvmsg(struct msghdr *msg, 
                   int flags,
 	              TIME_IN_US timeout)
 {
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
+    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
     ssize_t n = 0;
     while(1)
@@ -590,11 +579,13 @@ int UFIO::recvmsg(struct msghdr *msg, 
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
                 _errno = 0;
-                if(!tmpUfios->setupForRead(this, timeout))
+                _markedActive = false;
+                while(!_markedActive)
                 {
-                    _errno = errno;
-                    return -1;
+                    if(!tmpUfios->setupForRead(this, timeout))
+                        return -1;
                 }
+                continue;
             }
             else if(errno == EINTR)
                 continue;
@@ -611,6 +602,16 @@ int UFIO::recvmsg(struct msghdr *msg, 
 }
 
 
+static pthread_key_t getThreadKey()
+{
+    if(pthread_key_create(&UFIOScheduler::_keyToIdentifySchedulerOnThread, 0) != 0)
+    {
+        cerr<<"couldnt create ufios thread specific key "<<strerror(errno)<<endl;
+        exit(1);
+    }
+    return UFIOScheduler::_keyToIdentifySchedulerOnThread;
+}
+pthread_key_t UFIOScheduler::_keyToIdentifySchedulerOnThread = getThreadKey();
 
 ThreadFiberIOSchedulerMap UFIOScheduler::_tfiosscheduler;
 EpollUFIOScheduler::EpollUFIOScheduler(UF* uf, unsigned int maxFds)
@@ -623,6 +624,15 @@ EpollUFIOScheduler::EpollUFIOScheduler(U
     _earliestWakeUpFromSleep = 0;
 }
 
+UFIOScheduler::UFIOScheduler()
+{ 
+    _connPool = new UFConnectionPool;
+}
+UFIOScheduler::~UFIOScheduler()
+{ 
+    delete _connPool; 
+}
+
 EpollUFIOScheduler::~EpollUFIOScheduler()
 {
     if(_epollFd != -1)
@@ -662,15 +672,19 @@ bool EpollUFIOScheduler::isSetup()
         _epollFd = -1;
     }
 
+    pthread_setspecific(_keyToIdentifySchedulerOnThread, this);
     return (_alreadySetup = true);
 }
 
-bool EpollUFIOScheduler::addToScheduler(UFIO* ufio, void* inputInfo, TIME_IN_US to)
+bool EpollUFIOScheduler::addToScheduler(UFIO* ufio, void* inputInfo, TIME_IN_US to, bool wait, bool runEpollCtl)
 {
     if(!ufio || !inputInfo || !isSetup())
+    {
+        ufio->_errno = EINVAL;
         return false;
+    }
 
-    if(ufio->_lastEpollFlag != *((int*)inputInfo)) //dont do anything if the flags are same as last time
+    if(runEpollCtl || ufio->_lastEpollFlag != *((int*)inputInfo)) //dont do anything if the flags are same as last time
     {
         struct epoll_event ev;
         ev.data.fd = ufio->getFd();
@@ -689,6 +703,7 @@ bool EpollUFIOScheduler::addToScheduler(
         {
             cerr<<"couldnt add/modify fd to epoll queue "<<strerror(errno)<<" trying to add "<<ufio->getFd()<<" to "<<_epollFd<<endl;
             exit(1);
+            ufio->_errno = EINVAL;
             return false;
         }
         ufio->_lastEpollFlag = ev.events;
@@ -704,6 +719,8 @@ bool EpollUFIOScheduler::addToScheduler(
         {
             cerr<<"couldnt create sleep info"<<endl;
             exit(1);
+            ufio->_errno = EINVAL;
+            return false;
         }
         ufsi->_ufio = ufio;
         ufio->_sleepInfo = ufsi;
@@ -714,14 +731,24 @@ bool EpollUFIOScheduler::addToScheduler(
         _sleepList.insert(std::make_pair(timeToWakeUp, ufsi));
     }
 
+    ufio->_errno = 0;
+    ufio->setUF(_ufs->getRunningFiberOnThisThread());
+    ufio->_markedActive = false;
+    if(!wait)
+        return true;
     ufio->getUF()->block(); //switch context till someone wakes me up
+
+    if(!to) //nothing to do w/ no timeout
+        return true;
+
     if(ufio->_sleepInfo)
     {
-        ufio->_sleepInfo->_ufio = 0; //set the sleep indicator to not have a dependency w/ this ufio
-        ufio->_sleepInfo = 0; //remove the sleep indicator
+        ufio->_sleepInfo->_ufio = 0;
+        ufio->_sleepInfo = 0;
+        return true;
     }
-
-    return true;
+    //ufio->_errno = ETIMEDOUT
+    return false;
 }
 
 bool EpollUFIOScheduler::setupForConnect(UFIO* ufio, TIME_IN_US to)
@@ -759,7 +786,6 @@ bool EpollUFIOScheduler::closeConnection
         _intUFIOMap.erase(index);
 
     return true;
-
     /*
     struct epoll_event ev;
     ev.data.fd = ufio->getFd();
@@ -768,6 +794,25 @@ bool EpollUFIOScheduler::closeConnection
     */
 }
 
+bool EpollUFIOScheduler::rpoll(list<UFIO*>& ufioList, TIME_IN_US to)
+{
+    int flags = EPOLLIN|EPOLLET|EPOLLPRI|EPOLLERR|EPOLLHUP;
+    for(list<UFIO*>::iterator beg = ufioList.begin();
+        beg != ufioList.end();
+        ++beg) 
+        addToScheduler(*beg, (void*)&flags, 0, false, true);
+
+    _ufs->getRunningFiberOnThisThread()->block();
+    /*
+    if(!to)
+        _ufs->getRunningFiberOnThisThread()->block();
+    else
+        _ufs->getRunningFiberOnThisThread()->usleep(to);
+        */
+
+    return true;
+}
+
 
 #ifndef PIPE_NOT_EFD
 #include <sys/eventfd.h>
@@ -853,8 +898,8 @@ static void* notifyEpollFunc(void* args)
 
 void EpollUFIOScheduler::waitForEvents(TIME_IN_US timeToWait)
 {
-    UFScheduler* myScheduler = UFScheduler::getUFScheduler();
-    if(!myScheduler)
+    _ufs = UFScheduler::getUFScheduler();
+    if(!_ufs)
     {
         cerr<<"have to be able to find my scheduler"<<endl;
         return;
@@ -872,18 +917,18 @@ void EpollUFIOScheduler::waitForEvents(T
     }
     makeSocketNonBlocking(pfd[0]);
     //makeSocketNonBlocking(pfd[1]); - dont make the write socket non-blocking
-    myScheduler->_notifyArgs = (void*)(&pfd[1]);
+    _ufs->_notifyArgs = (void*)(&pfd[1]);
     ens->_efd = pfd[0];
 #else
     int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC); //TODO: check the error code of the eventfd creation
-    myScheduler->_notifyArgs = (void*)&efd;
+    _ufs->_notifyArgs = (void*)&efd;
     ens->_efd = efd;
 #endif
-    myScheduler->_notifyFunc = notifyEpollFunc;
+    _ufs->_notifyFunc = notifyEpollFunc;
     //add the UF to handle the efds calls
     UF* eventFdFiber = new ReadNotificationUF();
     eventFdFiber->_startingArgs = ens;
-    myScheduler->addFiberToScheduler(eventFdFiber, 0);
+    _ufs->addFiberToScheduler(eventFdFiber, 0);
 
 
 
@@ -906,7 +951,6 @@ void EpollUFIOScheduler::waitForEvents(T
     UFIO* ufio = 0;
     UF* uf = 0;
     unsigned long long int amtToSleep = timeToWait;
-    unsigned long long int ufsAmtToSleep = 0;
     int i = 0;
     _interruptedByEventFd = false;
     UFScheduler* ufs = _uf->getParentScheduler();
@@ -916,6 +960,7 @@ void EpollUFIOScheduler::waitForEvents(T
         cerr<<"epoll scheduler has to be connected to some scheduler"<<endl;
         return;
     }
+    unsigned long long int amtToSleepFromUFS = 0;
     while(1)
     {
         if(_interruptedByEventFd) //this is so that the last interruption gets handled right away
@@ -924,10 +969,15 @@ void EpollUFIOScheduler::waitForEvents(T
             _uf->yield();
         }
 
-        if(amtToSleep > (ufsAmtToSleep = ufs->getAmtToSleep()))
-            amtToSleep = (int)(ufsAmtToSleep/1000);
+        if(ufs->getActiveRunningListSize() < 2) //means that epoll is the only fiber thats currently active
+        {
+            if(amtToSleep > (amtToSleepFromUFS = ufs->getAmtToSleep()))
+                amtToSleep = (int)(amtToSleepFromUFS/1000);
+            else
+                amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+        }
         else
-            amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+            amtToSleep = 0; //dont wait on epoll - since there are other ufs waiting to run
         nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, amtToSleep);
         if(nfds > 0)
         {
@@ -943,6 +993,7 @@ void EpollUFIOScheduler::waitForEvents(T
                         cerr<<"invalid user fiber io found for fd, "<<_epollEventStruct[i].data.fd<<endl;
                         exit(1);
                     }
+                    ufio->_markedActive = true;
                     //activate the fiber
                     ufs->addFiberToScheduler(uf, 0);
                 }
@@ -1014,8 +1065,8 @@ void EpollUFIOScheduler::waitForEvents(T
         _uf->yield();
     }
 
-    myScheduler->_notifyArgs = 0;
-    myScheduler->_notifyFunc = 0;
+    _ufs->_notifyArgs = 0;
+    _ufs->_notifyFunc = 0;
 }
 
 int IORunner::_myLoc = -1;