You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/08/13 21:43:10 UTC

svn commit: r985334 [1/3] - in /trafficserver/traffic/branches/UserFiber: core/include/ core/src/ samples/

Author: akundu
Date: Fri Aug 13 19:43:09 2010
New Revision: 985334

URL: http://svn.apache.org/viewvc?rev=985334&view=rev
Log:
Steve Jiang
    - UFIO
        - support for readline
        - change TIME_IN_US to a signed type (long long int) so that timeouts can default to -1, meaning no timeout
        - update IO fxns to use TIME_IN_US

Bryan Call    
    - add UFSwapContext.H - expose the internal swap context fxn
    - add setter+getter fxns in UFIO + UF

Raghav Jeyaraman
    - UFStats
        - support for current connections count
    - UFConf
        - take the value as an input type into templated getValue fxns
    - UFServer
        - per thread initializer support

Anirban Kundu
    - UFHTTPLoader
        - increased default options to support exiting early
        - put an interval between the threads starting to do work
        - use new timeout mode of -1 meaning no timeout
    - UF.H
        - use a deque to track the lock instead of a list
        - support UF objects being created and destroyed using a factory
        - ability to set the stack size for future created UFs
        - run all active (and newly created) UFs first before sleeping
    - UFPC.H
        - support for joinable and non-joinable producers/consumers
        - support to not use a lock in managing the producer and consumer
    - UFIO.H
        - support for writev (similar to state threads writev implementation)
    - UFConnectionPool
        - support to remove unused connections after a little while

Manjesh Nilange
        - support for listening to multiple ports from the same server


Added:
    trafficserver/traffic/branches/UserFiber/core/include/UFSwapContext.H   (props changed)
      - copied unchanged from r985331, trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
Removed:
    trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
Modified:
    trafficserver/traffic/branches/UserFiber/core/include/UF.H
    trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
    trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
    trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
    trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
    trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
    trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
    trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
    trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
    trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
    trafficserver/traffic/branches/UserFiber/core/src/Makefile
    trafficserver/traffic/branches/UserFiber/core/src/UF.C
    trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
    trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
    trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
    trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
    trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
    trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
    trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C

Modified: trafficserver/traffic/branches/UserFiber/core/include/UF.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UF.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UF.H Fri Aug 13 19:43:09 2010
@@ -2,7 +2,6 @@
 #define USERTHREADS_H
 
 #include <sys/time.h>
-
 #include <iostream>
 #include <map>
 #include <stdint.h>
@@ -10,12 +9,16 @@
 #include <set>
 #include <vector>
 #include <list>
+#include <deque>
 #include <ucontext.h>
 #include <pthread.h>
 #include <errno.h>
 
-using namespace std;
+#include <UFSwapContext.H>
+//#include <ufutil/Factory.H>
+
 namespace std { using namespace __gnu_cxx; }
+std::string getPrintableTime();
 
 enum UFStatus
 {
@@ -23,17 +26,20 @@ enum UFStatus
     WAITING_TO_RUN      = 1,
     BLOCKED             = 2,
     RUNNING             = 3,
-    COMPLETED           = 4
+    COMPLETED           = 4,
+    YIELDED             = 5
 };
 
-typedef unsigned long long int TIME_IN_US;
+typedef long long int TIME_IN_US;
 
 //create the type of UF you'd like to pass into the accept handler
 typedef unsigned long long int UFId;
 struct UFScheduler;
-struct UFMutex;
-struct UF
+
+struct UFFact;
+class UF
 {
+public:
     friend class UFScheduler;
     friend class UFMutex;
 
@@ -52,34 +58,72 @@ struct UF
     //otherwise behavior is unexpected
     void                 yield();
     ///must be called after the fiber is added to a scheduler
-    void                 usleep(unsigned long long int sleepAmtInUs);
-    static void          gusleep(unsigned long long int sleepAmtInUs);
+    void                 usleep(TIME_IN_US sleepAmtInUs);
+    static void          gusleep(TIME_IN_US sleepAmtInUs);
     ///simply block the fiber
     void                 block();
     UFStatus             getStatus() const;
+    unsigned long long int getLastRun() const;
 
 
     UFStatus             _status;
     void*                _startingArgs;
+    static unsigned int  DEFAULT_STACK_SIZE;
+    UFFact*              _myFactory;
+    void reset();
 
-protected:
+private:
     static UFId          _globalId;
-    UFId                 _myId;
     UFScheduler*         _parentScheduler;
+    UFId                 _myId;
     ucontext_t           _UFContext;
     bool                 _UFObjectCreatedStack;
+    unsigned long long int _lastRun;
 
-private:
     void waitOnLock();
 };
+inline void UF::reset() 
+{
+    _startingArgs = 0;
+    _myFactory = 0; 
+    _parentScheduler = 0; 
+    _lastRun = 0; 
+    _status = NOT_STARTED; 
+}
 inline UFStatus UF::getStatus() const { return _status; }
+inline unsigned long long int UF::getLastRun() const { return _lastRun; }
+
+class UFFact
+{
+public:
+    virtual ~UFFact() {}
+    virtual UF* getUF();
+    virtual void releaseUF(UF* uf);
+
+protected:
+    virtual UF* createUF() = 0;
+    virtual void destroyUF(UF* uf) = 0;
+};
+inline UF* UFFact::getUF()
+{
+    UF* uf = createUF();
+    if(!uf)
+        return 0;
+    uf->reset();
+    uf->_myFactory = this;
+    return uf;
+}
+inline void UFFact::releaseUF(UF* uf)
+{
+    destroyUF(uf);
+}
 
 struct UFFactory
 {
     static UFFactory* getInstance();
     UFFactory();
 
-    UF* selectUF(unsigned int location);;
+    UF* selectUF(unsigned int location);
     int registerFunc(UF* uf);
 
 protected:
@@ -92,9 +136,9 @@ inline UFFactory* UFFactory::getInstance
 inline UF* UFFactory::selectUF(unsigned int location) { return _objMapping[location]; }
 
 struct UFWaitInfo;
-typedef std::list<UFWaitInfo*> UFWaitList;
-typedef map<UF*, UFWaitInfo*> UFWLHash;
-typedef std::list<UF*>          UFList;
+typedef std::map<UF*, UFWaitInfo*>  UFWLHash;
+typedef std::list<UF*>              UFList;
+typedef std::deque<UF*>             UFDeque;
 struct UFMutex
 {
     UFMutex() 
@@ -106,7 +150,7 @@ struct UFMutex
 
     bool lock(UF* uf);
     bool unlock(UF* uf);
-    bool tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS = 0);
+    bool tryLock(UF* uf, TIME_IN_US autoRetryIntervalInUS = 0);
 
     //THE CALLER MUST get the lock before calling this fxn
     //THE CALLER MUST release the lock after this fxn is called
@@ -119,7 +163,7 @@ struct UFMutex
     void signal();
     //THE CALLER MUST get the lock before calling this fxn
     //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
-    int condTimedWait(UF *uf, unsigned long long int sleepAmtInUs);
+    bool condTimedWait(UF *uf, TIME_IN_US sleepAmtInUs);
 
     void releaseSpinLock(bool spinCPU = false);
     void getSpinLock(bool spinCPU = false);
@@ -127,7 +171,6 @@ struct UFMutex
 protected:
     int                 _lockActive;
     UFList              _listOfClientsWaitingOnLock;
-    //UFWaitList          _listOfClientsWaitingOnCond;
     UFWLHash            _listOfClientsWaitingOnCond;
     bool                _lockCurrentlyOwned;
     UF*                 _mustRunUF;
@@ -144,10 +187,10 @@ struct UFWaitInfo
 inline void UFWaitInfo::reset() { _uf = 0; _sleeping = false; _waiting = false; }
 
 
-typedef std::multimap<unsigned long long int, UFWaitInfo*> MapTimeUF;
+typedef std::multimap<TIME_IN_US, UFWaitInfo*> MapTimeUF;
 //typedef std::map<pthread_t,UFScheduler*> ThreadUFSchedulerMap;
 //per thread scheduler
-typedef hash_map<pthread_t, UFScheduler*, hash<uintptr_t> > ThreadUFSchedulerMap;
+typedef std::hash_map<pthread_t, UFScheduler*, std::hash<uintptr_t> > ThreadUFSchedulerMap;
 
 struct UFScheduler
 {
@@ -164,7 +207,7 @@ struct UFScheduler
     bool addFiberToScheduler(UF* uf,      /* the UF to add */
                              pthread_t tid = 0); /* the thread to add the UF to */
     //add the fxn to add multiple ufs in one shot (if they're on one tid)
-    bool addFiberToScheduler(const std::list<UF*>& ufList, 
+    bool addFiberToScheduler(const UFList& ufList, 
                               pthread_t tid = 0);
 
 
@@ -181,10 +224,10 @@ struct UFScheduler
     static bool                  _inThreadedMode;
 
     UF* getRunningFiberOnThisThread();
-    const ucontext_t& getMainContext() const;
+    ucontext_t* getMainContext();
     void setSpecific(void* args);
     void* getSpecific() const;
-    unsigned long long int getAmtToSleep() const;
+    TIME_IN_US getAmtToSleep() const;
     static void setExit(bool exit = true);
     bool shouldExit() const;
     void setExitJustMe(bool exit = true);
@@ -203,29 +246,31 @@ struct UFScheduler
     //to allow to identify the thread running now
     static pthread_key_t        _specific_key;
 
-    static void ufCreateThread(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+    static void ufCreateThread(pthread_t* tid, UFList* ufsToStartWith);
 
     static bool                 _exit;
     bool                        _exitJustMe;
+    unsigned long long int      getRunCounter() const;
+
 protected:
+    unsigned long long int      _runCounter;
     UF*                         _currentFiber;
     ucontext_t                  _mainContext;
 
     //no lock for active running list - cause only the running
     //thread can add to it
-    UFList                      _activeRunningList;
-    size_t                      _activeRunningListSize;
+    UFDeque                     _activeRunningList;
 
     //nominate to add to a thread's running list
-    UFList                      _nominateToAddToActiveRunningList;
+    UFDeque                     _nominateToAddToActiveRunningList;
     pthread_mutex_t             _mutexToNominateToActiveList;
     pthread_cond_t              _condToNominateToActiveList;
     
     //the sleep tree
     MapTimeUF                   _sleepList;
-    unsigned long long int      _earliestWakeUpFromSleep;
+    TIME_IN_US                  _earliestWakeUpFromSleep;
     //store the shortest sleep interval
-    unsigned long long int      _amtToSleep;
+    TIME_IN_US                  _amtToSleep;
 
 
     //store thread specific content
@@ -234,20 +279,18 @@ protected:
 
     void notifyUF();
     
-    list<UFWaitInfo*>  _availableWaitInfo;
+    std::deque<UFWaitInfo*>     _availableWaitInfo;
     UFWaitInfo* getWaitInfo();
     void releaseWaitInfo(UFWaitInfo& ufsi);
     bool addFiberToSelf(UF* uf);
-    bool addFiberToAnotherThread(const std::list<UF*>& ufList, pthread_t tid);
-
-public:
-    UFMutex testingCondTimedWait;
+    bool addFiberToAnotherThread(const UFList& ufList, pthread_t tid);
 };
-inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningListSize; }
+inline unsigned long long int UFScheduler::getRunCounter() const { return _runCounter; }
+inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningList.size(); }
 inline bool UFScheduler::shouldExit() const { return (_exitJustMe || _exit) ? true : false; }
-inline unsigned long long int UFScheduler::getAmtToSleep() const { return _amtToSleep; }
+inline TIME_IN_US UFScheduler::getAmtToSleep() const { return _amtToSleep; }
 inline UF* UFScheduler::getRunningFiberOnThisThread(){ return _currentFiber; }
-inline const ucontext_t& UFScheduler::getMainContext() const { return _mainContext; };
+inline ucontext_t* UFScheduler::getMainContext() { return &_mainContext; }
 inline void UFScheduler::setSpecific(void* args) { _specific = args; }
 inline void* UFScheduler::getSpecific() const { return _specific; }
 inline void UFScheduler::setExit(bool exit) { _exit = exit; }
@@ -275,17 +318,17 @@ inline UFScheduler* UF::getParentSchedul
 
 inline void UF::waitOnLock() { block(); }
 
-inline void UF::gusleep(unsigned long long int sleepAmtInUs)
+inline void UF::gusleep(TIME_IN_US sleepAmtInUs)
 {
     UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
 }
 
-inline unsigned long long int timeInUS(timeval& t)
+inline TIME_IN_US timeInUS(timeval& t)
 {
-    return ((unsigned long long int)(((unsigned long long int) t.tv_sec)*1000000)+(unsigned long long int) t.tv_usec);
+    return ((TIME_IN_US)(((TIME_IN_US) t.tv_sec)*1000000)+(TIME_IN_US) t.tv_usec);
 }
 
-inline void UF::usleep(unsigned long long int sleepAmtInUs)
+inline void UF::usleep(TIME_IN_US sleepAmtInUs)
 {
     if(!sleepAmtInUs)
     {
@@ -296,7 +339,7 @@ inline void UF::usleep(unsigned long lon
     struct timeval now;
     gettimeofday(&now, 0);
     
-    unsigned long long int timeToWakeUp = timeInUS(now) + sleepAmtInUs;
+    TIME_IN_US timeToWakeUp = timeInUS(now) + sleepAmtInUs;
     if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
        !_parentScheduler->_earliestWakeUpFromSleep)
         _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
@@ -318,7 +361,11 @@ inline void UF::block()
 inline void UF::yield()
 {
     //switch context back to the main scheduler
-    swapcontext(&_UFContext, &(_parentScheduler->getMainContext()));
+#if __WORDSIZE == 64
+    uf_swapcontext(&_UFContext, _parentScheduler->getMainContext());
+#else
+    swapcontext(&_UFContext, _parentScheduler->getMainContext());
+#endif
 }
 
 inline void UFMutex::releaseSpinLock(bool spinCPU)

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFAres.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFAres.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFAres.H Fri Aug 13 19:43:09 2010
@@ -9,6 +9,9 @@
 #include <ares.h>
 #include "UFDNS.H"
 #include "UFHostEnt.H"
+#include <iostream>
+
+using namespace std;
 
 // for ares less then version 1.7.0
 //#ifndef ares_addrttl

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConf.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFConf.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFConf.H Fri Aug 13 19:43:09 2010
@@ -3,6 +3,10 @@
 
 #include <string>
 #include <vector>
+#include <list>
+#include <ostream>
+
+#include <iostream>
 #include <ext/hash_map>
 
 using namespace std;
@@ -13,9 +17,9 @@ namespace __gnu_cxx
     template<> struct hash< std::string >
     {
         size_t operator()( const std::string& x ) const
-            {
-                return hash< const char* >()( x.c_str() );
-            }
+        {
+            return hash< const char* >()( x.c_str() );
+        }
     };
 }
 
@@ -25,10 +29,37 @@ namespace __gnu_cxx
 class ConfValueBase
 {
 public:
-    virtual void dump(ostream &output)=0;
+    virtual void dump(std::ostream &output)=0;
     virtual ~ConfValueBase() { }
+    std::string type;
 };
 
+/** Helper for printing out vector
+ */   
+template <typename T>
+std::ostream& operator <<  (std::ostream& output, const std::vector<T> &value)
+{
+    if(!value.size())
+        return output;
+    
+    for(typename std::vector<T>::const_iterator it = value.begin(); it != value.end(); it++)
+        output << *it << " ";
+    return output;
+}
+
+/** Helper for printing out list
+ */   
+template <typename T>
+std::ostream& operator <<  (std::ostream& output, const std::list<T> &value)
+{
+    if(!value.size())
+        return output;
+    
+    for(typename std::list<T>::const_iterator it = value.begin(); it != value.end(); it++)
+        output << *it << " ";
+    return output;
+}
+
 /** Template conf value 
  *  This covers all built in types.
  *  This is used by UFConf for string, int, bool and double
@@ -38,56 +69,154 @@ template <class T>
 class ConfValue : public ConfValueBase {
 public:
     T mElement;
-    void dump(ostream& output) { output << mElement; }
-    friend ostream& operator <<  (ostream& output, const ConfValue<T>& value)
+
+    void dump(std::ostream& output) { output << mElement; }
+    friend std::ostream& operator <<  (std::ostream& output, const ConfValue<T>& value)
     {
         output << value.mElement;
         return output;
     }
 };
 
+/** Get conf value from ConfValueBase
+ *  This is a helper function which makes it easier to get conf values
+ *  _testBool = &(((ConfValue<bool> *)conf_val)->mElement) will now become _testBool = confValueGet<bool>(conf_val)
+ */
+template <class T>
+T* confValueGet(ConfValueBase *conf_val)
+{
+    return &(((ConfValue<T> *)conf_val)->mElement);
+}
+
+/// Forward declaration
+class UFConfManager;
+
 /** Holds config data for a given file
  *  The class has a parent conf variable.
  *  the get* functions look at _data. If the requested key is not found, they lookup the parent
  */
 class UFConf
 {
+    friend class UFConfManager;
 public:
-    UFConf() : _parent(NULL) { }
+    UFConf(const std::string &conf_file);
+    void init();
     ConfValueBase *get(const std::string &key);
     int *getInt(const std::string &key);
     double *getDouble(const std::string &key);
-    string *getString(const std::string &key);
+    std::string *getString(const std::string &key);
     bool *getBool(const std::string &key);
+    std::vector<std::string> *getStringVector(const std::string &key);
 
-    void setInt(const std::string &key, int value);
-    void setDouble(const std::string &key, double value);
-    void setString(const std::string &key, const std::string &value);
-    void setBool(const std::string &key, bool value);
+    void setInt(const std::string &key, const std::string &key_type, int value);
+    void setDouble(const std::string &key, const std::string &key_type, double value);
+    void setString(const std::string &key, const std::string &key_type, const std::string &value);
+    void setBool(const std::string &key, const std::string &key_type, bool value);
+    void setStringVector(const std::string &key, const std::string &key_type, const std::vector<std::string> &value);
     
-    void setParent(UFConf *parent) { _parent = parent; }
     bool parse(const std::string &conf_file);
-    ~UFConf();
+    virtual bool parseLine(const std::string &line);
+    virtual void clear();
+    virtual void cache(const std::string &conf_key, ConfValueBase *conf_val) { }
+    virtual void afterParse() { }
+    virtual ~UFConf() { }
+
+    std::string conf_file;
 
-    friend ostream& operator<<(ostream& output, const UFConf &conf);
+    friend std::ostream& operator<<(std::ostream& output, const UFConf &conf);
+    
+protected:
+    template<typename T>
+    void _set(const std::string &conf_key, const std::string &conf_key_type, const T &value)
+        {
+            // Check if a value already exists for the key
+            ConfValueBase *existingValue = get(conf_key);
+            if(existingValue != NULL) {
+                delete existingValue;
+            }
+            
+            // Create new value. Set type. Copy value.
+            ConfValue<T> *confValue = new ConfValue<T>;
+            confValue->type = conf_key_type;
+            confValue->mElement = value;
+            _data[conf_key] = confValue;
+
+            // call cache so that derived classes can cache whatever value they need to
+            cache(conf_key, confValue);
+        }
+
+    template<typename T>
+    T* _get(const std::string &key)
+        {
+            ConfValue<T> *confValue = (ConfValue<T> *)get(key);
+            if(confValue != NULL) {
+                return &confValue->mElement;
+            }
+            if(_parent == NULL)
+                return NULL;
+            return _parent->_get<T>(key);
+        }
+        
+    void _setParent(UFConf *parent);
     
-private:
     UFConf *_parent;
     std::hash_map<std::string, ConfValueBase *> _data;
 };
 
+/** Factory to create conf objects
+ *  This is needed because the cache method is virtual and a call made from the base constructor would not reach a derived override, so init() is called after construction
+ */   
+template< class T >
+T* UFConfFactory(const std::string &conf_file)
+{
+    T* confObject = new T(conf_file);
+    confObject->init();
+
+    return confObject;
+}
+
 /** Manages config objects in the system 
  * 
  */
 class UFConfManager
 {
 public:
-    static UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
-    static UFConf* addConf(const std::string &conf_file);
-    static UFConf* getConf(const std::string &conf_file);
-    static void dump();
+    UFConfManager();
+    void reload();
+    UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
+    UFConf* addConf(const std::string &conf_file);
+    bool addConf(UFConf *conf, const string &conf_file);
+    bool addChildConf(UFConf *child_conf, const string &conf_file, const string &parent_conf_file="/home/y/conf/UF/uf.conf");
+    UFConf* getConf(const std::string &conf_file);
+    void dump();
+
+    static long getRefreshTime();
+    static void setRefreshTime(long);
+    static UFConfManager* getConfManager();
+    
+    static pthread_key_t threadSpecificKey;
+    
 private:
-    static std::hash_map<std::string, UFConf *> _configs;
+    /// force children to pickup changes from a parent
+    void _reloadChildren(UFConf *conf);
+
+    /// add conf file to list of files being monitored for changes
+    void _addWatch(const std::string &conf);
+
+    /// conf object associated with a conf file
+    std::hash_map<std::string, UFConf *> _configs;
+
+    /// list of children for a given conf
+    std::hash_map<std::string, vector<std::string> > _child_map;
+
+    /// map of fds being monitored for changes to the filename
+    std::hash_map<int, std::string> _watch_fd_map;
+    int _notify_fd;
+
+    /// Time between checking for conf file changes
+    static long _refreshTime;
+
+    static pthread_key_t _createThreadKey();
 };
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H Fri Aug 13 19:43:09 2010
@@ -11,7 +11,7 @@
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <netdb.h>
-#include "UF.H"
+#include <UF.H>
 
 class UFConnGroupInfo;
 class UFConnIPInfo;
@@ -23,7 +23,7 @@ struct UFConnectionPool
 
     bool addGroup(UFConnGroupInfo* groupInfo);
     UFConnGroupInfo* removeGroup(const std::string& groupName);
-    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
+    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true, TIME_IN_US connectTimeout = -1);
     void releaseConnection(UFIO* ufIO, bool connOk = true);
     void clearUnusedConnections(TIME_IN_US lastUsedTimeDiff = 300000000 /*300 secs*/, unsigned long long int coverListTime = 60*1000*1000);
 
@@ -40,5 +40,13 @@ protected:
     UFConnectionPoolImpl*           _impl;
 };
 
+struct UFConnectionPoolCleaner : public UF
+{
+    void run();
+    UFConnectionPoolCleaner (bool registerMe = false);
+    UF* createUF() { return new UFConnectionPoolCleaner(); }
+};
+inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ }
+
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H Fri Aug 13 19:43:09 2010
@@ -1,8 +1,8 @@
 #ifndef UF_DNS_H
 #define UF_DNS_H
 
-#include "UF.H"
-#include "UFIO.H"
+#include <UF.H>
+#include <UFIO.H>
 class UFHostEnt;
 class UFDNS : public UFIO
 {

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFIO.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFIO.H Fri Aug 13 19:43:09 2010
@@ -4,15 +4,13 @@
 #include <string>
 #include <ext/hash_map>
 #include <stdint.h>
-#include "UF.H"
+#include <UF.H>
 
-using namespace std;
 namespace std { using namespace __gnu_cxx; }
 
 
 
 
-
 struct UFIOAcceptThreadChooser
 {
     virtual std::pair<UFScheduler*,pthread_t> pickThread(int listeningFd) = 0;
@@ -60,40 +58,66 @@ struct UFIO
     //TODO: create ipv6 accept model
     void accept(UFIOAcceptThreadChooser* ufiotChooser,
                 unsigned short int ufLocation,
+                unsigned short int port,
                 void* startingArgs,
                 void* stackPtr = 0,
                 unsigned int stackSize = 0);
 
     //The fxn will call isSetup which will make the connection request non-blocking and setup the socket
     //TODO: support ipv6 connect
-    bool connect(const struct sockaddr *addr, int addrlen, TIME_IN_US timeout);
+    bool connect(const struct sockaddr *addr, socklen_t addrlen, TIME_IN_US timeout);
 
-    //since this is most likely going to be run on an edge trigger system
-    //reads should be done in such a manner so that if the read call
-    //returns back w/ the same number specified, then a following read call
-    //should be made to ensure that nothing is left in the network buffer
-    //since in edge trigger no more events will be generated for data that was
-    //already seen earlier as being in the network. Therefore always make 
-    //another call to this fxn if the return was == to nbyte
-    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = 0);
-    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = 0);
+    /**
+     * @brief read until delimiter is encountered or n-1 bytes have been read
+     * @param buf the output buffer. The buffer is null terminated if readLine was successful.
+     * @param n the size of the buffer pointed to by buf in bytes
+     * @return number of bytes read, not including the delimiter on success. -1 on error, 
+     *         getErrno() will return the underlying error.
+     */
+    ssize_t readLine(char* buf, size_t n, char delim='\n');
+    /**
+     * @brief read until delimiter is encountered or n-1 bytes have been read
+     * @param out the output string
+     * @param n readLine will attempt to read at most n-1 bytes
+     * @return number of bytes read, not including the delimiter on success or -1 on error. 
+     *         getErrno() will return the underlying error.
+     */
+    ssize_t readLine(std::string &out, size_t n, char delim='\n');
+
+    /**
+     * @brief attempt to read nbytes from the underlying fd
+     * @param buf the output buffer
+     * @param timeout the timeout in microseconds. 0 indicates a non-blocking call, -1 is no timeout
+     * @returns number of bytes read, or -1 on error. getErrno() will return the underlying error
+     * 
+     * since this is most likely going to be run on an edge trigger system
+     * reads should be done in such a manner so that if the read call
+     * returns back nbyte, then a following read call
+     * should be made to ensure that nothing is left in the network buffer
+     * since in edge trigger no more events will be generated for data that was
+     * already seen earlier as being in the network. Therefore always make 
+     * another call to this fxn if the return was == to nbyte
+     */
+    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = -1);
+    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = -1);
+    ssize_t writev(const struct iovec *iov, int iov_size, TIME_IN_US timeout = -1);
     int sendto(const char *msg, 
-                  int len,
-	              const struct sockaddr *to, 
-                  int tolen, 
-                  TIME_IN_US timeout);
+               size_t len,
+               const struct sockaddr *to, 
+               socklen_t tolen, 
+               TIME_IN_US timeout);
     int sendmsg(const struct msghdr *msg, 
-                   int flags,
-	               TIME_IN_US timeout);
+                int flags,
+                TIME_IN_US timeout);
     int recvfrom(char *buf, 
-                    int len, 
-                    struct sockaddr *from,
-		            int *fromlen, 
-                    TIME_IN_US timeout);
+                 size_t len, 
+                 struct sockaddr *from,
+                 socklen_t *fromlen, 
+                 TIME_IN_US timeout);
     int recvmsg(struct msghdr *msg, 
-                   int flags,
-	               TIME_IN_US timeout);
-
+                int flags,
+                TIME_IN_US timeout);
+    
     bool close();
 
     bool setFd(int fd, bool makeNonBlocking = true);
@@ -107,17 +131,23 @@ struct UFIO
     unsigned int getRemotePort() const;
     UFIOScheduler* getUFIOScheduler() const;
 
-    static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+    static void ufCreateThreadWithIO(pthread_t* tid, UFList* ufsToStartWith);
 
     UFSleepInfo*                _sleepInfo;
     bool                        _markedActive;
     bool                        _active;
 
+    static int                  RECV_SOCK_BUF;
+    static int                  SEND_SOCK_BUF;
+
 protected:
     int                         _fd;
     unsigned int                _errno;
     UF*                         _uf;
     UFIOScheduler*              _ufios;
+    char*                       _readLineBuf;
+    size_t                      _readLineBufPos;
+    size_t                      _readLineBufSize;
 
     UFIO() { reset(); }
     void reset();
@@ -139,8 +169,8 @@ inline unsigned int UFIO::getRemotePort(
 
 
 struct UFIOScheduler;
-//typedef map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
-typedef hash_map<pthread_t, UFIOScheduler*, hash<uintptr_t> > ThreadFiberIOSchedulerMap;
+//typedef std::map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
+typedef std::hash_map<pthread_t, UFIOScheduler*, std::hash<uintptr_t> > ThreadFiberIOSchedulerMap;
 
 struct UFConnectionPool;
 struct UFIOScheduler
@@ -148,13 +178,13 @@ struct UFIOScheduler
     UFIOScheduler();
     virtual ~UFIOScheduler();
 
-    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0) = 0;
-    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0) = 0;
-    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = 0) = 0;
-    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0) = 0;
+    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = -1) = 0;
+    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = -1) = 0;
+    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = -1) = 0;
+    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = -1) = 0;
     virtual bool closeConnection(UFIO* ufio) = 0;
     //TODO: support regular poll behavior
-    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0) = 0;
+    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = -1) = 0;
 
     virtual bool isSetup() { return false; }
     virtual void waitForEvents(TIME_IN_US timeToWait) = 0;
@@ -174,7 +204,7 @@ inline UFConnectionPool* UFIOScheduler::
 
 #define MAX_FDS_FOR_EPOLL 128*1024-1
 //typedef map<int, UFIO*> IntUFIOMap;
-typedef hash_map<int, UFIO*, hash<int> > IntUFIOMap;
+typedef std::hash_map<int, UFIO*, std::hash<int> > IntUFIOMap;
 typedef std::multimap<TIME_IN_US, UFSleepInfo*> MapTimeUFIO;
 struct EpollUFIOScheduler : public UFIOScheduler
 {
@@ -185,15 +215,15 @@ struct EpollUFIOScheduler : public UFIOS
     bool isSetup(); //call after the c'tor to verify that the structure is correctly setup
 
     //inputInfo is the flags info (its an int*) for this addition
-    bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0);
-    bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0);
-    bool setupForRead(UFIO* ufio, TIME_IN_US to = 0);
-    bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0);
+    bool setupForConnect(UFIO* ufio, TIME_IN_US to = -1);
+    bool setupForAccept(UFIO* ufio, TIME_IN_US to = -1);
+    bool setupForRead(UFIO* ufio, TIME_IN_US to = -1);
+    bool setupForWrite(UFIO* ufio, TIME_IN_US to = -1);
     bool closeConnection(UFIO* ufio);
-    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0);
+    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = -1);
 
 
-    void waitForEvents(TIME_IN_US timeToWait = -1);
+    void waitForEvents(TIME_IN_US timeToWait = 0);
 
     bool                            _interruptedByEventFd;
 
@@ -208,16 +238,16 @@ protected:
 
 
     MapTimeUFIO                     _sleepList;
-    unsigned long long int          _earliestWakeUpFromSleep;
+    TIME_IN_US          _earliestWakeUpFromSleep;
 
     bool addToScheduler(UFIO* ufio, 
                         void* inputInfo /*flags to identify how ot add*/, 
-                        TIME_IN_US to = 0,
+                        TIME_IN_US to = -1,
                         bool wait = true, 
                         bool runEpollCtl = false);
 
 
-    list<UFSleepInfo*>  _availableSleepInfo;
+    std::deque<UFSleepInfo*>        _availableSleepInfo;
     UFSleepInfo* getSleepInfo();
     void releaseSleepInfo(UFSleepInfo& ufsi);
 };

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFPC.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFPC.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFPC.H Fri Aug 13 19:43:09 2010
@@ -3,67 +3,200 @@
 
 #include <set>
 #include <stack>
-#include "UF.H"
+#include <deque>
+#include <UF.H>
+//#include <Factory.H>
 
 struct UFMutex;
 struct UFProducerData;
 struct UFConsumer;
 struct UFProducer;
 
+
 struct UFConsumer
 {
     friend class UFProducer;
-    UFConsumer(bool notifyOnExitOnly = false);
-    virtual ~UFConsumer();
+    friend class UFJoinableProducer;
+    friend class UFNonJoinableProducer;
+
     //user needs to call input->releaseMe after consuming the data
-    UFProducerData* waitForData(UF* uf = 0);
+    UFProducerData* waitForData(UF* uf = 0, size_t* numRemaining = 0, TIME_IN_US timeToWait=0);
     bool hasData(UF* uf = 0);
 
-    bool joinProducer(UFProducer* ufp);
-    bool removeProducer(UFProducer* ufp);
-    void reset();
-
-    std::list<UFProducerData*>      _queueOfDataToConsume;
-    UFMutex                         _queueOfDataToConsumeLock;
     bool getNotifyOnExitOnly() const;
-
     UF* getUF() const;
+
     bool                            _requireLockToWaitForUpdate; //if the developer is aware that both the producer and all the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
-protected:
-    std::set<UFProducer*>           _consumersProducerSet;
+
+    UFConsumer();
+    virtual ~UFConsumer() { reset(); }
+    void reset();
+
+    virtual std::string getMyType() = 0;
+
     UFMutex                         _consumersProducerSetLock;
     bool                            _notifyOnExitOnly;
+
+
+protected:
     UF*                             _currUF;
+    std::string                     _myType;
+    std::deque<UFProducerData*>     _queueOfDataToConsume;
+    UFMutex                         _queueOfDataToConsumeLock;
+
+    void clearDataToConsume();
 };
 inline UF* UFConsumer::getUF() const { return _currUF; }
 inline bool UFConsumer::getNotifyOnExitOnly() const { return _notifyOnExitOnly; }
 
+
+struct UFJoinableProducer;
+struct UFJoinableConsumer : public UFConsumer
+{
+    friend class UFJoinableProducer;
+    friend class UFProducer;
+    UFJoinableConsumer(bool notifyOnExitOnly = false);
+    ~UFJoinableConsumer() { resetMe(); };
+    bool joinProducer(UFJoinableProducer* ufp);
+    bool removeProducer(UFJoinableProducer* ufp);
+
+    virtual std::string getMyType() { return "UFJoinableConsumer"; }
+
+protected:
+    std::deque<UFJoinableProducer*>         _consumersProducerSet;
+    void resetMe();
+};
+
+
+struct UFNonJoinableProducer;
+struct UFNonJoinableConsumer : public UFConsumer
+{
+    friend class UFNonJoinableProducer;
+    friend class UFProducer;
+    UFNonJoinableConsumer(UFNonJoinableProducer* ufp, bool notifyOnExitOnly = false);
+    ~UFNonJoinableConsumer() { resetMe(); }
+    void resetMe();
+
+    virtual std::string getMyType() { return "UFNonJoinableConsumer"; }
+
+protected:
+    bool removeProducer();
+    UFNonJoinableProducer*         _ufp;
+};
+
+
+
+
+
+
+struct UFDataObject;
 struct UFProducer
 {
+    friend class UFProducerConsumerPair;
     friend class UFConsumer;
     UFProducer();
-    virtual ~UFProducer();
-    bool removeConsumer(UFConsumer* ufc);
-    bool produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
+    virtual ~UFProducer() {}
+    size_t produceData(UFDataObject* data, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
+    size_t produceData(UFProducerData* ufpd, UF* uf = 0);
+    virtual size_t getConsumerCount() = 0;
     void reset();
     void init();
+    bool                            _sendEOFAtEnd;
     bool                            _requireLockToUpdateConsumers;//if the developer is aware that both the producer and the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
 
 protected:
-    std::set<UFConsumer*>           _producersConsumerSet;
-    size_t                          _producersConsumerSetSize;
     UFConsumer*                     _mostRecentConsumerAdded;
     UFMutex                         _producersConsumerSetLock; //needed when the consumers are adding or removing themselves from the consumerList
     bool                            _acceptNewConsumers;
 
-    bool addConsumer(UFConsumer* ufc);
+    virtual bool addConsumer(UFConsumer* ufc) = 0;
+    virtual bool removeConsumer(UFConsumer* ufc) = 0;
+    virtual size_t updateConsumers(UFProducerData* ufpd, UF* uf) = 0;
+    virtual void removeAllConsumers() = 0;
+
     UF*                             _uf;
 };
+inline UFProducer::UFProducer() { init(); }
+inline void UFProducer::init() 
+{ 
+    _acceptNewConsumers = true; 
+    _requireLockToUpdateConsumers = true; 
+    _mostRecentConsumerAdded = 0;
+    _sendEOFAtEnd = true;
+}
+
+struct UFJoinableProducer : public UFProducer
+{
+    UFJoinableProducer() {}
+    ~UFJoinableProducer() { reset(); };
+    bool addConsumer(UFConsumer* ufc);
+    bool removeConsumer(UFConsumer* ufc);
+    size_t getConsumerCount();
+
+protected:
+    std::deque<UFConsumer*>         _producersConsumerSet;
+    size_t updateConsumers(UFProducerData* ufpd, UF* uf);
+    void removeAllConsumers();
+};
+inline size_t UFJoinableProducer::getConsumerCount()
+{
+    if(!_requireLockToUpdateConsumers)
+        return _producersConsumerSet.size();
+
+    size_t count = 0;
+    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    _producersConsumerSetLock.lock(uf);    
+    count = _producersConsumerSet.size();
+    _producersConsumerSetLock.unlock(uf);
+
+    return count;
+}
+
+struct UFNonJoinableProducer : public UFProducer
+{
+    friend class UFNonJoinableConsumer;
+    UFNonJoinableProducer(UFNonJoinableConsumer* ufc) { if(ufc) _mostRecentConsumerAdded = ufc; }
+    ~UFNonJoinableProducer() { reset(); }
+    size_t getConsumerCount();
+
+protected:
+    UFConsumer*     _ufc;
+    UFNonJoinableProducer() {}
+    size_t updateConsumers(UFProducerData* ufpd, UF* uf);
+    void removeAllConsumers();
+    bool addConsumer(UFConsumer* ufc);
+    bool removeConsumer(UFConsumer* ufc);
+};
+inline size_t UFNonJoinableProducer::getConsumerCount()
+{
+    if(!_requireLockToUpdateConsumers)
+        return _mostRecentConsumerAdded ? 1 : 0;
+
+    size_t count = 0;
+    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+    _producersConsumerSetLock.lock(uf);    
+    count = _mostRecentConsumerAdded ? 1 : 0;
+    _producersConsumerSetLock.unlock(uf);
+
+    return count;
+}
+
+
+
+
+
+
+struct UFDataObject
+{
+    UFDataObject();
+    virtual ~UFDataObject();
+};
+inline UFDataObject::UFDataObject() {}
+inline UFDataObject::~UFDataObject() {}
 
 struct UFProducerData
 {
-    void*                           _data;
-    unsigned int                    _size;
+    UFDataObject*                   _data;
     UFProducer*                     _producerWhichInserted;
     int                             _ufpcCode;
     bool                            _freeDataOnExit;
@@ -76,10 +209,10 @@ struct UFProducerData
     static UFProducerData* getObj();
     static void releaseObj(UFProducerData* obj);
 
-
-protected:
     ~UFProducerData();
     UFProducerData() { reset(); }
+
+protected:
     UFMutex                         _controlReferenceCount; //control the ref. count of this data
     size_t                          _referenceCount;
     static std::stack<UFProducerData*> _objList;
@@ -92,67 +225,34 @@ inline void UFProducerData::reset()
     _lockToUpdate = true;
 }
 
-inline bool UFProducer::addConsumer(UFConsumer* ufc)
+inline void UFProducerData::addRef(size_t numToAdd)
 {
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    if(!_acceptNewConsumers)
+    if(!_lockToUpdate)
     {
-        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-        return false;
+        _referenceCount += numToAdd;
+        return;
     }
-    _producersConsumerSet.insert(ufc); //check insertion
-    _producersConsumerSetSize++;
-    _mostRecentConsumerAdded = ufc;
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-    return true;
+
+    _controlReferenceCount.getSpinLock();
+    _referenceCount += numToAdd;
+    _controlReferenceCount.releaseSpinLock();
 }
 
-inline bool UFProducer::removeConsumer(UFConsumer* ufc)
+inline void UFProducerData::reduceRef()
 {
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    _producersConsumerSet.erase(ufc);
-    _producersConsumerSetSize--;
-    if(_requireLockToUpdateConsumers) 
-        _producersConsumerSetLock.signal();
-    else
+    if(!_lockToUpdate)
     {
-        if(_uf)
-            UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
+        --_referenceCount;
+        return;
     }
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-    return true;
-}
-
-inline void UFProducerData::addRef(size_t numToAdd)
-{
-    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
-    _referenceCount = numToAdd;
-    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
-}
 
-inline void UFProducerData::reduceRef()
-{
-    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
+    _controlReferenceCount.getSpinLock();
     --_referenceCount;
-    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
+    _controlReferenceCount.releaseSpinLock();
 }
 
 inline UFProducerData* UFProducerData::getObj()
 {
-    /*
-    _objListMutex.getSpinLock();
-    if(!_objList.empty())
-    {
-        UFProducerData* retVal = _objList.top();
-        _objList.pop();
-        _objListMutex.releaseSpinLock();
-        retVal->reset();
-        return retVal;
-    }
-    _objListMutex.releaseSpinLock();
-    */
     return new UFProducerData();
 }
 
@@ -164,11 +264,6 @@ inline void UFProducerData::releaseObj(U
     if(obj->_lockToUpdate) obj->_controlReferenceCount.getSpinLock();
     if(!--obj->_referenceCount)
     {
-        /*
-        _objListMutex.getSpinLock();
-        _objList.push(obj);
-        _objListMutex.releaseSpinLock();
-        */
         delete obj;
         return;
     }
@@ -178,19 +273,40 @@ inline void UFProducerData::releaseObj(U
 inline UFProducerData::~UFProducerData()
 {
     if(_freeDataOnExit && _data)
-        free (_data);
+        delete (_data);
 }
 
-inline UFConsumer::~UFConsumer() { reset(); } 
 
-inline UFProducer::~UFProducer() { reset(); }
-inline void UFProducer::init() 
-{ 
-    _acceptNewConsumers = true; 
-    _requireLockToUpdateConsumers = true; 
-    _producersConsumerSetSize = 0;
-    _mostRecentConsumerAdded = 0;
+
+struct UFProducerConsumerPair
+{
+    UFProducerConsumerPair();
+    ~UFProducerConsumerPair();
+    UFNonJoinableConsumer* getConsumer() const { return _c; }
+    UFNonJoinableProducer* getProducer() const { return _p; }
+
+protected:
+    UFNonJoinableConsumer*      _c;
+    UFNonJoinableProducer*      _p;
+};
+inline UFProducerConsumerPair::UFProducerConsumerPair()
+{
+    _p = new UFNonJoinableProducer(0);
+    _c = new UFNonJoinableConsumer(_p);
+}
+inline UFProducerConsumerPair::~UFProducerConsumerPair()
+{
+    if(_p)
+    {
+        _p->_requireLockToUpdateConsumers = false; //don't waste time on locking
+        _p->_uf = 0; //the producer shouldn't be associated w/ any uf anymore
+    }
+    if(_c) 
+    { 
+        _c->_requireLockToWaitForUpdate = false; //don't waste time on locking
+        delete _c; 
+    }
+    if(_p) delete _p; //the producer is killed after the consumer, so that the producer doesn't have to wait for the consumer to die and simply keep the uf alive
 }
-inline UFProducer::UFProducer() { init(); }
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFServer.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFServer.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFServer.H Fri Aug 13 19:43:09 2010
@@ -4,9 +4,10 @@
 #include <pthread.h>
 #include <map>
 #include <vector>
+#include <list>
 
-#include "UF.H"
-#include "UFIO.H"
+#include <UF.H>
+#include <UFIO.H>
 
 typedef std::map<std::string, std::vector<pthread_t>* > StringThreadMapping;
 struct UFServerThreadChooser;
@@ -26,10 +27,16 @@ public:
     unsigned int getProcessCount() const { return _childProcesses.size(); }
     unsigned int UF_STACK_SIZE;
     const char* getBindingInterface() const { return _addressToBindTo.c_str() ; }
-    unsigned int getPort() const { return _port ; }
-    unsigned int getListenFd() const { return _listenFd; }
-    unsigned int setListenFd(int fd) { return (_listenFd = fd); }
 
+    struct ListenSocket
+    {
+        unsigned short int port;
+        int fd;
+        ListenSocket(unsigned short int p = 0, int f = -1) : port(p), fd(f) { };
+    };
+    typedef std::list<ListenSocket> ListenSocketList;
+
+    const ListenSocketList &getListenSockets() const { return _listenSockets; }
 
     //functions that are allowed to be modified by the 
     //inherited class to provide customizable functionalities 
@@ -38,7 +45,9 @@ public:
     virtual void postBetweenFork(int childPid) {}
     virtual void postForkPreRun() {}
     virtual void preThreadRun() {}
+    virtual void postThreadRun() {}
     virtual void preThreadCreation() {}
+    virtual void postThreadCreation() {}
     virtual void preAccept() {}
 
 
@@ -50,15 +59,14 @@ public:
 
     StringThreadMapping* getThreadList() { return &_threadList; };
     std::vector<pthread_t>* getThreadType(const std::string& type);
-    void addThread(const std::string& type, UFScheduler* ufScheduler);
+    void addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid=0);
     void run();
 
 protected:
     void reset();
 
     std::string                     _addressToBindTo;
-    unsigned int                    _port;
-    int                             _listenFd;
+    ListenSocketList                _listenSockets;
     unsigned int                    _listenBackLog;
 
     time_t                          _creationTime;
@@ -72,30 +80,39 @@ protected:
 
     void startThreads();
 
+    void _addListenPort(unsigned short int port)
+    {
+        _listenSockets.push_back(ListenSocket(port));
+    }
+
 private:
     //start processing
 };
 
+
+
 struct UFServerThreadChooser : public UFIOAcceptThreadChooser
 {
     UFServerThreadChooser() { }
 
     std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd);
-    void add(UFScheduler* ufs, pthread_t tid)
-    {
-        _threadList.push_back(make_pair(ufs, tid));
-    }
+    void add(UFScheduler* ufs, pthread_t tid);
 
 protected:
-    std::vector<pair<UFScheduler*, pthread_t> > _threadList;
+    std::vector<std::pair<UFScheduler*, pthread_t> > _threadList;
 };
 
+inline void UFServerThreadChooser::add(UFScheduler* ufs, pthread_t tid)
+{
+    _threadList.push_back(std::make_pair(ufs, tid));
+}
+
 inline std::pair<UFScheduler*, pthread_t> UFServerThreadChooser::pickThread(int listeningFd)
 {
     static unsigned int lastLocUsed = 0;
     if(!_threadList.size())
     {
-        cerr<<"there has to be some fabric to hand the request to"<<endl;
+        std::cerr<<"there has to be some fabric to hand the request to"<<std::endl;
         exit(1);
     }
 

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H Fri Aug 13 19:43:09 2010
@@ -11,7 +11,7 @@
 #include <vector>
 #include <utility>
 #include <sstream>
-#include "UF.H"
+#include <UF.H>
 
 struct Stat
 {

Modified: trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStats.H?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/include/UFStats.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/include/UFStats.H Fri Aug 13 19:43:09 2010
@@ -6,6 +6,7 @@
 namespace UFStats
 {
     void registerStats(bool lock_needed = false);
+    extern uint32_t currentConnections;
     extern uint32_t connectionsHandled;
     extern uint32_t txnSuccess;
     extern uint32_t txnFail;

Propchange: trafficserver/traffic/branches/UserFiber/core/include/UFSwapContext.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Modified: trafficserver/traffic/branches/UserFiber/core/src/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/Makefile?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/Makefile Fri Aug 13 19:43:09 2010
@@ -33,20 +33,20 @@ $(ARES_SRC):
 	   tar xzf ./$(ARES_SRC_FILE); \
 	fi
 
-$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H UFSwapContext.H
+$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UF.o UF.C
 
 $(LIB_DIR)/UFPC.o: UFPC.C $(INCLUDE_DIR)/UFPC.H $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFPC.o UFPC.C
 
-$(LIB_DIR)/UFConnectionPoolImpl.o: $(LIB_DIR)/UFAres.o UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
+$(LIB_DIR)/UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
 
 $(LIB_DIR)/UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFIO.o UFIO.C
 
-$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
-	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
+#$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
+#	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
 
 $(LIB_DIR)/UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o
 	$(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStatSystem.o UFStatSystem.C
@@ -63,8 +63,9 @@ $(LIB_DIR)/UFServer.o: UFServer.C $(INCL
 $(LIB_DIR)/UFSwapContext.o: UFSwapContext.S
 	$(CC) -c -o $@ $^
 
-$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
-	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARESDIR)/$(ARES)/.libs/*.o $^
+#$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
+$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o
+	$(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $^
 	$(RANLIB) $(LIB_DIR)/libUF.a
 
 clean: 

Modified: trafficserver/traffic/branches/UserFiber/core/src/UF.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UF.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UF.C Fri Aug 13 19:43:09 2010
@@ -1,5 +1,4 @@
-#include "UF.H"
-#include "UFConnectionPool.H"
+#include <UF.H>
 
 #include <string.h>
 #include <iostream>
@@ -10,7 +9,7 @@
 #include <stdio.h>
 #include <malloc.h>
 #include <sys/mman.h>
-#include "UFSwapContext.H"
+#include <UFSwapContext.H>
 
 using namespace std;
 
@@ -33,12 +32,13 @@ static void runFiber(void* args)
 
 ///////////////UF/////////////////////
 UFFactory* UFFactory::_instance = 0;
-const unsigned int DEFAULT_STACK_SIZE = 4*4096;
+unsigned int UF::DEFAULT_STACK_SIZE = 4*4096;
 UFId UF::_globalId = 0;
 
 UF::UF()
 { 
-    _startingArgs = 0;
+    reset();
+    _myId = ++_globalId;  //TODO: make atomic
     setup();
 }
 
@@ -59,17 +59,14 @@ bool UF::setup(void* stackPtr, size_t st
     }
 #endif
 
-    _myId = ++_globalId;  //TODO: make atomic
-    _status = NOT_STARTED;
-
     if(!stackPtr || !stackSize)
     {
 #ifndef DEBUG
-        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : DEFAULT_STACK_SIZE;
+        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : UF::DEFAULT_STACK_SIZE;
         _UFContext.uc_stack.ss_sp = (void*) malloc (_UFContext.uc_stack.ss_size);
 #else
-        _UFContext.uc_stack.ss_size = DEFAULT_STACK_SIZE;
-        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, DEFAULT_STACK_SIZE);
+        _UFContext.uc_stack.ss_size = UF::DEFAULT_STACK_SIZE;
+        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, UF::DEFAULT_STACK_SIZE);
         if(!_UFContext.uc_stack.ss_sp)
         {
             cerr<<"couldnt allocate space from memalign "<<strerror(errno)<<endl;
@@ -91,8 +88,6 @@ bool UF::setup(void* stackPtr, size_t st
     }
     _UFContext.uc_stack.ss_flags = 0;
 
-    _parentScheduler = 0;
-
     return true;
 }
 
@@ -116,7 +111,6 @@ static pthread_key_t getThreadKey()
 pthread_key_t UFScheduler::_specific_key = getThreadKey();
 UFScheduler::UFScheduler()
 {
-    _activeRunningListSize = 0;
     _earliestWakeUpFromSleep = 0;
     _exitJustMe = false;
     _specific = 0;
@@ -161,25 +155,39 @@ UFScheduler::UFScheduler()
 
     pthread_setspecific(_specific_key, this);
     _amtToSleep = 0;
+    _runCounter = 1;
 }
 
-UFScheduler::~UFScheduler() { /*pthread_key_delete(_specific_key);*/ }
+UFScheduler::~UFScheduler() 
+{ 
+    //remove the UFScheduler associated w/ this thread
+    pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
+    ThreadUFSchedulerMap::iterator index = _threadUFSchedulerMap.find(pthread_self());
+    if(index != _threadUFSchedulerMap.end())
+        _threadUFSchedulerMap.erase(index);
+    pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
+
+    /*pthread_key_delete(_specific_key);*/ 
+}
 
 bool UFScheduler::addFiberToSelf(UF* uf)
 {
-    if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
+    if(!uf)
+        return false;
+    if(uf->_status == WAITING_TO_RUN || 
+       uf->_status == YIELDED) //UF is already in the queue
         return true;
     uf->_status = WAITING_TO_RUN;
-    if(uf->_parentScheduler) //probably putting back an existing uf into the active list
+    if(uf->getParentScheduler()) //probably putting back an existing uf into the active list
     {
-        if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
+        if(uf->getParentScheduler() == this) //check that we're scheduling for the same thread
         {
-            _activeRunningList.push_back(uf); ++_activeRunningListSize;
+            _activeRunningList.push_front(uf);
             return true;
         }
         else
         {
-            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
+            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->getParentScheduler()<<endl;
             abort(); //TODO: remove the abort
             return false;
         }
@@ -202,12 +210,15 @@ bool UFScheduler::addFiberToSelf(UF* uf)
         cerr<<"error while trying to run makecontext"<<endl;
         return false;
     }
-    _activeRunningList.push_back(uf); ++_activeRunningListSize;
+    _activeRunningList.push_front(uf);
     return true;
 }
 
 bool UFScheduler::addFiberToAnotherThread(const list<UF*>& ufList, pthread_t tid)
 {
+    if(ufList.empty())
+        return false;
+
     //find the other thread -- 
     //TODO: have to lock before looking at this map - 
     //since it could be changed if more threads are added later - not possible in the test that is being run (since the threads are created before hand)
@@ -260,15 +271,13 @@ bool UFScheduler::addFiberToScheduler(co
         return true;
 
     //adding to the same scheduler and as a result thread as the current job
-    UF* uf = 0;
-    list<UF*>::const_iterator beg = ufList.begin();
-    list<UF*>::const_iterator ending = ufList.end();
     if(!tid || (tid == pthread_self()))
     {
+        list<UF*>::const_iterator beg = ufList.begin();
+        list<UF*>::const_iterator ending = ufList.end();
         for(; beg != ending; ++beg)
         {
-            uf = *beg;
-            if(addFiberToSelf(uf))
+            if(addFiberToSelf(*beg))
                 continue;
             else
                 return false;
@@ -295,23 +304,31 @@ void UFScheduler::runScheduler()
     _amtToSleep = DEFAULT_SLEEP_IN_USEC;
     bool ranGetTimeOfDay = false;
 
-    UFList::iterator beg;
     struct timeval now;
     struct timeval start,finish;
     gettimeofday(&start, 0);
-    unsigned long long int timeNow = 0;
+    TIME_IN_US timeNow = 0;
 
-    UFList::iterator ufBeg;
-    UFList::iterator nBeg;
     MapTimeUF::iterator slBeg;
     bool waiting = false;
+    //unsigned long long int runCounter = 1;
     while(!shouldExit())
     {
-        for(ufBeg = _activeRunningList.begin(); ufBeg != _activeRunningList.end(); )
+        ++_runCounter;
+        while(!_activeRunningList.empty())
         {
-            UF* uf = *ufBeg;
-            _currentFiber = uf;
+            if(shouldExit())
+                break;
+
+            UF* uf = _activeRunningList.front();
+            if(uf->_status == YIELDED &&
+               uf->_lastRun == _runCounter) //we have looped back
+                break;
+            //printf("%lu - running uf %lu on iter %llu\n", pthread_self(), (uintptr_t)uf, _runCounter);
+            _activeRunningList.pop_front();
+            uf->_lastRun = _runCounter;
             uf->_status = RUNNING;
+            _currentFiber = uf;
 #if __WORDSIZE == 64
             uf_swapcontext(&_mainContext, &(uf->_UFContext));
 #else
@@ -320,20 +337,18 @@ void UFScheduler::runScheduler()
             _currentFiber = 0;
 
             if(uf->_status == BLOCKED)
-            {
-                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
                 continue;
-            }
             else if(uf->_status == COMPLETED) 
             {
-                delete uf;
-                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
+                if(uf->_myFactory)
+                    uf->_myFactory->releaseUF(uf);
+                else
+                    delete uf;
                 continue;
             }
-
             //else uf->_status == RUNNING
-            uf->_status = WAITING_TO_RUN;
-            ++ufBeg;
+            uf->_status = YIELDED;
+            _activeRunningList.push_back(uf);
         }
 
 
@@ -354,20 +369,18 @@ void UFScheduler::runScheduler()
             //TODO: do atomic comparison to see if there is anything in 
             //_nominateToAddToActiveRunningList before getting the lock
             pthread_mutex_lock(&_mutexToNominateToActiveList);
-            for(nBeg = _nominateToAddToActiveRunningList.begin();
-                nBeg != _nominateToAddToActiveRunningList.end(); )
+            do
             {
-                UF* uf = *nBeg;
-                if(uf->_parentScheduler)
+                UF* uf = _nominateToAddToActiveRunningList.front();
+                if(uf->getParentScheduler())
                 {
                     uf->_status = WAITING_TO_RUN;
-                    _activeRunningList.push_front(uf); ++_activeRunningListSize;
+                    _activeRunningList.push_front(uf);
                 }
                 else //adding a new fiber
                     addFiberToScheduler(uf, 0);
-                nBeg = _nominateToAddToActiveRunningList.erase(nBeg);
-            }
-
+                _nominateToAddToActiveRunningList.pop_front();
+            }while(!_nominateToAddToActiveRunningList.empty());
             pthread_mutex_unlock(&_mutexToNominateToActiveList);
         }
 
@@ -393,7 +406,7 @@ void UFScheduler::runScheduler()
                         if(ufwi->_uf)
                         {
                             ufwi->_uf->_status = WAITING_TO_RUN;
-                            _activeRunningList.push_front(ufwi->_uf); ++_activeRunningListSize;
+                            _activeRunningList.push_front(ufwi->_uf);
                             ufwi->_uf = NULL;
                         }
                         waiting = ufwi->_waiting;
@@ -419,13 +432,13 @@ void UFScheduler::runScheduler()
         }
 
         //see if there is anything to do or is it just sleeping time now
-        if(!_notifyFunc && !_activeRunningListSize && !shouldExit())
+        if(!_notifyFunc && _activeRunningList.empty() && !shouldExit())
         {
             if(_inThreadedMode) //go to conditional wait (in threaded mode)
             {
                 struct timespec ts;
-                unsigned long long int nSecToIncrement = (int)(_amtToSleep/1000000);
-                unsigned long long int nUSecToIncrement = (int)(_amtToSleep%1000000);
+                int nSecToIncrement = (int)(_amtToSleep/1000000);
+                TIME_IN_US nUSecToIncrement = (TIME_IN_US)(_amtToSleep%1000000);
                 if(!ranGetTimeOfDay)
                     gettimeofday(&now, 0);
                 ts.tv_sec = now.tv_sec + nSecToIncrement;
@@ -442,7 +455,7 @@ void UFScheduler::runScheduler()
     }
     gettimeofday(&finish, 0);
 
-    unsigned long long int diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
+    TIME_IN_US diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
     cerr<<pthread_self()<<" time taken in this thread = "<<diff<<"us"<<endl;
 }
 
@@ -498,10 +511,10 @@ int UFFactory::registerFunc(UF* uf)
     return _size++;
 }
 
-const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 3;
+const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 15;
 bool UFMutex::lock(UF* uf)
 {
-    if(!uf || !uf->_parentScheduler)
+    if(!uf || !uf->getParentScheduler())
         return false;
 
     getSpinLock();
@@ -575,38 +588,28 @@ bool UFMutex::unlock(UF* uf)
     if(!uf)
         return false;
 
-    UFList::iterator beg;
     getSpinLock();
 
-    beg = _listOfClientsWaitingOnLock.begin();
-    if(uf == *beg) //check if this uf is the current owner of this lock
+    if(uf == _listOfClientsWaitingOnLock.front()) //check if this uf is the current owner of this lock
     {
+        _listOfClientsWaitingOnLock.pop_front();
         _lockCurrentlyOwned = false;
-        beg = _listOfClientsWaitingOnLock.erase(beg);
 #ifdef LOCK_DEBUG
         printf("%lu u %d\n", (unsigned long int) ((uintptr_t)(void*)uf), _listOfClientsWaitingOnLock.size());
 #endif
 
-        bool releasedLock = false;
         //notify the next UF in line
         while(!_listOfClientsWaitingOnLock.empty())
         {
-            UF* tmpUf = *beg;
-            if(!tmpUf || !tmpUf->_parentScheduler) //invalid tmpuf - cant wake it up
+            UF* tmpUf = _listOfClientsWaitingOnLock.front();
+            if(!tmpUf || !tmpUf->getParentScheduler()) //invalid tmpuf - cant wake it up
             {
 #ifdef LOCK_DEBUG
                 printf("%lu nf1\n", (unsigned long int) ((uintptr_t)(void*)uf));
 #endif
-                beg = _listOfClientsWaitingOnLock.erase(beg);
-                if(beg == _listOfClientsWaitingOnLock.end())
-                    break;
+                _listOfClientsWaitingOnLock.pop_front();
                 continue;
             }
-            /*
-            if(tmpUf->getStatus() == WAITING_TO_RUN) //this uf has already been put into the waiting to run list
-                break;
-                */
-
 
 #ifdef LOCK_DEBUG
             printf("%lu wk %lu\n", 
@@ -615,14 +618,11 @@ bool UFMutex::unlock(UF* uf)
 #endif
 
             releaseSpinLock();
-            releasedLock = true;
-            uf->_parentScheduler->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
-            break;
+            uf->getParentScheduler()->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
+            return true;
         }
 
-        if(!releasedLock)
-            releaseSpinLock();
-
+        releaseSpinLock();
         return true;
     }
     else
@@ -635,7 +635,7 @@ bool UFMutex::unlock(UF* uf)
     return false;
 }
 
-bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
+bool UFMutex::tryLock(UF* uf, TIME_IN_US autoRetryIntervalInUS)
 {
     while(1)
     {
@@ -668,7 +668,7 @@ bool UFMutex::condWait(UF* uf)
     //the object is already in the hash
     if(_listOfClientsWaitingOnCond.find(uf) == _listOfClientsWaitingOnCond.end())
     {
-        UFWaitInfo *ufwi = uf->_parentScheduler->getWaitInfo();
+        UFWaitInfo *ufwi = uf->getParentScheduler()->getWaitInfo();
         ufwi->_uf = uf;
         ufwi->_waiting = true;
     
@@ -708,7 +708,7 @@ void UFMutex::broadcast()
         // If uf is not NULL, schedule it and make sure no one else can schedule it again
         if(ufwi->_uf) 
         {
-            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->_parentScheduler->_tid);
+            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->getParentScheduler()->_tid);
             ufwi->_uf = NULL;
         }
         
@@ -760,10 +760,10 @@ void UFMutex::signal()
     }
 
     if(uf_to_signal)
-        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->_parentScheduler->_tid);
+        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->getParentScheduler()->_tid);
 }
 
-int UFMutex::condTimedWait(UF* uf, unsigned long long int sleepAmtInUs)
+bool UFMutex::condTimedWait(UF* uf, TIME_IN_US sleepAmtInUs)
 {
     bool result = false;
     if(!uf)
@@ -783,9 +783,9 @@ int UFMutex::condTimedWait(UF* uf, unsig
     // Add to sleep queue
     struct timeval now;
     gettimeofday(&now, 0);
-    unsigned long long int timeNow = timeInUS(now);
+    TIME_IN_US timeNow = timeInUS(now);
     ufwi->_sleeping = true;
-    uf->_parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
+    uf->getParentScheduler()->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
     
     uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal, broadcast or timeout has occurred
 
@@ -802,11 +802,11 @@ void* setupThread(void* args)
     if(!args)
         return 0;
 
-    list<UF*>* ufsToStartWith = (list<UF*>*) args;
+    UFList* ufsToStartWith = (UFList*) args;
     UFScheduler ufs;
     ufs.addFiberToScheduler(*ufsToStartWith, 0);
     delete ufsToStartWith;
-
+    
     //run the scheduler
     ufs.runScheduler();
 
@@ -819,10 +819,24 @@ void UFScheduler::ufCreateThread(pthread
     pthread_attr_t attr;
     pthread_attr_init(&attr);
     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
-
+    
     if(pthread_create(tid, &attr, setupThread, (void*)ufsToStartWith) != 0)
     {
         cerr<<"couldnt create thread "<<strerror(errno)<<endl;
         exit(1);
     }
 }
+
+string getPrintableTime()
+{
+    char asctimeDate[32];
+    asctimeDate[0] = '\0';
+    time_t now = time(0);
+    asctime_r(localtime(&now), asctimeDate);
+
+    string response = asctimeDate;
+    size_t loc = response.find('\n');
+    if(loc != string::npos)
+        response.replace(loc, 1, "");
+    return response;
+}

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFAres.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFAres.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFAres.C Fri Aug 13 19:43:09 2010
@@ -1,6 +1,8 @@
-#include "UFAres.H"
+#include <UFAres.H>
 #include <stdio.h>
 
+using namespace std;
+
 static void printHost(struct hostent* host , struct ares_addrttl *ttls = 0, int nttl = 0)
 {
   //  return;



Re: svn commit: r985334 [1/3] - in /trafficserver/traffic/branches/UserFiber: core/include/ core/src/ samples/

Posted by Paul Querna <pa...@querna.org>.
-1, please revert this code drop.

This is a massive drop of code, with absolutely no discussion on the
mailing list about it.

This seems to be the built-up work of several people, over what must
have taken some time, but it is being shoved into the Apache Subversion
repository with no discussion about what is going on.

This needs to be done out in the open.  Let each of the people
referenced in the commit message commit the code as it comes, or
submit patches, but not as a massive unreviewable code drop with no
previous discussion on the mailing list.

In addition, many of the files are missing the correct licensing.

This is not a "branch" of Traffic Server; it's a code drop of a new
code base with no involvement of the community.

On Fri, Aug 13, 2010 at 12:43 PM,  <ak...@apache.org> wrote:
> Author: akundu
> Date: Fri Aug 13 19:43:09 2010
> New Revision: 985334
>
> URL: http://svn.apache.org/viewvc?rev=985334&view=rev
> Log:
> Steve Jiang
>    - UFIO
>        - support for readline
>        - change TIME_IN_US to default to -1 and be of type ssize_t
>        - update IO fxns to use TIME_IN_US
>
> Bryan Call
>    - add UFSwapContext.H - expose the internal swap context fxn
>    - add setter+getter fxns in UFIO + UF
>
> Raghav Jeyaraman
>    - UFStats
>        - support for current connections count
>    - UFConf
>        - take the value as an input type into templated getValue fxns
>    - UFServer
>        - per thread initializer support
>
> Anirban Kundu
>    - UFHTTPLoader
>        - increased default options to support exiting early
>        - put an interval between the threads starting to do work
>        - use new timeout mode of -1 meaning no timeout
>    - UF.H
>        - use a deque to track the lock instead of a list
>        - support UF objects being created and destroyed using a factory
>        - ability to set the stack size for future created UFs
>        - run all active (and newly created) UFs first before sleeping
>    - UFPC.H
>        - support for joinable and non-joinable producers/consumers
>        - support to not use a lock in managing the producer and consumer
>    - UFIO.H
>        - support for writev (similar to state threads writev implementation)
>    - UFConnectionPool
>        - support to remove unused connections after a little while
>
> Manjesh Nilange
>        - support for listening to multiple ports from the same server
>
>
> Added:
>    trafficserver/traffic/branches/UserFiber/core/include/UFSwapContext.H   (props changed)
>      - copied unchanged from r985331, trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
> Removed:
>    trafficserver/traffic/branches/UserFiber/core/src/UFSwapContext.H
> Modified:
>    trafficserver/traffic/branches/UserFiber/core/include/UF.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
>    trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
>    trafficserver/traffic/branches/UserFiber/core/src/Makefile
>    trafficserver/traffic/branches/UserFiber/core/src/UF.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
>    trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
>    trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
>    trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UF.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UF.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UF.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UF.H Fri Aug 13 19:43:09 2010
> @@ -2,7 +2,6 @@
>  #define USERTHREADS_H
>
>  #include <sys/time.h>
> -
>  #include <iostream>
>  #include <map>
>  #include <stdint.h>
> @@ -10,12 +9,16 @@
>  #include <set>
>  #include <vector>
>  #include <list>
> +#include <deque>
>  #include <ucontext.h>
>  #include <pthread.h>
>  #include <errno.h>
>
> -using namespace std;
> +#include <UFSwapContext.H>
> +//#include <ufutil/Factory.H>
> +
>  namespace std { using namespace __gnu_cxx; }
> +std::string getPrintableTime();
>
>  enum UFStatus
>  {
> @@ -23,17 +26,20 @@ enum UFStatus
>     WAITING_TO_RUN      = 1,
>     BLOCKED             = 2,
>     RUNNING             = 3,
> -    COMPLETED           = 4
> +    COMPLETED           = 4,
> +    YIELDED             = 5
>  };
>
> -typedef unsigned long long int TIME_IN_US;
> +typedef long long int TIME_IN_US;
>
>  //create the type of UF you'd like to pass into the accept handler
>  typedef unsigned long long int UFId;
>  struct UFScheduler;
> -struct UFMutex;
> -struct UF
> +
> +struct UFFact;
> +class UF
>  {
> +public:
>     friend class UFScheduler;
>     friend class UFMutex;
>
> @@ -52,34 +58,72 @@ struct UF
>     //otherwise behavior is unexpected
>     void                 yield();
>     ///must be called after the fiber is added to a scheduler
> -    void                 usleep(unsigned long long int sleepAmtInUs);
> -    static void          gusleep(unsigned long long int sleepAmtInUs);
> +    void                 usleep(TIME_IN_US sleepAmtInUs);
> +    static void          gusleep(TIME_IN_US sleepAmtInUs);
>     ///simply block the fiber
>     void                 block();
>     UFStatus             getStatus() const;
> +    unsigned long long int getLastRun() const;
>
>
>     UFStatus             _status;
>     void*                _startingArgs;
> +    static unsigned int  DEFAULT_STACK_SIZE;
> +    UFFact*              _myFactory;
> +    void reset();
>
> -protected:
> +private:
>     static UFId          _globalId;
> -    UFId                 _myId;
>     UFScheduler*         _parentScheduler;
> +    UFId                 _myId;
>     ucontext_t           _UFContext;
>     bool                 _UFObjectCreatedStack;
> +    unsigned long long int _lastRun;
>
> -private:
>     void waitOnLock();
>  };
> +inline void UF::reset()
> +{
> +    _startingArgs = 0;
> +    _myFactory = 0;
> +    _parentScheduler = 0;
> +    _lastRun = 0;
> +    _status = NOT_STARTED;
> +}
>  inline UFStatus UF::getStatus() const { return _status; }
> +inline unsigned long long int UF::getLastRun() const { return _lastRun; }
> +
> +class UFFact
> +{
> +public:
> +    virtual ~UFFact() {}
> +    virtual UF* getUF();
> +    virtual void releaseUF(UF* uf);
> +
> +protected:
> +    virtual UF* createUF() = 0;
> +    virtual void destroyUF(UF* uf) = 0;
> +};
> +inline UF* UFFact::getUF()
> +{
> +    UF* uf = createUF();
> +    if(!uf)
> +        return 0;
> +    uf->reset();
> +    uf->_myFactory = this;
> +    return uf;
> +}
> +inline void UFFact::releaseUF(UF* uf)
> +{
> +    destroyUF(uf);
> +}
>
>  struct UFFactory
>  {
>     static UFFactory* getInstance();
>     UFFactory();
>
> -    UF* selectUF(unsigned int location);;
> +    UF* selectUF(unsigned int location);
>     int registerFunc(UF* uf);
>
>  protected:
> @@ -92,9 +136,9 @@ inline UFFactory* UFFactory::getInstance
>  inline UF* UFFactory::selectUF(unsigned int location) { return _objMapping[location]; }
>
>  struct UFWaitInfo;
> -typedef std::list<UFWaitInfo*> UFWaitList;
> -typedef map<UF*, UFWaitInfo*> UFWLHash;
> -typedef std::list<UF*>          UFList;
> +typedef std::map<UF*, UFWaitInfo*>  UFWLHash;
> +typedef std::list<UF*>              UFList;
> +typedef std::deque<UF*>             UFDeque;
>  struct UFMutex
>  {
>     UFMutex()
> @@ -106,7 +150,7 @@ struct UFMutex
>
>     bool lock(UF* uf);
>     bool unlock(UF* uf);
> -    bool tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS = 0);
> +    bool tryLock(UF* uf, TIME_IN_US autoRetryIntervalInUS = 0);
>
>     //THE CALLER MUST get the lock before calling this fxn
>     //THE CALLER MUST release the lock after this fxn is called
> @@ -119,7 +163,7 @@ struct UFMutex
>     void signal();
>     //THE CALLER MUST get the lock before calling this fxn
>     //THE CALLER MUST unlock the lock after calling this fxn (to maintain parity w/ pthread calling paradigms
> -    int condTimedWait(UF *uf, unsigned long long int sleepAmtInUs);
> +    bool condTimedWait(UF *uf, TIME_IN_US sleepAmtInUs);
>
>     void releaseSpinLock(bool spinCPU = false);
>     void getSpinLock(bool spinCPU = false);
> @@ -127,7 +171,6 @@ struct UFMutex
>  protected:
>     int                 _lockActive;
>     UFList              _listOfClientsWaitingOnLock;
> -    //UFWaitList          _listOfClientsWaitingOnCond;
>     UFWLHash            _listOfClientsWaitingOnCond;
>     bool                _lockCurrentlyOwned;
>     UF*                 _mustRunUF;
> @@ -144,10 +187,10 @@ struct UFWaitInfo
>  inline void UFWaitInfo::reset() { _uf = 0; _sleeping = false; _waiting = false; }
>
>
> -typedef std::multimap<unsigned long long int, UFWaitInfo*> MapTimeUF;
> +typedef std::multimap<TIME_IN_US, UFWaitInfo*> MapTimeUF;
>  //typedef std::map<pthread_t,UFScheduler*> ThreadUFSchedulerMap;
>  //per thread scheduler
> -typedef hash_map<pthread_t, UFScheduler*, hash<uintptr_t> > ThreadUFSchedulerMap;
> +typedef std::hash_map<pthread_t, UFScheduler*, std::hash<uintptr_t> > ThreadUFSchedulerMap;
>
>  struct UFScheduler
>  {
> @@ -164,7 +207,7 @@ struct UFScheduler
>     bool addFiberToScheduler(UF* uf,      /* the UF to add */
>                              pthread_t tid = 0); /* the thread to add the UF to */
>     //add the fxn to add multiple ufs in one shot (if they're on one tid)
> -    bool addFiberToScheduler(const std::list<UF*>& ufList,
> +    bool addFiberToScheduler(const UFList& ufList,
>                               pthread_t tid = 0);
>
>
> @@ -181,10 +224,10 @@ struct UFScheduler
>     static bool                  _inThreadedMode;
>
>     UF* getRunningFiberOnThisThread();
> -    const ucontext_t& getMainContext() const;
> +    ucontext_t* getMainContext();
>     void setSpecific(void* args);
>     void* getSpecific() const;
> -    unsigned long long int getAmtToSleep() const;
> +    TIME_IN_US getAmtToSleep() const;
>     static void setExit(bool exit = true);
>     bool shouldExit() const;
>     void setExitJustMe(bool exit = true);
> @@ -203,29 +246,31 @@ struct UFScheduler
>     //to allow to identify the thread running now
>     static pthread_key_t        _specific_key;
>
> -    static void ufCreateThread(pthread_t* tid, std::list<UF*>* ufsToStartWith);
> +    static void ufCreateThread(pthread_t* tid, UFList* ufsToStartWith);
>
>     static bool                 _exit;
>     bool                        _exitJustMe;
> +    unsigned long long int      getRunCounter() const;
> +
>  protected:
> +    unsigned long long int      _runCounter;
>     UF*                         _currentFiber;
>     ucontext_t                  _mainContext;
>
>     //no lock for active running list - cause only the running
>     //thread can add to it
> -    UFList                      _activeRunningList;
> -    size_t                      _activeRunningListSize;
> +    UFDeque                     _activeRunningList;
>
>     //nominate to add to a thread's running list
> -    UFList                      _nominateToAddToActiveRunningList;
> +    UFDeque                     _nominateToAddToActiveRunningList;
>     pthread_mutex_t             _mutexToNominateToActiveList;
>     pthread_cond_t              _condToNominateToActiveList;
>
>     //the sleep tree
>     MapTimeUF                   _sleepList;
> -    unsigned long long int      _earliestWakeUpFromSleep;
> +    TIME_IN_US                  _earliestWakeUpFromSleep;
>     //store the shortest sleep interval
> -    unsigned long long int      _amtToSleep;
> +    TIME_IN_US                  _amtToSleep;
>
>
>     //store thread specific content
> @@ -234,20 +279,18 @@ protected:
>
>     void notifyUF();
>
> -    list<UFWaitInfo*>  _availableWaitInfo;
> +    std::deque<UFWaitInfo*>     _availableWaitInfo;
>     UFWaitInfo* getWaitInfo();
>     void releaseWaitInfo(UFWaitInfo& ufsi);
>     bool addFiberToSelf(UF* uf);
> -    bool addFiberToAnotherThread(const std::list<UF*>& ufList, pthread_t tid);
> -
> -public:
> -    UFMutex testingCondTimedWait;
> +    bool addFiberToAnotherThread(const UFList& ufList, pthread_t tid);
>  };
> -inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningListSize; }
> +inline unsigned long long int UFScheduler::getRunCounter() const { return _runCounter; }
> +inline size_t UFScheduler::getActiveRunningListSize() const { return _activeRunningList.size(); }
>  inline bool UFScheduler::shouldExit() const { return (_exitJustMe || _exit) ? true : false; }
> -inline unsigned long long int UFScheduler::getAmtToSleep() const { return _amtToSleep; }
> +inline TIME_IN_US UFScheduler::getAmtToSleep() const { return _amtToSleep; }
>  inline UF* UFScheduler::getRunningFiberOnThisThread(){ return _currentFiber; }
> -inline const ucontext_t& UFScheduler::getMainContext() const { return _mainContext; };
> +inline ucontext_t* UFScheduler::getMainContext() { return &_mainContext; }
>  inline void UFScheduler::setSpecific(void* args) { _specific = args; }
>  inline void* UFScheduler::getSpecific() const { return _specific; }
>  inline void UFScheduler::setExit(bool exit) { _exit = exit; }
> @@ -275,17 +318,17 @@ inline UFScheduler* UF::getParentSchedul
>
>  inline void UF::waitOnLock() { block(); }
>
> -inline void UF::gusleep(unsigned long long int sleepAmtInUs)
> +inline void UF::gusleep(TIME_IN_US sleepAmtInUs)
>  {
>     UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
>  }
>
> -inline unsigned long long int timeInUS(timeval& t)
> +inline TIME_IN_US timeInUS(timeval& t)
>  {
> -    return ((unsigned long long int)(((unsigned long long int) t.tv_sec)*1000000)+(unsigned long long int) t.tv_usec);
> +    return ((TIME_IN_US)(((TIME_IN_US) t.tv_sec)*1000000)+(TIME_IN_US) t.tv_usec);
>  }
>
> -inline void UF::usleep(unsigned long long int sleepAmtInUs)
> +inline void UF::usleep(TIME_IN_US sleepAmtInUs)
>  {
>     if(!sleepAmtInUs)
>     {
> @@ -296,7 +339,7 @@ inline void UF::usleep(unsigned long lon
>     struct timeval now;
>     gettimeofday(&now, 0);
>
> -    unsigned long long int timeToWakeUp = timeInUS(now) + sleepAmtInUs;
> +    TIME_IN_US timeToWakeUp = timeInUS(now) + sleepAmtInUs;
>     if( _parentScheduler->_earliestWakeUpFromSleep > timeToWakeUp ||
>        !_parentScheduler->_earliestWakeUpFromSleep)
>         _parentScheduler->_earliestWakeUpFromSleep = timeToWakeUp;
> @@ -318,7 +361,11 @@ inline void UF::block()
>  inline void UF::yield()
>  {
>     //switch context back to the main scheduler
> -    swapcontext(&_UFContext, &(_parentScheduler->getMainContext()));
> +#if __WORDSIZE == 64
> +    uf_swapcontext(&_UFContext, _parentScheduler->getMainContext());
> +#else
> +    swapcontext(&_UFContext, _parentScheduler->getMainContext());
> +#endif
>  }
>
>  inline void UFMutex::releaseSpinLock(bool spinCPU)
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFAres.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFAres.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFAres.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFAres.H Fri Aug 13 19:43:09 2010
> @@ -9,6 +9,9 @@
>  #include <ares.h>
>  #include "UFDNS.H"
>  #include "UFHostEnt.H"
> +#include <iostream>
> +
> +using namespace std;
>
>  // for ares less then version 1.7.0
>  //#ifndef ares_addrttl
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFConf.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConf.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFConf.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFConf.H Fri Aug 13 19:43:09 2010
> @@ -3,6 +3,10 @@
>
>  #include <string>
>  #include <vector>
> +#include <list>
> +#include <ostream>
> +
> +#include <iostream>
>  #include <ext/hash_map>
>
>  using namespace std;
> @@ -13,9 +17,9 @@ namespace __gnu_cxx
>     template<> struct hash< std::string >
>     {
>         size_t operator()( const std::string& x ) const
> -            {
> -                return hash< const char* >()( x.c_str() );
> -            }
> +        {
> +            return hash< const char* >()( x.c_str() );
> +        }
>     };
>  }
>
> @@ -25,10 +29,37 @@ namespace __gnu_cxx
>  class ConfValueBase
>  {
>  public:
> -    virtual void dump(ostream &output)=0;
> +    virtual void dump(std::ostream &output)=0;
>     virtual ~ConfValueBase() { }
> +    std::string type;
>  };
>
> +/** Helper for printing out vector
> + */
> +template <typename T>
> +std::ostream& operator <<  (std::ostream& output, const std::vector<T> &value)
> +{
> +    if(!value.size())
> +        return output;
> +
> +    for(typename std::vector<T>::const_iterator it = value.begin(); it != value.end(); it++)
> +        output << *it << " ";
> +    return output;
> +}
> +
> +/** Helper for printing out vector
> + */
> +template <typename T>
> +std::ostream& operator <<  (std::ostream& output, const std::list<T> &value)
> +{
> +    if(!value.size())
> +        return output;
> +
> +    for(typename std::list<T>::const_iterator it = value.begin(); it != value.end(); it++)
> +        output << *it << " ";
> +    return output;
> +}
> +
>  /** Template conf value
>  *  This covers all built in types.
>  *  This is used by UFConf for string, int, bool and double
> @@ -38,56 +69,154 @@ template <class T>
>  class ConfValue : public ConfValueBase {
>  public:
>     T mElement;
> -    void dump(ostream& output) { output << mElement; }
> -    friend ostream& operator <<  (ostream& output, const ConfValue<T>& value)
> +
> +    void dump(std::ostream& output) { output << mElement; }
> +    friend std::ostream& operator <<  (std::ostream& output, const ConfValue<T>& value)
>     {
>         output << value.mElement;
>         return output;
>     }
>  };
>
> +/** Get conf value from ConfValueBase
> + *  This is a helper function which makes it easier to get conf values
> + *  _testBool = &(((ConfValue<bool> *)conf_val)->mElement) will now become _testBool = confValueGet<bool>(conf_val)
> + */
> +template <class T>
> +T* confValueGet(ConfValueBase *conf_val)
> +{
> +    return &(((ConfValue<T> *)conf_val)->mElement);
> +}
> +
> +/// Forward declaration
> +class UFConfManager;
> +
>  /** Holds config data for a given file
>  *  The class has a parent conf variable.
>  *  the get* functions look at _data. If the requested key is not found, they lookup the parent
>  */
>  class UFConf
>  {
> +    friend class UFConfManager;
>  public:
> -    UFConf() : _parent(NULL) { }
> +    UFConf(const std::string &conf_file);
> +    void init();
>     ConfValueBase *get(const std::string &key);
>     int *getInt(const std::string &key);
>     double *getDouble(const std::string &key);
> -    string *getString(const std::string &key);
> +    std::string *getString(const std::string &key);
>     bool *getBool(const std::string &key);
> +    std::vector<std::string> *getStringVector(const std::string &key);
>
> -    void setInt(const std::string &key, int value);
> -    void setDouble(const std::string &key, double value);
> -    void setString(const std::string &key, const std::string &value);
> -    void setBool(const std::string &key, bool value);
> +    void setInt(const std::string &key, const std::string &key_type, int value);
> +    void setDouble(const std::string &key, const std::string &key_type, double value);
> +    void setString(const std::string &key, const std::string &key_type, const std::string &value);
> +    void setBool(const std::string &key, const std::string &key_type, bool value);
> +    void setStringVector(const std::string &key, const std::string &key_type, const std::vector<std::string> &value);
>
> -    void setParent(UFConf *parent) { _parent = parent; }
>     bool parse(const std::string &conf_file);
> -    ~UFConf();
> +    virtual bool parseLine(const std::string &line);
> +    virtual void clear();
> +    virtual void cache(const std::string &conf_key, ConfValueBase *conf_val) { }
> +    virtual void afterParse() { }
> +    virtual ~UFConf() { }
> +
> +    std::string conf_file;
>
> -    friend ostream& operator<<(ostream& output, const UFConf &conf);
> +    friend std::ostream& operator<<(std::ostream& output, const UFConf &conf);
> +
> +protected:
> +    template<typename T>
> +    void _set(const std::string &conf_key, const std::string &conf_key_type, const T &value)
> +        {
> +            // Check if a value already exists for the key
> +            ConfValueBase *existingValue = get(conf_key);
> +            if(existingValue != NULL) {
> +                delete existingValue;
> +            }
> +
> +            // Create new value. Set type. Copy value.
> +            ConfValue<T> *confValue = new ConfValue<T>;
> +            confValue->type = conf_key_type;
> +            confValue->mElement = value;
> +            _data[conf_key] = confValue;
> +
> +            // call cache so that derived classes can cache whatever value they need to
> +            cache(conf_key, confValue);
> +        }
> +
> +    template<typename T>
> +    T* _get(const std::string &key)
> +        {
> +            ConfValue<T> *confValue = (ConfValue<T> *)get(key);
> +            if(confValue != NULL) {
> +                return &confValue->mElement;
> +            }
> +            if(_parent == NULL)
> +                return NULL;
> +            return _parent->_get<T>(key);
> +        }
> +
> +    void _setParent(UFConf *parent);
>
> -private:
>     UFConf *_parent;
>     std::hash_map<std::string, ConfValueBase *> _data;
>  };
>
> +/** Factory to create conf objects
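> + *  This is needed because the cache method is virtual and would have to be called from the constructor, where a derived class's override is not yet reachable; the factory therefore constructs the object first and then calls init()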
> + */
> +template< class T >
> +T* UFConfFactory(const std::string &conf_file)
> +{
> +    T* confObject = new T(conf_file);
> +    confObject->init();
> +
> +    return confObject;
> +}
> +
>  /** Manages config objects in the system
>  *
>  */
>  class UFConfManager
>  {
>  public:
> -    static UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
> -    static UFConf* addConf(const std::string &conf_file);
> -    static UFConf* getConf(const std::string &conf_file);
> -    static void dump();
> +    UFConfManager();
> +    void reload();
> +    UFConf* addChildConf(const std::string &conf_file, const std::string &parent_conf_file="/home/y/conf/UF/uf.conf");
> +    UFConf* addConf(const std::string &conf_file);
> +    bool addConf(UFConf *conf, const string &conf_file);
> +    bool addChildConf(UFConf *child_conf, const string &conf_file, const string &parent_conf_file="/home/y/conf/UF/uf.conf");
> +    UFConf* getConf(const std::string &conf_file);
> +    void dump();
> +
> +    static long getRefreshTime();
> +    static void setRefreshTime(long);
> +    static UFConfManager* getConfManager();
> +
> +    static pthread_key_t threadSpecificKey;
> +
>  private:
> -    static std::hash_map<std::string, UFConf *> _configs;
> +    /// force children to pick up changes from a parent
> +    void _reloadChildren(UFConf *conf);
> +
> +    /// add conf file to list of files being monitored for changes
> +    void _addWatch(const std::string &conf);
> +
> +    /// conf object associated with a conf file
> +    std::hash_map<std::string, UFConf *> _configs;
> +
> +    /// list of children for a given conf
> +    std::hash_map<std::string, vector<std::string> > _child_map;
> +
> +    /// map from each fd being monitored for changes to the name of the file it watches
> +    std::hash_map<int, std::string> _watch_fd_map;
> +    int _notify_fd;
> +
> +    /// Time between checking for conf file changes
> +    static long _refreshTime;
> +
> +    static pthread_key_t _createThreadKey();
>  };
>
>  #endif
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFConnectionPool.H Fri Aug 13 19:43:09 2010
> @@ -11,7 +11,7 @@
>  #include <netinet/in.h>
>  #include <arpa/inet.h>
>  #include <netdb.h>
> -#include "UF.H"
> +#include <UF.H>
>
>  class UFConnGroupInfo;
>  class UFConnIPInfo;
> @@ -23,7 +23,7 @@ struct UFConnectionPool
>
>     bool addGroup(UFConnGroupInfo* groupInfo);
>     UFConnGroupInfo* removeGroup(const std::string& groupName);
> -    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
> +    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true, TIME_IN_US connectTimeout = -1);
>     void releaseConnection(UFIO* ufIO, bool connOk = true);
>     void clearUnusedConnections(TIME_IN_US lastUsedTimeDiff = 300000000 /*300 secs*/, unsigned long long int coverListTime = 60*1000*1000);
>
> @@ -40,5 +40,13 @@ protected:
>     UFConnectionPoolImpl*           _impl;
>  };
>
> +struct UFConnectionPoolCleaner : public UF
> +{
> +    void run();
> +    UFConnectionPoolCleaner (bool registerMe = false);
> +    UF* createUF() { return new UFConnectionPoolCleaner(); }
> +};
> +inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ }
> +
>
>  #endif
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFDNS.H Fri Aug 13 19:43:09 2010
> @@ -1,8 +1,8 @@
>  #ifndef UF_DNS_H
>  #define UF_DNS_H
>
> -#include "UF.H"
> -#include "UFIO.H"
> +#include <UF.H>
> +#include <UFIO.H>
>  class UFHostEnt;
>  class UFDNS : public UFIO
>  {
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFIO.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFIO.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFIO.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFIO.H Fri Aug 13 19:43:09 2010
> @@ -4,15 +4,13 @@
>  #include <string>
>  #include <ext/hash_map>
>  #include <stdint.h>
> -#include "UF.H"
> +#include <UF.H>
>
> -using namespace std;
>  namespace std { using namespace __gnu_cxx; }
>
>
>
>
> -
>  struct UFIOAcceptThreadChooser
>  {
>     virtual std::pair<UFScheduler*,pthread_t> pickThread(int listeningFd) = 0;
> @@ -60,40 +58,66 @@ struct UFIO
>     //TODO: create ipv6 accept model
>     void accept(UFIOAcceptThreadChooser* ufiotChooser,
>                 unsigned short int ufLocation,
> +                unsigned short int port,
>                 void* startingArgs,
>                 void* stackPtr = 0,
>                 unsigned int stackSize = 0);
>
>     //The fxn will call isSetup which will make the connection request non-blocking and setup the socket
>     //TODO: support ipv6 connect
> -    bool connect(const struct sockaddr *addr, int addrlen, TIME_IN_US timeout);
> +    bool connect(const struct sockaddr *addr, socklen_t addrlen, TIME_IN_US timeout);
>
> -    //since this is most likely going to be run on an edge trigger system
> -    //reads should be done in such a manner so that if the read call
> -    //returns back w/ the same number specified, then a following read call
> -    //should be made to ensure that nothing is left in the network buffer
> -    //since in edge trigger no more events will be generated for data that was
> -    //already seen earlier as being in the network. Therefore always make
> -    //another call to this fxn if the return was == to nbyte
> -    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = 0);
> -    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = 0);
> +    /**
> +     * @brief read until delimiter is encountered or n-1 bytes have been read
> +     * @param buf the output buffer. The buffer is null terminated if readLine was successful.
> +     * @param n the size of the buffer pointed to by buf in bytes
> +     * @return number of bytes read, not including the delimiter on success. -1 on error,
> +     *         getErrno() will return the underlying error.
> +     */
> +    ssize_t readLine(char* buf, size_t n, char delim='\n');
> +    /**
> +     * @brief read until delimiter is encountered or n-1 bytes have been read
> +     * @param out the output string
> +     * @param n readLine will attempt to read at most n-1 bytes
> +     * @return number of bytes read, not including the delimiter on success or -1 on error.
> +     *         getErrno() will return the underlying error.
> +     */
> +    ssize_t readLine(std::string &out, size_t n, char delim='\n');
> +
> +    /**
> +     * @brief attempt to read nbytes from the underlying fd
> +     * @param buf the output buffer
> +     * @param timeout the timeout in microseconds. 0 indicates a non-blocking call, -1 is no timeout
> +     * @returns number of bytes read, or -1 on error. getErrno() will return the underlying error
> +     *
> +     * Since this is most likely going to be run on an edge-triggered system,
> +     * reads should be done in such a manner that if the read call
> +     * returns nbyte, then a following read call
> +     * is made to ensure that nothing is left in the network buffer,
> +     * since in edge-triggered mode no more events will be generated for data that was
> +     * already seen earlier as being in the network. Therefore always make
> +     * another call to this fxn if the return value was == nbyte
> +     */
> +    ssize_t read(void *buf, size_t nbyte, TIME_IN_US timeout = -1);
> +    ssize_t write(const void *buf, size_t nbyte, TIME_IN_US timeout = -1);
> +    ssize_t writev(const struct iovec *iov, int iov_size, TIME_IN_US timeout = -1);
>     int sendto(const char *msg,
> -                  int len,
> -                     const struct sockaddr *to,
> -                  int tolen,
> -                  TIME_IN_US timeout);
> +               size_t len,
> +               const struct sockaddr *to,
> +               socklen_t tolen,
> +               TIME_IN_US timeout);
>     int sendmsg(const struct msghdr *msg,
> -                   int flags,
> -                      TIME_IN_US timeout);
> +                int flags,
> +                TIME_IN_US timeout);
>     int recvfrom(char *buf,
> -                    int len,
> -                    struct sockaddr *from,
> -                           int *fromlen,
> -                    TIME_IN_US timeout);
> +                 size_t len,
> +                 struct sockaddr *from,
> +                 socklen_t *fromlen,
> +                 TIME_IN_US timeout);
>     int recvmsg(struct msghdr *msg,
> -                   int flags,
> -                      TIME_IN_US timeout);
> -
> +                int flags,
> +                TIME_IN_US timeout);
> +
>     bool close();
>
>     bool setFd(int fd, bool makeNonBlocking = true);
> @@ -107,17 +131,23 @@ struct UFIO
>     unsigned int getRemotePort() const;
>     UFIOScheduler* getUFIOScheduler() const;
>
> -    static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
> +    static void ufCreateThreadWithIO(pthread_t* tid, UFList* ufsToStartWith);
>
>     UFSleepInfo*                _sleepInfo;
>     bool                        _markedActive;
>     bool                        _active;
>
> +    static int                  RECV_SOCK_BUF;
> +    static int                  SEND_SOCK_BUF;
> +
>  protected:
>     int                         _fd;
>     unsigned int                _errno;
>     UF*                         _uf;
>     UFIOScheduler*              _ufios;
> +    char*                       _readLineBuf;
> +    size_t                      _readLineBufPos;
> +    size_t                      _readLineBufSize;
>
>     UFIO() { reset(); }
>     void reset();
> @@ -139,8 +169,8 @@ inline unsigned int UFIO::getRemotePort(
>
>
>  struct UFIOScheduler;
> -//typedef map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
> -typedef hash_map<pthread_t, UFIOScheduler*, hash<uintptr_t> > ThreadFiberIOSchedulerMap;
> +//typedef std::map<pthread_t, UFIOScheduler*> ThreadFiberIOSchedulerMap;
> +typedef std::hash_map<pthread_t, UFIOScheduler*, std::hash<uintptr_t> > ThreadFiberIOSchedulerMap;
>
>  struct UFConnectionPool;
>  struct UFIOScheduler
> @@ -148,13 +178,13 @@ struct UFIOScheduler
>     UFIOScheduler();
>     virtual ~UFIOScheduler();
>
> -    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0) = 0;
> -    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0) = 0;
> -    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = 0) = 0;
> -    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0) = 0;
> +    virtual bool setupForConnect(UFIO* ufio, TIME_IN_US to = -1) = 0;
> +    virtual bool setupForAccept(UFIO* ufio, TIME_IN_US to = -1) = 0;
> +    virtual bool setupForRead(UFIO* ufio, TIME_IN_US to = -1) = 0;
> +    virtual bool setupForWrite(UFIO* ufio, TIME_IN_US to = -1) = 0;
>     virtual bool closeConnection(UFIO* ufio) = 0;
>     //TODO: support regular poll behavior
> -    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0) = 0;
> +    virtual bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = -1) = 0;
>
>     virtual bool isSetup() { return false; }
>     virtual void waitForEvents(TIME_IN_US timeToWait) = 0;
> @@ -174,7 +204,7 @@ inline UFConnectionPool* UFIOScheduler::
>
>  #define MAX_FDS_FOR_EPOLL 128*1024-1
>  //typedef map<int, UFIO*> IntUFIOMap;
> -typedef hash_map<int, UFIO*, hash<int> > IntUFIOMap;
> +typedef std::hash_map<int, UFIO*, std::hash<int> > IntUFIOMap;
>  typedef std::multimap<TIME_IN_US, UFSleepInfo*> MapTimeUFIO;
>  struct EpollUFIOScheduler : public UFIOScheduler
>  {
> @@ -185,15 +215,15 @@ struct EpollUFIOScheduler : public UFIOS
>     bool isSetup(); //call after the c'tor to verify that the structure is correctly setup
>
>     //inputInfo is the flags info (its an int*) for this addition
> -    bool setupForConnect(UFIO* ufio, TIME_IN_US to = 0);
> -    bool setupForAccept(UFIO* ufio, TIME_IN_US to = 0);
> -    bool setupForRead(UFIO* ufio, TIME_IN_US to = 0);
> -    bool setupForWrite(UFIO* ufio, TIME_IN_US to = 0);
> +    bool setupForConnect(UFIO* ufio, TIME_IN_US to = -1);
> +    bool setupForAccept(UFIO* ufio, TIME_IN_US to = -1);
> +    bool setupForRead(UFIO* ufio, TIME_IN_US to = -1);
> +    bool setupForWrite(UFIO* ufio, TIME_IN_US to = -1);
>     bool closeConnection(UFIO* ufio);
> -    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = 0);
> +    bool rpoll(std::list<UFIO*>& ufioList, TIME_IN_US to = -1);
>
>
> -    void waitForEvents(TIME_IN_US timeToWait = -1);
> +    void waitForEvents(TIME_IN_US timeToWait = 0);
>
>     bool                            _interruptedByEventFd;
>
> @@ -208,16 +238,16 @@ protected:
>
>
>     MapTimeUFIO                     _sleepList;
> -    unsigned long long int          _earliestWakeUpFromSleep;
> +    TIME_IN_US          _earliestWakeUpFromSleep;
>
>     bool addToScheduler(UFIO* ufio,
>                         void* inputInfo /*flags to identify how ot add*/,
> -                        TIME_IN_US to = 0,
> +                        TIME_IN_US to = -1,
>                         bool wait = true,
>                         bool runEpollCtl = false);
>
>
> -    list<UFSleepInfo*>  _availableSleepInfo;
> +    std::deque<UFSleepInfo*>        _availableSleepInfo;
>     UFSleepInfo* getSleepInfo();
>     void releaseSleepInfo(UFSleepInfo& ufsi);
>  };
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFPC.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFPC.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFPC.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFPC.H Fri Aug 13 19:43:09 2010
> @@ -3,67 +3,200 @@
>
>  #include <set>
>  #include <stack>
> -#include "UF.H"
> +#include <deque>
> +#include <UF.H>
> +//#include <Factory.H>
>
>  struct UFMutex;
>  struct UFProducerData;
>  struct UFConsumer;
>  struct UFProducer;
>
> +
>  struct UFConsumer
>  {
>     friend class UFProducer;
> -    UFConsumer(bool notifyOnExitOnly = false);
> -    virtual ~UFConsumer();
> +    friend class UFJoinableProducer;
> +    friend class UFNonJoinableProducer;
> +
>     //user needs to call input->releaseMe after consuming the data
> -    UFProducerData* waitForData(UF* uf = 0);
> +    UFProducerData* waitForData(UF* uf = 0, size_t* numRemaining = 0, TIME_IN_US timeToWait=0);
>     bool hasData(UF* uf = 0);
>
> -    bool joinProducer(UFProducer* ufp);
> -    bool removeProducer(UFProducer* ufp);
> -    void reset();
> -
> -    std::list<UFProducerData*>      _queueOfDataToConsume;
> -    UFMutex                         _queueOfDataToConsumeLock;
>     bool getNotifyOnExitOnly() const;
> -
>     UF* getUF() const;
> +
>     bool                            _requireLockToWaitForUpdate; //if the developer is aware that both the producer and all the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
> -protected:
> -    std::set<UFProducer*>           _consumersProducerSet;
> +
> +    UFConsumer();
> +    virtual ~UFConsumer() { reset(); }
> +    void reset();
> +
> +    virtual std::string getMyType() = 0;
> +
>     UFMutex                         _consumersProducerSetLock;
>     bool                            _notifyOnExitOnly;
> +
> +
> +protected:
>     UF*                             _currUF;
> +    std::string                     _myType;
> +    std::deque<UFProducerData*>     _queueOfDataToConsume;
> +    UFMutex                         _queueOfDataToConsumeLock;
> +
> +    void clearDataToConsume();
>  };
>  inline UF* UFConsumer::getUF() const { return _currUF; }
>  inline bool UFConsumer::getNotifyOnExitOnly() const { return _notifyOnExitOnly; }
>
> +
> +struct UFJoinableProducer;
> +struct UFJoinableConsumer : public UFConsumer
> +{
> +    friend class UFJoinableProducer;
> +    friend class UFProducer;
> +    UFJoinableConsumer(bool notifyOnExitOnly = false);
> +    ~UFJoinableConsumer() { resetMe(); };
> +    bool joinProducer(UFJoinableProducer* ufp);
> +    bool removeProducer(UFJoinableProducer* ufp);
> +
> +    virtual std::string getMyType() { return "UFJoinableConsumer"; }
> +
> +protected:
> +    std::deque<UFJoinableProducer*>         _consumersProducerSet;
> +    void resetMe();
> +};
> +
> +
> +struct UFNonJoinableProducer;
> +struct UFNonJoinableConsumer : public UFConsumer
> +{
> +    friend class UFNonJoinableProducer;
> +    friend class UFProducer;
> +    UFNonJoinableConsumer(UFNonJoinableProducer* ufp, bool notifyOnExitOnly = false);
> +    ~UFNonJoinableConsumer() { resetMe(); }
> +    void resetMe();
> +
> +    virtual std::string getMyType() { return "UFNonJoinableConsumer"; }
> +
> +protected:
> +    bool removeProducer();
> +    UFNonJoinableProducer*         _ufp;
> +};
> +
> +
> +
> +
> +
> +
> +struct UFDataObject;
>  struct UFProducer
>  {
> +    friend class UFProducerConsumerPair;
>     friend class UFConsumer;
>     UFProducer();
> -    virtual ~UFProducer();
> -    bool removeConsumer(UFConsumer* ufc);
> -    bool produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
> +    virtual ~UFProducer() {}
> +    size_t produceData(UFDataObject* data, int ufpcCode, bool freeDataOnExit = true, UF* uf = 0);
> +    size_t produceData(UFProducerData* ufpd, UF* uf = 0);
> +    virtual size_t getConsumerCount() = 0;
>     void reset();
>     void init();
> +    bool                            _sendEOFAtEnd;
>     bool                            _requireLockToUpdateConsumers;//if the developer is aware that both the producer and the consumers are going to run in the same thread - only then set this variable to false to gain some perf. benefits
>
>  protected:
> -    std::set<UFConsumer*>           _producersConsumerSet;
> -    size_t                          _producersConsumerSetSize;
>     UFConsumer*                     _mostRecentConsumerAdded;
>     UFMutex                         _producersConsumerSetLock; //needed when the consumers are adding or removing themselves from the consumerList
>     bool                            _acceptNewConsumers;
>
> -    bool addConsumer(UFConsumer* ufc);
> +    virtual bool addConsumer(UFConsumer* ufc) = 0;
> +    virtual bool removeConsumer(UFConsumer* ufc) = 0;
> +    virtual size_t updateConsumers(UFProducerData* ufpd, UF* uf) = 0;
> +    virtual void removeAllConsumers() = 0;
> +
>     UF*                             _uf;
>  };
> +inline UFProducer::UFProducer() { init(); }
> +inline void UFProducer::init()
> +{
> +    _acceptNewConsumers = true;
> +    _requireLockToUpdateConsumers = true;
> +    _mostRecentConsumerAdded = 0;
> +    _sendEOFAtEnd = true;
> +}
> +
> +struct UFJoinableProducer : public UFProducer
> +{
> +    UFJoinableProducer() {}
> +    ~UFJoinableProducer() { reset(); };
> +    bool addConsumer(UFConsumer* ufc);
> +    bool removeConsumer(UFConsumer* ufc);
> +    size_t getConsumerCount();
> +
> +protected:
> +    std::deque<UFConsumer*>         _producersConsumerSet;
> +    size_t updateConsumers(UFProducerData* ufpd, UF* uf);
> +    void removeAllConsumers();
> +};
> +inline size_t UFJoinableProducer::getConsumerCount()
> +{
> +    if(!_requireLockToUpdateConsumers)
> +        return _producersConsumerSet.size();
> +
> +    size_t count = 0;
> +    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
> +    _producersConsumerSetLock.lock(uf);
> +    count = _producersConsumerSet.size();
> +    _producersConsumerSetLock.unlock(uf);
> +
> +    return count;
> +}
> +
> +struct UFNonJoinableProducer : public UFProducer
> +{
> +    friend class UFNonJoinableConsumer;
> +    UFNonJoinableProducer(UFNonJoinableConsumer* ufc) { if(ufc) _mostRecentConsumerAdded = ufc; }
> +    ~UFNonJoinableProducer() { reset(); }
> +    size_t getConsumerCount();
> +
> +protected:
> +    UFConsumer*     _ufc;
> +    UFNonJoinableProducer() {}
> +    size_t updateConsumers(UFProducerData* ufpd, UF* uf);
> +    void removeAllConsumers();
> +    bool addConsumer(UFConsumer* ufc);
> +    bool removeConsumer(UFConsumer* ufc);
> +};
> +inline size_t UFNonJoinableProducer::getConsumerCount()
> +{
> +    if(!_requireLockToUpdateConsumers)
> +        return _mostRecentConsumerAdded ? 1 : 0;
> +
> +    size_t count = 0;
> +    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
> +    _producersConsumerSetLock.lock(uf);
> +    count = _mostRecentConsumerAdded ? 1 : 0;
> +    _producersConsumerSetLock.unlock(uf);
> +
> +    return count;
> +}
> +
> +
> +
> +
> +
> +
> +struct UFDataObject
> +{
> +    UFDataObject();
> +    virtual ~UFDataObject();
> +};
> +inline UFDataObject::UFDataObject() {}
> +inline UFDataObject::~UFDataObject() {}
>
>  struct UFProducerData
>  {
> -    void*                           _data;
> -    unsigned int                    _size;
> +    UFDataObject*                   _data;
>     UFProducer*                     _producerWhichInserted;
>     int                             _ufpcCode;
>     bool                            _freeDataOnExit;
> @@ -76,10 +209,10 @@ struct UFProducerData
>     static UFProducerData* getObj();
>     static void releaseObj(UFProducerData* obj);
>
> -
> -protected:
>     ~UFProducerData();
>     UFProducerData() { reset(); }
> +
> +protected:
>     UFMutex                         _controlReferenceCount; //control the ref. count of this data
>     size_t                          _referenceCount;
>     static std::stack<UFProducerData*> _objList;
> @@ -92,67 +225,34 @@ inline void UFProducerData::reset()
>     _lockToUpdate = true;
>  }
>
> -inline bool UFProducer::addConsumer(UFConsumer* ufc)
> +inline void UFProducerData::addRef(size_t numToAdd)
>  {
> -    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
> -    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
> -    if(!_acceptNewConsumers)
> +    if(!_lockToUpdate)
>     {
> -        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
> -        return false;
> +        _referenceCount += numToAdd;
> +        return;
>     }
> -    _producersConsumerSet.insert(ufc); //check insertion
> -    _producersConsumerSetSize++;
> -    _mostRecentConsumerAdded = ufc;
> -    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
> -    return true;
> +
> +    _controlReferenceCount.getSpinLock();
> +    _referenceCount += numToAdd;
> +    _controlReferenceCount.releaseSpinLock();
>  }
>
> -inline bool UFProducer::removeConsumer(UFConsumer* ufc)
> +inline void UFProducerData::reduceRef()
>  {
> -    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
> -    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
> -    _producersConsumerSet.erase(ufc);
> -    _producersConsumerSetSize--;
> -    if(_requireLockToUpdateConsumers)
> -        _producersConsumerSetLock.signal();
> -    else
> +    if(!_lockToUpdate)
>     {
> -        if(_uf)
> -            UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
> +        --_referenceCount;
> +        return;
>     }
> -    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
> -    return true;
> -}
> -
> -inline void UFProducerData::addRef(size_t numToAdd)
> -{
> -    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
> -    _referenceCount = numToAdd;
> -    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
> -}
>
> -inline void UFProducerData::reduceRef()
> -{
> -    if(_lockToUpdate) _controlReferenceCount.getSpinLock();
> +    _controlReferenceCount.getSpinLock();
>     --_referenceCount;
> -    if(_lockToUpdate) _controlReferenceCount.releaseSpinLock();
> +    _controlReferenceCount.releaseSpinLock();
>  }
>
>  inline UFProducerData* UFProducerData::getObj()
>  {
> -    /*
> -    _objListMutex.getSpinLock();
> -    if(!_objList.empty())
> -    {
> -        UFProducerData* retVal = _objList.top();
> -        _objList.pop();
> -        _objListMutex.releaseSpinLock();
> -        retVal->reset();
> -        return retVal;
> -    }
> -    _objListMutex.releaseSpinLock();
> -    */
>     return new UFProducerData();
>  }
>
> @@ -164,11 +264,6 @@ inline void UFProducerData::releaseObj(U
>     if(obj->_lockToUpdate) obj->_controlReferenceCount.getSpinLock();
>     if(!--obj->_referenceCount)
>     {
> -        /*
> -        _objListMutex.getSpinLock();
> -        _objList.push(obj);
> -        _objListMutex.releaseSpinLock();
> -        */
>         delete obj;
>         return;
>     }
> @@ -178,19 +273,40 @@ inline void UFProducerData::releaseObj(U
>  inline UFProducerData::~UFProducerData()
>  {
>     if(_freeDataOnExit && _data)
> -        free (_data);
> +        delete (_data);
>  }
>
> -inline UFConsumer::~UFConsumer() { reset(); }
>
> -inline UFProducer::~UFProducer() { reset(); }
> -inline void UFProducer::init()
> -{
> -    _acceptNewConsumers = true;
> -    _requireLockToUpdateConsumers = true;
> -    _producersConsumerSetSize = 0;
> -    _mostRecentConsumerAdded = 0;
> +
> +struct UFProducerConsumerPair
> +{
> +    UFProducerConsumerPair();
> +    ~UFProducerConsumerPair();
> +    UFNonJoinableConsumer* getConsumer() const { return _c; }
> +    UFNonJoinableProducer* getProducer() const { return _p; }
> +
> +protected:
> +    UFNonJoinableConsumer*      _c;
> +    UFNonJoinableProducer*      _p;
> +};
> +inline UFProducerConsumerPair::UFProducerConsumerPair()
> +{
> +    _p = new UFNonJoinableProducer(0);
> +    _c = new UFNonJoinableConsumer(_p);
> +}
> +inline UFProducerConsumerPair::~UFProducerConsumerPair()
> +{
> +    if(_p)
> +    {
> +        _p->_requireLockToUpdateConsumers = false; //dont waste time on locking
> +        _p->_uf = 0; //the producer shouldnt be associated w/ any uf anymore
> +    }
> +    if(_c)
> +    {
> +        _c->_requireLockToWaitForUpdate = false; //dont waste time on locking
> +        delete _c;
> +    }
> +    if(_p) delete _p; //the producer is killed after the consumer, so that the producer doesnt have to wait for the consumer to die and simply keep the uf alive
>  }
> -inline UFProducer::UFProducer() { init(); }
>
>  #endif
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFServer.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFServer.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFServer.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFServer.H Fri Aug 13 19:43:09 2010
> @@ -4,9 +4,10 @@
>  #include <pthread.h>
>  #include <map>
>  #include <vector>
> +#include <list>
>
> -#include "UF.H"
> -#include "UFIO.H"
> +#include <UF.H>
> +#include <UFIO.H>
>
>  typedef std::map<std::string, std::vector<pthread_t>* > StringThreadMapping;
>  struct UFServerThreadChooser;
> @@ -26,10 +27,16 @@ public:
>     unsigned int getProcessCount() const { return _childProcesses.size(); }
>     unsigned int UF_STACK_SIZE;
>     const char* getBindingInterface() const { return _addressToBindTo.c_str() ; }
> -    unsigned int getPort() const { return _port ; }
> -    unsigned int getListenFd() const { return _listenFd; }
> -    unsigned int setListenFd(int fd) { return (_listenFd = fd); }
>
> +    struct ListenSocket
> +    {
> +        unsigned short int port;
> +        int fd;
> +        ListenSocket(unsigned short int p = 0, int f = -1) : port(p), fd(f) { };
> +    };
> +    typedef std::list<ListenSocket> ListenSocketList;
> +
> +    const ListenSocketList &getListenSockets() const { return _listenSockets; }
>
>     //functions that are allowed to be modified by the
>     //inherited class to provide customizable functionalities
> @@ -38,7 +45,9 @@ public:
>     virtual void postBetweenFork(int childPid) {}
>     virtual void postForkPreRun() {}
>     virtual void preThreadRun() {}
> +    virtual void postThreadRun() {}
>     virtual void preThreadCreation() {}
> +    virtual void postThreadCreation() {}
>     virtual void preAccept() {}
>
>
> @@ -50,15 +59,14 @@ public:
>
>     StringThreadMapping* getThreadList() { return &_threadList; };
>     std::vector<pthread_t>* getThreadType(const std::string& type);
> -    void addThread(const std::string& type, UFScheduler* ufScheduler);
> +    void addThread(const std::string& type, UFScheduler* ufScheduler, pthread_t tid=0);
>     void run();
>
>  protected:
>     void reset();
>
>     std::string                     _addressToBindTo;
> -    unsigned int                    _port;
> -    int                             _listenFd;
> +    ListenSocketList                _listenSockets;
>     unsigned int                    _listenBackLog;
>
>     time_t                          _creationTime;
> @@ -72,30 +80,39 @@ protected:
>
>     void startThreads();
>
> +    void _addListenPort(unsigned short int port)
> +    {
> +        _listenSockets.push_back(ListenSocket(port));
> +    }
> +
>  private:
>     //start processing
>  };
>
> +
> +
>  struct UFServerThreadChooser : public UFIOAcceptThreadChooser
>  {
>     UFServerThreadChooser() { }
>
>     std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd);
> -    void add(UFScheduler* ufs, pthread_t tid)
> -    {
> -        _threadList.push_back(make_pair(ufs, tid));
> -    }
> +    void add(UFScheduler* ufs, pthread_t tid);
>
>  protected:
> -    std::vector<pair<UFScheduler*, pthread_t> > _threadList;
> +    std::vector<std::pair<UFScheduler*, pthread_t> > _threadList;
>  };
>
> +inline void UFServerThreadChooser::add(UFScheduler* ufs, pthread_t tid)
> +{
> +    _threadList.push_back(std::make_pair(ufs, tid));
> +}
> +
>  inline std::pair<UFScheduler*, pthread_t> UFServerThreadChooser::pickThread(int listeningFd)
>  {
>     static unsigned int lastLocUsed = 0;
>     if(!_threadList.size())
>     {
> -        cerr<<"there has to be some fabric to hand the request to"<<endl;
> +        std::cerr<<"there has to be some fabric to hand the request to"<<std::endl;
>         exit(1);
>     }
>
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFStatSystem.H Fri Aug 13 19:43:09 2010
> @@ -11,7 +11,7 @@
>  #include <vector>
>  #include <utility>
>  #include <sstream>
> -#include "UF.H"
> +#include <UF.H>
>
>  struct Stat
>  {
>
> Modified: trafficserver/traffic/branches/UserFiber/core/include/UFStats.H
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/include/UFStats.H?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/include/UFStats.H (original)
> +++ trafficserver/traffic/branches/UserFiber/core/include/UFStats.H Fri Aug 13 19:43:09 2010
> @@ -6,6 +6,7 @@
>  namespace UFStats
>  {
>     void registerStats(bool lock_needed = false);
> +    extern uint32_t currentConnections;
>     extern uint32_t connectionsHandled;
>     extern uint32_t txnSuccess;
>     extern uint32_t txnFail;
>
> Propchange: trafficserver/traffic/branches/UserFiber/core/include/UFSwapContext.H
> ------------------------------------------------------------------------------
>    svn:mergeinfo =
>
> Modified: trafficserver/traffic/branches/UserFiber/core/src/Makefile
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/Makefile?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/src/Makefile (original)
> +++ trafficserver/traffic/branches/UserFiber/core/src/Makefile Fri Aug 13 19:43:09 2010
> @@ -33,20 +33,20 @@ $(ARES_SRC):
>           tar xzf ./$(ARES_SRC_FILE); \
>        fi
>
> -$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H UFSwapContext.H
> +$(LIB_DIR)/UF.o: UF.C $(INCLUDE_DIR)/UF.H
>        $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UF.o UF.C
>
>  $(LIB_DIR)/UFPC.o: UFPC.C $(INCLUDE_DIR)/UFPC.H $(LIB_DIR)/UF.o
>        $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFPC.o UFPC.C
>
> -$(LIB_DIR)/UFConnectionPoolImpl.o: $(LIB_DIR)/UFAres.o UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
> +$(LIB_DIR)/UFConnectionPoolImpl.o: UFConnectionPoolImpl.C $(INCLUDE_DIR)/UFConnectionPool.H UFConnectionPoolImpl.H $(LIB_DIR)/UF.o $(LIB_DIR)/UFIO.o
>        $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFConnectionPoolImpl.o UFConnectionPoolImpl.C
>
>  $(LIB_DIR)/UFIO.o: UFIO.C $(INCLUDE_DIR)/UFIO.H $(LIB_DIR)/UF.o
>        $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFIO.o UFIO.C
>
> -$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
> -       $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
> +#$(LIB_DIR)/UFAres.o: UFAres.C $(INCLUDE_DIR)/UFAres.H $(INCLUDE_DIR)/UFDNS.H $(INCLUDE_DIR)/UFHostEnt.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o $(ARES_SRC)
> +#      $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFAres.o UFAres.C
>
>  $(LIB_DIR)/UFStatSystem.o: UFStatSystem.C $(INCLUDE_DIR)/UFStatSystem.H $(LIB_DIR)/UFIO.o $(LIB_DIR)/UF.o
>        $(CXX) -c $(CPPFLAGS) $(CXXFLAGS) -o $(LIB_DIR)/UFStatSystem.o UFStatSystem.C
> @@ -63,8 +63,9 @@ $(LIB_DIR)/UFServer.o: UFServer.C $(INCL
>  $(LIB_DIR)/UFSwapContext.o: UFSwapContext.S
>        $(CC) -c -o $@ $^
>
> -$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
> -       $(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $(ARESDIR)/$(ARES)/.libs/*.o $^
> +#$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o  $(LIB_DIR)/UFAres.o $(ARES_LIB)
> +$(LIB_DIR)/libUF.a: $(LIB_DIR)/UF.o $(LIB_DIR)/UFPC.o $(LIB_DIR)/UFIO.o $(LIB_DIR)/UFStatSystem.o $(LIB_DIR)/UFStats.o $(LIB_DIR)/UFConf.o $(LIB_DIR)/UFServer.o $(LIB_DIR)/UFSwapContext.o $(LIB_DIR)/UFConnectionPoolImpl.o
> +       $(AR) $(ARFLAGS) $(LIB_DIR)/libUF.a $^
>        $(RANLIB) $(LIB_DIR)/libUF.a
>
>  clean:
>
> Modified: trafficserver/traffic/branches/UserFiber/core/src/UF.C
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UF.C?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/src/UF.C (original)
> +++ trafficserver/traffic/branches/UserFiber/core/src/UF.C Fri Aug 13 19:43:09 2010
> @@ -1,5 +1,4 @@
> -#include "UF.H"
> -#include "UFConnectionPool.H"
> +#include <UF.H>
>
>  #include <string.h>
>  #include <iostream>
> @@ -10,7 +9,7 @@
>  #include <stdio.h>
>  #include <malloc.h>
>  #include <sys/mman.h>
> -#include "UFSwapContext.H"
> +#include <UFSwapContext.H>
>
>  using namespace std;
>
> @@ -33,12 +32,13 @@ static void runFiber(void* args)
>
>  ///////////////UF/////////////////////
>  UFFactory* UFFactory::_instance = 0;
> -const unsigned int DEFAULT_STACK_SIZE = 4*4096;
> +unsigned int UF::DEFAULT_STACK_SIZE = 4*4096;
>  UFId UF::_globalId = 0;
>
>  UF::UF()
>  {
> -    _startingArgs = 0;
> +    reset();
> +    _myId = ++_globalId;  //TODO: make atomic
>     setup();
>  }
>
> @@ -59,17 +59,14 @@ bool UF::setup(void* stackPtr, size_t st
>     }
>  #endif
>
> -    _myId = ++_globalId;  //TODO: make atomic
> -    _status = NOT_STARTED;
> -
>     if(!stackPtr || !stackSize)
>     {
>  #ifndef DEBUG
> -        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : DEFAULT_STACK_SIZE;
> +        _UFContext.uc_stack.ss_size = (stackSize) ? stackSize : UF::DEFAULT_STACK_SIZE;
>         _UFContext.uc_stack.ss_sp = (void*) malloc (_UFContext.uc_stack.ss_size);
>  #else
> -        _UFContext.uc_stack.ss_size = DEFAULT_STACK_SIZE;
> -        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, DEFAULT_STACK_SIZE);
> +        _UFContext.uc_stack.ss_size = UF::DEFAULT_STACK_SIZE;
> +        _UFContext.uc_stack.ss_sp = (void*) memalign (pageSize, UF::DEFAULT_STACK_SIZE);
>         if(!_UFContext.uc_stack.ss_sp)
>         {
>             cerr<<"couldnt allocate space from memalign "<<strerror(errno)<<endl;
> @@ -91,8 +88,6 @@ bool UF::setup(void* stackPtr, size_t st
>     }
>     _UFContext.uc_stack.ss_flags = 0;
>
> -    _parentScheduler = 0;
> -
>     return true;
>  }
>
> @@ -116,7 +111,6 @@ static pthread_key_t getThreadKey()
>  pthread_key_t UFScheduler::_specific_key = getThreadKey();
>  UFScheduler::UFScheduler()
>  {
> -    _activeRunningListSize = 0;
>     _earliestWakeUpFromSleep = 0;
>     _exitJustMe = false;
>     _specific = 0;
> @@ -161,25 +155,39 @@ UFScheduler::UFScheduler()
>
>     pthread_setspecific(_specific_key, this);
>     _amtToSleep = 0;
> +    _runCounter = 1;
>  }
>
> -UFScheduler::~UFScheduler() { /*pthread_key_delete(_specific_key);*/ }
> +UFScheduler::~UFScheduler()
> +{
> +    //remove the UFScheduler associated w/ this thread
> +    pthread_mutex_lock(&_mutexToCheckFiberSchedulerMap);
> +    ThreadUFSchedulerMap::iterator index = _threadUFSchedulerMap.find(pthread_self());
> +    if(index != _threadUFSchedulerMap.end())
> +        _threadUFSchedulerMap.erase(index);
> +    pthread_mutex_unlock(&_mutexToCheckFiberSchedulerMap);
> +
> +    /*pthread_key_delete(_specific_key);*/
> +}
>
>  bool UFScheduler::addFiberToSelf(UF* uf)
>  {
> -    if(uf->_status == WAITING_TO_RUN) //UF is already in the queue
> +    if(!uf)
> +        return false;
> +    if(uf->_status == WAITING_TO_RUN ||
> +       uf->_status == YIELDED) //UF is already in the queue
>         return true;
>     uf->_status = WAITING_TO_RUN;
> -    if(uf->_parentScheduler) //probably putting back an existing uf into the active list
> +    if(uf->getParentScheduler()) //probably putting back an existing uf into the active list
>     {
> -        if(uf->_parentScheduler == this) //check that we're scheduling for the same thread
> +        if(uf->getParentScheduler() == this) //check that we're scheduling for the same thread
>         {
> -            _activeRunningList.push_back(uf); ++_activeRunningListSize;
> +            _activeRunningList.push_front(uf);
>             return true;
>         }
>         else
>         {
> -            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->_parentScheduler<<endl;
> +            cerr<<uf<<" uf is not part of scheduler, "<<this<<" its part of "<<uf->getParentScheduler()<<endl;
>             abort(); //TODO: remove the abort
>             return false;
>         }
> @@ -202,12 +210,15 @@ bool UFScheduler::addFiberToSelf(UF* uf)
>         cerr<<"error while trying to run makecontext"<<endl;
>         return false;
>     }
> -    _activeRunningList.push_back(uf); ++_activeRunningListSize;
> +    _activeRunningList.push_front(uf);
>     return true;
>  }
>
>  bool UFScheduler::addFiberToAnotherThread(const list<UF*>& ufList, pthread_t tid)
>  {
> +    if(ufList.empty())
> +        return false;
> +
>     //find the other thread --
>     //TODO: have to lock before looking at this map -
>     //since it could be changed if more threads are added later - not possible in the test that is being run (since the threads are created before hand)
> @@ -260,15 +271,13 @@ bool UFScheduler::addFiberToScheduler(co
>         return true;
>
>     //adding to the same scheduler and as a result thread as the current job
> -    UF* uf = 0;
> -    list<UF*>::const_iterator beg = ufList.begin();
> -    list<UF*>::const_iterator ending = ufList.end();
>     if(!tid || (tid == pthread_self()))
>     {
> +        list<UF*>::const_iterator beg = ufList.begin();
> +        list<UF*>::const_iterator ending = ufList.end();
>         for(; beg != ending; ++beg)
>         {
> -            uf = *beg;
> -            if(addFiberToSelf(uf))
> +            if(addFiberToSelf(*beg))
>                 continue;
>             else
>                 return false;
> @@ -295,23 +304,31 @@ void UFScheduler::runScheduler()
>     _amtToSleep = DEFAULT_SLEEP_IN_USEC;
>     bool ranGetTimeOfDay = false;
>
> -    UFList::iterator beg;
>     struct timeval now;
>     struct timeval start,finish;
>     gettimeofday(&start, 0);
> -    unsigned long long int timeNow = 0;
> +    TIME_IN_US timeNow = 0;
>
> -    UFList::iterator ufBeg;
> -    UFList::iterator nBeg;
>     MapTimeUF::iterator slBeg;
>     bool waiting = false;
> +    //unsigned long long int runCounter = 1;
>     while(!shouldExit())
>     {
> -        for(ufBeg = _activeRunningList.begin(); ufBeg != _activeRunningList.end(); )
> +        ++_runCounter;
> +        while(!_activeRunningList.empty())
>         {
> -            UF* uf = *ufBeg;
> -            _currentFiber = uf;
> +            if(shouldExit())
> +                break;
> +
> +            UF* uf = _activeRunningList.front();
> +            if(uf->_status == YIELDED &&
> +               uf->_lastRun == _runCounter) //we have looped back
> +                break;
> +            //printf("%lu - running uf %lu on iter %llu\n", pthread_self(), (uintptr_t)uf, _runCounter);
> +            _activeRunningList.pop_front();
> +            uf->_lastRun = _runCounter;
>             uf->_status = RUNNING;
> +            _currentFiber = uf;
>  #if __WORDSIZE == 64
>             uf_swapcontext(&_mainContext, &(uf->_UFContext));
>  #else
> @@ -320,20 +337,18 @@ void UFScheduler::runScheduler()
>             _currentFiber = 0;
>
>             if(uf->_status == BLOCKED)
> -            {
> -                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
>                 continue;
> -            }
>             else if(uf->_status == COMPLETED)
>             {
> -                delete uf;
> -                ufBeg = _activeRunningList.erase(ufBeg); --_activeRunningListSize;
> +                if(uf->_myFactory)
> +                    uf->_myFactory->releaseUF(uf);
> +                else
> +                    delete uf;
>                 continue;
>             }
> -
>             //else uf->_status == RUNNING
> -            uf->_status = WAITING_TO_RUN;
> -            ++ufBeg;
> +            uf->_status = YIELDED;
> +            _activeRunningList.push_back(uf);
>         }
>
>
> @@ -354,20 +369,18 @@ void UFScheduler::runScheduler()
>             //TODO: do atomic comparison to see if there is anything in
>             //_nominateToAddToActiveRunningList before getting the lock
>             pthread_mutex_lock(&_mutexToNominateToActiveList);
> -            for(nBeg = _nominateToAddToActiveRunningList.begin();
> -                nBeg != _nominateToAddToActiveRunningList.end(); )
> +            do
>             {
> -                UF* uf = *nBeg;
> -                if(uf->_parentScheduler)
> +                UF* uf = _nominateToAddToActiveRunningList.front();
> +                if(uf->getParentScheduler())
>                 {
>                     uf->_status = WAITING_TO_RUN;
> -                    _activeRunningList.push_front(uf); ++_activeRunningListSize;
> +                    _activeRunningList.push_front(uf);
>                 }
>                 else //adding a new fiber
>                     addFiberToScheduler(uf, 0);
> -                nBeg = _nominateToAddToActiveRunningList.erase(nBeg);
> -            }
> -
> +                _nominateToAddToActiveRunningList.pop_front();
> +            }while(!_nominateToAddToActiveRunningList.empty());
>             pthread_mutex_unlock(&_mutexToNominateToActiveList);
>         }
>
> @@ -393,7 +406,7 @@ void UFScheduler::runScheduler()
>                         if(ufwi->_uf)
>                         {
>                             ufwi->_uf->_status = WAITING_TO_RUN;
> -                            _activeRunningList.push_front(ufwi->_uf); ++_activeRunningListSize;
> +                            _activeRunningList.push_front(ufwi->_uf);
>                             ufwi->_uf = NULL;
>                         }
>                         waiting = ufwi->_waiting;
> @@ -419,13 +432,13 @@ void UFScheduler::runScheduler()
>         }
>
>         //see if there is anything to do or is it just sleeping time now
> -        if(!_notifyFunc && !_activeRunningListSize && !shouldExit())
> +        if(!_notifyFunc && _activeRunningList.empty() && !shouldExit())
>         {
>             if(_inThreadedMode) //go to conditional wait (in threaded mode)
>             {
>                 struct timespec ts;
> -                unsigned long long int nSecToIncrement = (int)(_amtToSleep/1000000);
> -                unsigned long long int nUSecToIncrement = (int)(_amtToSleep%1000000);
> +                int nSecToIncrement = (int)(_amtToSleep/1000000);
> +                TIME_IN_US nUSecToIncrement = (TIME_IN_US)(_amtToSleep%1000000);
>                 if(!ranGetTimeOfDay)
>                     gettimeofday(&now, 0);
>                 ts.tv_sec = now.tv_sec + nSecToIncrement;
> @@ -442,7 +455,7 @@ void UFScheduler::runScheduler()
>     }
>     gettimeofday(&finish, 0);
>
> -    unsigned long long int diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
> +    TIME_IN_US diff = (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec - start.tv_usec);
>     cerr<<pthread_self()<<" time taken in this thread = "<<diff<<"us"<<endl;
>  }
>
> @@ -498,10 +511,10 @@ int UFFactory::registerFunc(UF* uf)
>     return _size++;
>  }
>
> -const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 3;
> +const unsigned int CONSECUTIVE_LOCK_FAILURES_ALLOWED = 15;
>  bool UFMutex::lock(UF* uf)
>  {
> -    if(!uf || !uf->_parentScheduler)
> +    if(!uf || !uf->getParentScheduler())
>         return false;
>
>     getSpinLock();
> @@ -575,38 +588,28 @@ bool UFMutex::unlock(UF* uf)
>     if(!uf)
>         return false;
>
> -    UFList::iterator beg;
>     getSpinLock();
>
> -    beg = _listOfClientsWaitingOnLock.begin();
> -    if(uf == *beg) //check if this uf is the current owner of this lock
> +    if(uf == _listOfClientsWaitingOnLock.front()) //check if this uf is the current owner of this lock
>     {
> +        _listOfClientsWaitingOnLock.pop_front();
>         _lockCurrentlyOwned = false;
> -        beg = _listOfClientsWaitingOnLock.erase(beg);
>  #ifdef LOCK_DEBUG
>         printf("%lu u %d\n", (unsigned long int) ((uintptr_t)(void*)uf), _listOfClientsWaitingOnLock.size());
>  #endif
>
> -        bool releasedLock = false;
>         //notify the next UF in line
>         while(!_listOfClientsWaitingOnLock.empty())
>         {
> -            UF* tmpUf = *beg;
> -            if(!tmpUf || !tmpUf->_parentScheduler) //invalid tmpuf - cant wake it up
> +            UF* tmpUf = _listOfClientsWaitingOnLock.front();
> +            if(!tmpUf || !tmpUf->getParentScheduler()) //invalid tmpuf - cant wake it up
>             {
>  #ifdef LOCK_DEBUG
>                 printf("%lu nf1\n", (unsigned long int) ((uintptr_t)(void*)uf));
>  #endif
> -                beg = _listOfClientsWaitingOnLock.erase(beg);
> -                if(beg == _listOfClientsWaitingOnLock.end())
> -                    break;
> +                _listOfClientsWaitingOnLock.pop_front();
>                 continue;
>             }
> -            /*
> -            if(tmpUf->getStatus() == WAITING_TO_RUN) //this uf has already been put into the waiting to run list
> -                break;
> -                */
> -
>
>  #ifdef LOCK_DEBUG
>             printf("%lu wk %lu\n",
> @@ -615,14 +618,11 @@ bool UFMutex::unlock(UF* uf)
>  #endif
>
>             releaseSpinLock();
> -            releasedLock = true;
> -            uf->_parentScheduler->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
> -            break;
> +            uf->getParentScheduler()->addFiberToScheduler(tmpUf, tmpUf->_parentScheduler->_tid);
> +            return true;
>         }
>
> -        if(!releasedLock)
> -            releaseSpinLock();
> -
> +        releaseSpinLock();
>         return true;
>     }
>     else
> @@ -635,7 +635,7 @@ bool UFMutex::unlock(UF* uf)
>     return false;
>  }
>
> -bool UFMutex::tryLock(UF* uf, unsigned long long int autoRetryIntervalInUS)
> +bool UFMutex::tryLock(UF* uf, TIME_IN_US autoRetryIntervalInUS)
>  {
>     while(1)
>     {
> @@ -668,7 +668,7 @@ bool UFMutex::condWait(UF* uf)
>     //the object is already in the hash
>     if(_listOfClientsWaitingOnCond.find(uf) == _listOfClientsWaitingOnCond.end())
>     {
> -        UFWaitInfo *ufwi = uf->_parentScheduler->getWaitInfo();
> +        UFWaitInfo *ufwi = uf->getParentScheduler()->getWaitInfo();
>         ufwi->_uf = uf;
>         ufwi->_waiting = true;
>
> @@ -708,7 +708,7 @@ void UFMutex::broadcast()
>         // If uf is not NULL, schedule it and make sure no one else can schedule it again
>         if(ufwi->_uf)
>         {
> -            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->_parentScheduler->_tid);
> +            ufs->addFiberToScheduler(ufwi->_uf, ufwi->_uf->getParentScheduler()->_tid);
>             ufwi->_uf = NULL;
>         }
>
> @@ -760,10 +760,10 @@ void UFMutex::signal()
>     }
>
>     if(uf_to_signal)
> -        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->_parentScheduler->_tid);
> +        ufs->addFiberToScheduler(uf_to_signal, uf_to_signal->getParentScheduler()->_tid);
>  }
>
> -int UFMutex::condTimedWait(UF* uf, unsigned long long int sleepAmtInUs)
> +bool UFMutex::condTimedWait(UF* uf, TIME_IN_US sleepAmtInUs)
>  {
>     bool result = false;
>     if(!uf)
> @@ -783,9 +783,9 @@ int UFMutex::condTimedWait(UF* uf, unsig
>     // Add to sleep queue
>     struct timeval now;
>     gettimeofday(&now, 0);
> -    unsigned long long int timeNow = timeInUS(now);
> +    TIME_IN_US timeNow = timeInUS(now);
>     ufwi->_sleeping = true;
> -    uf->_parentScheduler->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
> +    uf->getParentScheduler()->_sleepList.insert(std::make_pair((timeNow+sleepAmtInUs), ufwi));
>
>     uf->waitOnLock(); //this fxn will cause the fxn to wait till a signal, broadcast or timeout has occurred
>
> @@ -802,11 +802,11 @@ void* setupThread(void* args)
>     if(!args)
>         return 0;
>
> -    list<UF*>* ufsToStartWith = (list<UF*>*) args;
> +    UFList* ufsToStartWith = (UFList*) args;
>     UFScheduler ufs;
>     ufs.addFiberToScheduler(*ufsToStartWith, 0);
>     delete ufsToStartWith;
> -
> +
>     //run the scheduler
>     ufs.runScheduler();
>
> @@ -819,10 +819,24 @@ void UFScheduler::ufCreateThread(pthread
>     pthread_attr_t attr;
>     pthread_attr_init(&attr);
>     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
> -
> +
>     if(pthread_create(tid, &attr, setupThread, (void*)ufsToStartWith) != 0)
>     {
>         cerr<<"couldnt create thread "<<strerror(errno)<<endl;
>         exit(1);
>     }
>  }
> +
> +string getPrintableTime()
> +{
> +    char asctimeDate[32];
> +    asctimeDate[0] = '\0';
> +    time_t now = time(0);
> +    asctime_r(localtime(&now), asctimeDate);
> +
> +    string response = asctimeDate;
> +    size_t loc = response.find('\n');
> +    if(loc != string::npos)
> +        response.replace(loc, 1, "");
> +    return response;
> +}
>
> Modified: trafficserver/traffic/branches/UserFiber/core/src/UFAres.C
> URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFAres.C?rev=985334&r1=985333&r2=985334&view=diff
> ==============================================================================
> --- trafficserver/traffic/branches/UserFiber/core/src/UFAres.C (original)
> +++ trafficserver/traffic/branches/UserFiber/core/src/UFAres.C Fri Aug 13 19:43:09 2010
> @@ -1,6 +1,8 @@
> -#include "UFAres.H"
> +#include <UFAres.H>
>  #include <stdio.h>
>
> +using namespace std;
> +
>  static void printHost(struct hostent* host , struct ares_addrttl *ttls = 0, int nttl = 0)
>  {
>   //  return;
>
>
>