You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/08/14 00:14:32 UTC

svn commit: r985387 [2/3] - in /trafficserver/traffic/branches/UserFiber: core/include/ core/src/ protocol/ samples/

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConf.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConf.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConf.C Fri Aug 13 22:14:31 2010
@@ -1,74 +1,63 @@
-#include <UFConf.H>
-#include <UFIO.H>
+#include "UFConf.H"
+
 #include <fstream>
 #include <iostream>
 #include <sstream>
-#include <errno.h>
-#include <string.h>
-
-#include <fcntl.h>
-#include <sys/inotify.h>
-
-UFConf::UFConf(const string &conf) : conf_file(conf), _parent(NULL)
-{
-
-}
-
-void UFConf::init()
-{
-    // Parse default config
-    string conf_file_default = conf_file + ".default";
-    parse(conf_file_default);
-    
-    // Parse overrides
-    parse(conf_file);
-/*
-    string *conf_file_parent = getString("parent");
-    if(conf_file_parent != NULL) 
-    {
-        _setParent(getConf(*conf_file_parent));
-    }
-*/
-}
 
 /** set string value in conf
  *  Converts the string value that is passed in to a ConfValue and stores it in the conf hash
  */
-void UFConf::setString(const string &key, const string &type, const string &value)
+void UFConf::setString(const string &type, const string &value)
 {
-    _set<string>(key, type, value);
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<string> *sharedString = new ConfValue<string>;
+    sharedString->mElement = value;
+    _data[type] = sharedString;
 }
 
 /** set int value in conf
  *  Converts the int value that is passed in to a ConfValue and stores it in the conf hash
  */
-void UFConf::setInt(const string &key, const string &type, int value)
+void UFConf::setInt(const string &type, int value)
 {
-    _set<int>(key, type, value);
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<int> *sharedInt = new ConfValue<int>;
+    sharedInt->mElement = value;
+    _data[type] = sharedInt;
 }
 
 /** set bool value in conf
  *  Converts the bool value that is passed in to a ConfValue and stores it in the conf hash
  */
-void UFConf::setBool(const string &key, const string &type, bool value)
+void UFConf::setBool(const string &type, bool value)
 {
-    _set<bool>(key, type, value);
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<bool> *sharedBool = new ConfValue<bool>;
+    sharedBool->mElement = value;
+    _data[type] = sharedBool;
 }
 
 /** set double value in conf
  *  Converts double value that is passed in to a ConfValue and stores it in the conf hash
  */
-void UFConf::setDouble(const string &key, const string &type, double value)
+void UFConf::setDouble(const string &type, double value)
 {
-    _set<double>(key, type, value);
-}
-
-/** set vector<string> value in conf
- *  Converts the vector<string> value that is passed in to a ConfValue and stores it in the conf hash
- */
-void UFConf::setStringVector(const string &key, const string &type, const vector<string> &value)
-{
-    _set< vector<string> >(key, type, value);
+    ConfValueBase *existingValue = get(type);
+    if(existingValue != NULL) {
+        delete existingValue;
+    }
+    ConfValue<double> *sharedDouble = new ConfValue<double>;
+    sharedDouble->mElement = value;
+    _data[type] = sharedDouble;
 }
 
 /** Get the string value associated with the key
@@ -78,7 +67,13 @@ void UFConf::setStringVector(const strin
  */
 string *UFConf::getString(const string &key)
 {
-    return _get<string>(key);
+    ConfValue<string> *sharedString = (ConfValue<string> *)get(key);
+    if(sharedString != NULL) {
+        return &sharedString->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getString(key);
 }
 
 /** Get the int value associated with the key
@@ -88,7 +83,13 @@ string *UFConf::getString(const string &
  */
 int *UFConf::getInt(const string &key)
 {
-    return _get<int>(key);
+    ConfValue<int> *sharedInt = (ConfValue<int> *)get(key);
+    if(sharedInt != NULL) {
+        return &sharedInt->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getInt(key);
 }
 
 /** Get the bool value associated the with key
@@ -98,7 +99,13 @@ int *UFConf::getInt(const string &key)
  */
 bool *UFConf::getBool(const string &key)
 {
-    return _get<bool>(key);
+    ConfValue<bool> *sharedBool = (ConfValue<bool> *)get(key);
+    if(sharedBool != NULL) {
+        return &sharedBool->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getBool(key);
 }
 
 /** Get the double value associated with the key
@@ -106,19 +113,15 @@ bool *UFConf::getBool(const string &key)
  *  If key is not found in the local conf, forwards request to parent conf
  *  If key is not found in either local or parent conf, NULL is returned
  */
-double *UFConf::getDouble(const string &key)
-{
-    return _get<double>(key);
-}
-
-/** Get the vector<string> value associated with the key
- *  Looks at local conf hash for the key
- *  If key is not found in the local conf, forwards request to parent conf
- *  If key is not found in either local or parent conf, NULL is returned
- */
-vector<string> *UFConf::getStringVector(const string &key)
+double *UFConf::getDouble(const string &type)
 {
-    return _get< vector<string> >(key);
+    ConfValue<double> *sharedDouble = (ConfValue<double> *)get(type);
+    if(sharedDouble != NULL) {
+        return &sharedDouble->mElement;
+    }
+    if(_parent == NULL)
+        return NULL;
+    return _parent->getDouble(type);
 }
 
 /** get value associated with the key that is passed in
@@ -136,21 +139,9 @@ ConfValueBase *UFConf::get(const string 
     return _parent->get(key);
 }
 
-/** Set parent config
- *  Also caches any config parameters that the child may be interested in
- */
-void UFConf::_setParent(UFConf *parent)
-{
-    _parent = parent;
-    for(hash_map<string, ConfValueBase *>::iterator it = parent->_data.begin();
-        it != parent->_data.end(); it++)
-    {
-        cache(it->first, it->second);
-    }
-}
-
 /** Parse config file and store in conf hash
- *  calls parseLine for each line in the conf file   
+ *  Looks for config values of type STRING, INT, DOUBLE and BOOL
+ *  Skips over lines beginning with '#'
  */
 bool UFConf::parse(const std::string &conf_file)
 {
@@ -158,73 +149,50 @@ bool UFConf::parse(const std::string &co
     infile.open(conf_file.c_str());
     if(!infile.is_open()) 
         return false; // Could not open file
-    string line;
-    bool ok = true;
-    while(getline(infile, line))
-    {
-        // Ignore lines starting with '#'
-        if(line[0] != '#')
-            ok &= parseLine(line);
-    }
-    infile.close();
-    afterParse();
-    return ok;
-}
 
-/** Parses config line
- *  Looks for config values of type STRING, INT, DOUBLE and BOOL
- *  Skips over lines beginning with '#'
- */
-bool UFConf::parseLine(const std::string &line)
-{
+    string line;
     istringstream instream;
-    instream.str(line);  // Use s as source of input.
-    string conf_key, conf_key_type;
-    if (instream >> conf_key >> conf_key_type) 
+    while(getline(infile, line))
     {
-        // get type from config file, read into corresponding value and store  
-        string string_value;
-        int int_value;
-        double double_value;
-        bool bool_value;
-        if(conf_key_type == "STRING")
+        instream.clear(); // Reset from possible previous errors.
+        instream.str(line);  // Use s as source of input.
+        string conf_key, conf_key_type;
+        if (instream >> conf_key >> conf_key_type) 
         {
-            string string_val_temp;
-            while(instream >> string_val_temp) 
+            // skip lines starting with #
+            if(conf_key[0] == '#')
+                continue;
+
+            // get type from config file, read into corresponding value and store  
+            string string_value;
+            int int_value;
+            double double_value;
+            bool bool_value;
+            if(conf_key_type == "STRING")
             {
-                    if(string_value.length())
-                        string_value += " ";
-                    string_value += string_val_temp;
+                if(instream >> string_value)
+                    setString(conf_key, string_value);
+            }
+            if(conf_key_type == "INT")
+            {
+                if(instream >> int_value)
+                    setInt(conf_key, int_value);
+            }
+            if(conf_key_type == "DOUBLE")
+            {
+                if(instream >> double_value)
+                    setDouble(conf_key, double_value);
+            }
+            if(conf_key_type == "BOOL")
+            {
+                if(instream >> bool_value)
+                    setBool(conf_key, bool_value);
             }
-            setString(conf_key, conf_key_type, string_value);
-        }
-        if(conf_key_type == "INT")
-        {
-            if(instream >> int_value)
-                setInt(conf_key, conf_key_type, int_value);
-        }
-        if(conf_key_type == "DOUBLE")
-        {
-            if(instream >> double_value)
-                setDouble(conf_key, conf_key_type, double_value);
-        }
-        if(conf_key_type == "BOOL")
-        {
-            if(instream >> bool_value)
-                setBool(conf_key, conf_key_type, bool_value);
         }
     }
-    return true;
-}
 
-void UFConf::clear()
-{
-    for(std::hash_map<std::string, ConfValueBase *>::iterator it = _data.begin(); it != _data.end(); it++)
-    {
-        if(it->second != NULL)
-            delete it->second;
-    }
-    _data.clear();
+    infile.close();
+    return true;
 }
 
 /**
@@ -244,142 +212,7 @@ ostream& operator<<(ostream& output, con
     return output;
 }
 
-UFConfManager::UFConfManager() : _notify_fd(-1)
-{
-    _notify_fd = inotify_init();
-    if(_notify_fd >= 0)
-    {
-        int flags = ::fcntl(_notify_fd, F_GETFL, 0);
-        if(flags != -1)
-            ::fcntl(_notify_fd, F_SETFL, flags | O_NONBLOCK);
-    }
-}
-
-// Default refresh time is 30 mins
-long UFConfManager::_refreshTime = 30000000;
-long UFConfManager::getRefreshTime()
-{
-    return _refreshTime;
-}
-
-void UFConfManager::setRefreshTime(long rtime)
-{
-    _refreshTime = rtime;
-}
-
-pthread_key_t UFConfManager::_createThreadKey()
-{
-    if(pthread_key_create(&UFConfManager::threadSpecificKey, 0) != 0)
-    {
-        cerr<<"couldnt create thread specific key "<<strerror(errno)<<endl;
-        exit(1);
-    }
-    return UFConfManager::threadSpecificKey;
-}
-pthread_key_t UFConfManager::threadSpecificKey = UFConfManager::_createThreadKey();
-
-UFConfManager* UFConfManager::getConfManager()
-{
-    void * ret = pthread_getspecific(threadSpecificKey);
-    return (UFConfManager*) ret;
-}
-
-// FIXME : Random number picked from example code
-const int BUFSZ = 1024;
-
-void UFConfManager::reload()
-{
-#ifdef __linux__
-    // Wrap inotify fd in UFIO
-    UFIO* ufio = new UFIO(UFScheduler::getUF());
-    if(!ufio || !ufio->setFd(_notify_fd, false/*has already been made non-blocking*/))
-    {
-        cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup conf refresher" << endl;
-        return;
-    }
-
-    // Check for changes from inotify
-    while(1)
-    {
-        char buf[BUFSZ];
-        int n = ufio->read(buf, sizeof(buf));
-        if (n <= 0) 
-        {
-            continue;
-        }
-    
-        // Go over changes
-        // Collect confs that were changed
-        vector<UFConf *> files_changed;
-        int i = 0;
-        while (i < n) 
-        {
-            struct inotify_event *ev;
-            ev = (struct inotify_event *) &buf[i];
-    
-            if (ev && ev->wd && (ev->mask & IN_MODIFY))
-            {
-                // Check if the file that was modified is in the conf system
-                std::hash_map<std::string, UFConf *>::iterator it = _configs.find(_watch_fd_map[ev->wd]);
-                if(it == _configs.end())
-                    continue; // This should never happen
-    
-                // cerr <<getPrintableTime()<<" "<<getpid()<< "Reloading conf : " << it->first << endl;
-    
-                // Conf found in system. Reparse.
-                it->second->clear();
-                it->second->parse(it->first);
-    
-                files_changed.push_back(it->second);
-            }
-            i += sizeof(struct inotify_event) + ev->len;
-        }
-   
-       
-        // cerr <<getPrintableTime()<<" "<<getpid()<< "file_changed size : " << files_changed.size() << endl;
-        for(vector<UFConf *>::iterator it = files_changed.begin(); it != files_changed.end(); it++)
-        {
-            _reloadChildren(*it);
-        }
-    }
-#else
-    // system does not support inotify.
-    // brute force. 
-    // reparse all configs.
-    for(std::hash_map<std::string, UFConf *>::iterator it = _configs.begin(); it != _configs.end(); it++)
-    {
-        it->second->clear();
-        it->second->parse(it->first);
-    }
-
-    // reload children for every conf object
-    for(std::hash_map<std::string, UFConf *>::iterator it = _configs.begin(); it != _configs.end(); it++)
-    {
-        _reloadChildren(it->second);
-    }
-#endif
-}
-
-void UFConfManager::_reloadChildren(UFConf *parent_conf)
-{
-    // Get children
-    hash_map< string, vector<string> >::iterator child_map_it = _child_map.find(parent_conf->conf_file);
-    if(child_map_it == _child_map.end())
-        return; // Leaf node
-
-    vector<string> &children = child_map_it->second;
-    // Go over children and make them pickup changes from parent
-    for(vector<string>::iterator vit = children.begin(); vit != children.end(); vit++)
-    {
-        std::hash_map<std::string, UFConf *>::iterator conf_map_it = _configs.find(*vit);
-        if(conf_map_it == _configs.end())
-            continue;
-        conf_map_it->second->_setParent(parent_conf);
-
-        // Recursively reload children
-        _reloadChildren(conf_map_it->second);
-    }
-}
+std::hash_map<std::string, UFConf *> UFConfManager::_configs;
 
 /** Add new child conf
  *  Create new conf and set parent to conf object corresponding to the parent_conf that is passed in
@@ -393,24 +226,17 @@ UFConf* UFConfManager::addChildConf(cons
         return it->second;
     }
 
-    // Create conf
-    UFConf *conf_created = addConf(conf_file);
-
     // Check if parent config was created
     it = _configs.find(parent_conf_file);
     if(it == _configs.end())
-    {
-        cerr << "Warning : Parent config was not created. Not setting parent conf" << endl;
-    }
-    else 
-    {
-        // Set parent conf
-        if(conf_created != NULL)
-        {
-            conf_created->_setParent(it->second);
-            _child_map[parent_conf_file].push_back(conf_file);
-        }
-    }
+        return NULL; // Parent config was not created
+
+    // Create conf
+    UFConf *conf_created = addConf(conf_file);
+
+    // Set parent conf
+    if(conf_created != NULL)
+        conf_created->setParent(it->second);
     return conf_created;
 }
 
@@ -427,46 +253,25 @@ UFConf* UFConfManager::addConf(const str
     }
 
     // Create new UFConf
-    UFConf *conf = UFConfFactory<UFConf>(conf_file);
-    
-    // Store in conf map
-    _configs[conf_file] = conf;
+    UFConf *conf = new UFConf;
 
-    // inotify watch file for changes
-    _addWatch(conf_file);
-    return conf;
-}
+    // Parse default config
+    string conf_file_default = conf_file + ".default";
+    conf->parse(conf_file_default);
+    
+    // Parse overrides
+    conf->parse(conf_file);
 
-/** Add new conf
- *  Uses the conf that is passed in
- */
-bool UFConfManager::addConf(UFConf *conf, const string &conf_file)
-{
-    hash_map<string, UFConf*>::iterator it = _configs.find(conf_file);
-    if(it != _configs.end())
+    string *conf_file_parent = conf->getString("parent");
+    if(conf_file_parent != NULL) 
     {
-        // Conf already exists
-        return false;
+        conf->setParent(getConf(*conf_file_parent));
     }
 
     // Store in conf map
     _configs[conf_file] = conf;
 
-    // inotify watch file for changes
-    _addWatch(conf_file);
-    return true;
-}
-
-/** Add new child conf
- *  Add conf object to manager and set parent
- */
-bool UFConfManager::addChildConf(UFConf *child_conf, const string &conf_file, const string &parent_conf_file)
-{
-    if(!addConf(child_conf, conf_file))
-        return false;
-    _child_map[parent_conf_file].push_back(conf_file);
-    child_conf->_setParent(getConf(parent_conf_file));
-    return true;
+    return conf;
 }
 
 /**
@@ -494,19 +299,3 @@ void UFConfManager::dump()
         cerr << "=============CONF " << it->first << " ENDS" << endl;
     }
 }
-
-void UFConfManager::_addWatch(const string &conf_file)
-{
-    if(_notify_fd < 0)
-    {
-        cerr << "UFConfManager::_addWatch could not add watch on " << conf_file << endl;
-        return;
-    }
-
-    int wd = inotify_add_watch(_notify_fd, conf_file.c_str(), IN_MODIFY);
-    if(wd < 0)
-        cerr << "UFConfManager::_addWatch error adding watch on " << conf_file << endl;
-
-    _watch_fd_map[wd] = conf_file;
-    // cerr << "UFConfManager::_addWatch watch on " << conf_file << endl;
-}

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C Fri Aug 13 22:14:31 2010
@@ -1,9 +1,7 @@
-#include <UFConnectionPool.H>
+#include "UFConnectionPool.H"
 #include "UFConnectionPoolImpl.H"
-#include <UFStatSystem.H>
-#include <UFStats.H>
 
-#include <UFIO.H>
+#include "UFIO.H"
 #include <stdlib.h>
 #include <fstream>
 #include <iostream>
@@ -20,7 +18,7 @@
 #include <string.h>
 
 #ifdef USE_CARES
-#include <UFAres.H>
+#include "UFAres.H"
 #endif
 
 const unsigned short int PERCENT_LOGGING_SAMPLING = 5;
@@ -28,11 +26,11 @@ const unsigned short int PERCENT_LOGGING
 using namespace std;
 
 UFConnIPInfo::UFConnIPInfo(const string& ip, 
-                           unsigned int port,
-                           bool persistent, 
-                           int maxSimultaneousConns, 
-                           TIME_IN_US connectTimeout,
-                           TIME_IN_US timeToFailOutIPAfterFailureInSecs)
+                                       unsigned int port,
+                                       bool persistent, 
+                                       int maxSimultaneousConns, 
+                                       TIME_IN_US connectTimeout,
+                                       TIME_IN_US timeToFailOutIPAfterFailureInSecs)
 {
     _ip = ip;
     _port = port;
@@ -42,6 +40,7 @@ UFConnIPInfo::UFConnIPInfo(const string&
     _connectTimeout = connectTimeout;
     _timedOut = 0;
     _inProcessCount = 0;
+    _timeToExpireDNSInfo = 0;
 
     memset(&_sin, 0, sizeof(_sin));
     _sin.sin_family = AF_INET;
@@ -120,30 +119,24 @@ UFConnGroupInfo* UFConnectionPoolImpl::a
     return group;
 }
 
-const unsigned int DEFAULT_TTL = 300;
-bool UFConnectionPoolImpl::createIPInfo(const string& groupName, UFConnGroupInfo* groupInfo, TIME_IN_US connectTimeout)
+bool UFConnectionPoolImpl::createIPInfo(const string& groupName, UFConnGroupInfo* groupInfo)
 {
     UFConnIPInfoList& ipInfoList = groupInfo->getIpInfoList();
 
     size_t indexOfColon = groupName.find(':');
     string hostName = groupName;
     unsigned int port = 0;
-    string portString = "";
     if(indexOfColon != string::npos)
     {
         hostName = groupName.substr(0, indexOfColon);
-        portString = groupName.substr(indexOfColon+1).c_str();
-        port = atoi(portString.c_str());
+        port = atoi(groupName.substr(indexOfColon+1).c_str());
     }
-    else
-        return false;
 
     //have to figure out the hosts listed w/ 
     //
     int lowestTTL = 0;
     UFConnIPInfo* ipInfo = 0;
     string ipString;
-    string ipStringToInsert;
 
 #ifdef USE_CARES
     UFAres ufares;
@@ -157,10 +150,8 @@ bool UFConnectionPoolImpl::createIPInfo(
     {
         ipString = inet_ntoa(results[i].ipaddr);
 #else
-    struct hostent hostInfo, *h;
-    int hErrno = 0;
-    char tmpHostBuf[1024];
-    if(gethostbyname_r (hostName.c_str(), &hostInfo, tmpHostBuf, 1024, &h, &hErrno) || hErrno)
+    struct hostent* h = gethostbyname(hostName.c_str());
+    if(!h)
         return false;
 
     for(unsigned int i = 0; 1; i++)
@@ -169,14 +160,13 @@ bool UFConnectionPoolImpl::createIPInfo(
             break;
         ipString = inet_ntoa(*((in_addr*)h->h_addr_list[i]));
 #endif
-        ipStringToInsert = ipString + ":" + portString;
         //check if the ip already exists in the system
-        IPInfoStore::iterator index = _ipInfoStore.find(ipStringToInsert);
+        IPInfoStore::iterator index = _ipInfoStore.find(ipString);
         if(index == _ipInfoStore.end())
         {
-            ipInfo = new UFConnIPInfo(ipString, port, true, 0, connectTimeout, 30);
-            _ipInfoStore[ipStringToInsert] = ipInfo;
-            index = _ipInfoStore.find(ipStringToInsert);
+            ipInfo = new UFConnIPInfo(ipString, port, true, 0, 0, 30);
+            _ipInfoStore[ipString] = ipInfo;
+            index = _ipInfoStore.find(ipString);
         }
         ipInfo = index->second;
 
@@ -184,9 +174,9 @@ bool UFConnectionPoolImpl::createIPInfo(
         if(lowestTTL > results[i].ttl || !lowestTTL)
             lowestTTL = results[i].ttl;
 #else
-        lowestTTL = DEFAULT_TTL;
+        lowestTTL = 300; //default to 60 secs
 #endif
-        ipInfoList.push_back(ipStringToInsert);
+        ipInfoList.push_back(ipString);
     }
 
     time_t timeToExpireAt = lowestTTL + time(0);
@@ -197,7 +187,7 @@ bool UFConnectionPoolImpl::createIPInfo(
 }
 
 //TODO: return a ResultStructure which includes the UFConnIPInfo* along w/ the UFIO* so that the map doesnt have to be looked up on every return back of the structure
-UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName, bool waitForConnection, TIME_IN_US connectTimeout)
+UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName, bool waitForConnection)
 {
     if(!groupName.length())
         return 0;
@@ -219,12 +209,12 @@ UFIO* UFConnectionPoolImpl::getConnectio
         if(!ipInfoList.empty()) //clear the existing set since the ttl expired
             ipInfoList.clear();
 
-        if(!createIPInfo(groupName, groupInfo, connectTimeout))
+        if(!createIPInfo(groupName, groupInfo))
             return 0;
     }
 
 
-    set<unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
+    map<unsigned int,unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
     unsigned int groupIpSize = ipInfoList.size();
     while(alreadySeenIPList.size() < groupIpSize) //bail out if we've seen all the ips already
     {
@@ -239,7 +229,7 @@ UFIO* UFConnectionPoolImpl::getConnectio
             if(ipInfo && !ipInfo->_currentlyAvailableConnections.empty())
             {
                 elementNum = i;
-                alreadySeenIPList.insert(elementNum);
+                alreadySeenIPList[elementNum] = 1;
                 break;
             }
         }
@@ -250,7 +240,7 @@ UFIO* UFConnectionPoolImpl::getConnectio
             elementNum = random() % groupIpSize;
             if(alreadySeenIPList.find(elementNum) != alreadySeenIPList.end()) //already seen this IP
                 continue;
-            alreadySeenIPList.insert(elementNum);
+            alreadySeenIPList[elementNum] = 1;
         }
 
         UFConnIPInfo* ipInfo = getIPInfo(ipInfoList[elementNum]);
@@ -271,24 +261,8 @@ UFConnIPInfo* UFConnectionPoolImpl::getI
     return ((index != _ipInfoStore.end()) ? index->second : 0);
 }
 
-uint32_t statConnFromPoolLocation = 0;
-uint32_t statConnFromNewConnLocation = 0;
-uint32_t statConnMarkedInvalid = 0;
-static bool registerConnPoolStats()
-{
-    UFStatSystem::registerStat("connPool.conn_from_pool", &statConnFromPoolLocation);
-    UFStatSystem::registerStat("connPool.conn_from_new_connection", &statConnFromNewConnLocation);
-    UFStatSystem::registerStat("connPool.conn_marked_invalid", &statConnMarkedInvalid);
-
-    return true;
-}
-
 UFIO* UFConnIPInfo::getConnection(UFConnIPInfoMap& _ufConnIPInfoMap)
 {
-    static bool statsRegistered = false;
-    if(!statsRegistered)
-        statsRegistered = registerConnPoolStats();
-
     //2. while the host is timedout - pick another one (put into the list of already seen ips)
     time_t currTime = time(0);
     _lastUsed = currTime;
@@ -302,28 +276,28 @@ UFIO* UFConnIPInfo::getConnection(UFConn
         if(!_currentlyAvailableConnections.empty())
         {
             //3. pick a connection from the currently available conns
-            for(UFIOIntMap::iterator beg = _currentlyAvailableConnections.begin();
-                beg != _currentlyAvailableConnections.end(); )
+            UFIOIntMap::iterator beg = _currentlyAvailableConnections.begin();
+            for(; beg != _currentlyAvailableConnections.end(); 
+                beg = _currentlyAvailableConnections.begin()  // we're resetting to the beginning to avoid
+                    // the case of two threads ending up getting 
+                    // the same connection
+                )
             {
-                returnConn = beg->second;
-                _currentlyAvailableConnections.erase(beg++);
+                returnConn = beg->first;
+                _currentlyAvailableConnections.erase(beg);
                 if(!returnConn)
                 {
                     cerr<<time(0)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
                     continue;
                 }
 
-
                 if(returnConn->_markedActive) //this indicates that the conn. had some activity while sleeping - thats no good
                 {
-                    UFStatSystem::increment(statConnMarkedInvalid, 1);
                     UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(returnConn);
                     _ufConnIPInfoMap.erase(index);
                     delete returnConn;
                     continue;
                 }
-
-                UFStatSystem::increment(statConnFromPoolLocation, 1);
                 returnConn->_active = true;
                 _currentlyUsedCount++;
                 return returnConn;
@@ -350,7 +324,6 @@ UFIO* UFConnIPInfo::getConnection(UFConn
                 incInProcessCount(-1);
                 if(returnConn)
                 {
-                    UFStatSystem::increment(statConnFromNewConnLocation, 1);
                     _currentlyUsedCount++;
                     _ufConnIPInfoMap[returnConn] = this;
                 }
@@ -412,36 +385,38 @@ void UFConnectionPoolImpl::clearUnusedCo
             continue;
         }
 
-        //TODO: order _currentlyAvailableConnections by time
         //walk the available connection list to see if any conn. hasnt been used for a while
-        for(UFIOIntMap::iterator conBeg = ipInfo->_currentlyAvailableConnections.begin();
-            conBeg != ipInfo->_currentlyAvailableConnections.end(); )
+        UFIOIntMap::iterator conBeg = ipInfo->_currentlyAvailableConnections.begin();
+        for(; conBeg != ipInfo->_currentlyAvailableConnections.end(); )
         {
-            UFIO* conn = conBeg->second;
-            if(!conn)
+            UFIO* conn = conBeg->first;
+            if(!conn) //TODO
             {
                 ipInfo->_currentlyAvailableConnections.erase(conBeg++);
                 continue;
             }
-            
-            time_t connLastUsed = conBeg->first;
-            if(((int)connLastUsed + (int)lastUsedTimeDiff) > (int)currTime) //the time hasnt expired yet
-                break;
 
-            //remove the conn from the _UFConnIPInfoMap list
-            UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(conn);
-            if(index != _ufConnIPInfoMap.end())
-                _ufConnIPInfoMap.erase(index);
-            delete conn; //delete the connection
-            ipInfo->_currentlyAvailableConnections.erase(conBeg++);
+            if((int)(conBeg->second + lastUsedTimeDiff) < (int)currTime /*the conn. has expired*/ ||
+               (conBeg->first->_markedActive)/*somehow this conn. had some update*/)
+            {
+                //remove the conn from the _UFConnIPInfoMap list
+                UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(conBeg->first);
+                if(index != _ufConnIPInfoMap.end())
+                    _ufConnIPInfoMap.erase(index);
+                delete conBeg->first; //delete the connection
+                ipInfo->_currentlyAvailableConnections.erase(conBeg++);
+                continue;
+            }
+            ++conBeg;
         }
-
-        //check the last time this ipinfo was ever used - if its > than 600s remove it
-        if(ipInfo->_currentlyAvailableConnections.empty() &&
-           (ipInfo->getLastUsed() + DEFAULT_LAST_USED_TIME_INTERVAL_FOR_IP < (unsigned int) currTime))
+        if(ipInfo->_currentlyAvailableConnections.empty())
         {
-            _ipInfoStore.erase(beg++);
-            continue;
+            //check the last time this ipinfo was ever used - if its > than 300s remove it
+            if(ipInfo->getLastUsed() + DEFAULT_LAST_USED_TIME_INTERVAL_FOR_IP < (unsigned int) currTime)
+            {
+                _ipInfoStore.erase(beg++);
+                continue;
+            }
         }
         ++beg;
     }
@@ -469,7 +444,8 @@ void UFConnectionPoolImpl::releaseConnec
     //add to the available list
     if(connOk && ipInfo->getPersistent())
     {
-        ipInfo->_currentlyAvailableConnections.insert(make_pair(time(0), ufIO));
+        time_t currTime = time(0);
+        ipInfo->_currentlyAvailableConnections[ufIO] = currTime;
         ufIO->_markedActive = false;
         ufIO->_active = false;
     }
@@ -481,21 +457,17 @@ void UFConnectionPoolImpl::releaseConnec
     }
 
     //signal to all the waiting threads that there might be a connection available
-    /*TODO: lock only if we move the conn. pool to support running on multiple threads
     UF* this_user_fiber = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
     ipInfo->getMutexToCheckSomeConnection()->lock(this_user_fiber);
-    */
     ipInfo->getMutexToCheckSomeConnection()->broadcast();
-    /*
     ipInfo->getMutexToCheckSomeConnection()->unlock(this_user_fiber);
-    */
 }
 
 UFConnectionPoolImpl::~UFConnectionPoolImpl() 
 { 
 }
 
-const unsigned int DEFAULT_COVER_LIST_TIME_IN_US = 240*1000*1000;
+const unsigned int DEFAULT_COVER_LIST_TIME_IN_US = 60*1000*1000;
 void UFConnectionPoolCleaner::run()
 {
     UF* this_uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
@@ -549,9 +521,9 @@ bool UFConnectionPool::addGroup(UFConnGr
     return (_impl ? _impl->addGroup(groupInfo) : false);
 }
 
-UFIO* UFConnectionPool::getConnection(const std::string& groupName, bool waitForConnection, TIME_IN_US connectTimeout)
+UFIO* UFConnectionPool::getConnection(const std::string& groupName, bool waitForConnection)
 {
-    return (_impl ? _impl->getConnection(groupName, waitForConnection, connectTimeout) : 0);
+    return (_impl ? _impl->getConnection(groupName, waitForConnection) : 0);
 }
 
 void UFConnectionPool::releaseConnection(UFIO* ufIO, bool connOk)
@@ -577,7 +549,7 @@ void UFConnectionPool::clearUnusedConnec
 
 TIME_IN_US UFConnectionPool::getTimeToTimeoutIPAfterFailure()
 {
-    return (_impl ? _impl->getTimeToTimeoutIPAfterFailure() : -1);
+    return (_impl ? _impl->getTimeToTimeoutIPAfterFailure() : 0);
 }
 
 void UFConnectionPool::setMaxSimulConnsPerHost(int input)

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H Fri Aug 13 22:14:31 2010
@@ -2,37 +2,18 @@
 #define UFCONNECTIONPOOLIMPL_H
 
 #include <time.h>
-#include <UF.H>
-#include <UFConnectionPool.H>
-
-#include <ext/hash_map>
-using namespace std;        
-namespace std { using namespace __gnu_cxx; }
-struct VoidPtrHash 
-{
-    size_t operator()(void* p) const 
-    {
-        return reinterpret_cast< uintptr_t >(p);
-    }
-};
-
-struct VoidPtrEqual 
-{
-    bool operator()(void* v1, void* v2) const
-    {
-        return (v1 == v2);
-    }
-};
 
+#include "UF.H"
+#include "UFConnectionPool.H"
 
 struct UFIO;
 struct UFConnIPInfo;
 struct UFConnGroupInfo;
 
-typedef std::map<std::string, UFConnGroupInfo*>                 GroupIPMap;
-typedef std::map<std::string, UFConnIPInfo*>                    IPInfoStore;
-typedef std::multimap<unsigned long long int, UFIO*>            UFIOIntMap;
-typedef hash_map<void*, UFConnIPInfo*, VoidPtrHash, VoidPtrEqual> UFConnIPInfoMap;
+typedef std::map<std::string, UFConnGroupInfo*> GroupIPMap;
+typedef std::map<std::string, UFConnIPInfo*>    IPInfoStore;
+typedef std::map<UFIO*, time_t>                 UFIOIntMap;
+typedef std::map<UFIO*, UFConnIPInfo*>          UFConnIPInfoMap;
 
 const time_t DEFAULT_TIMEOUT_OF_IP_ON_FAILURE = 10;
 struct UFConnectionPoolImpl
@@ -45,7 +26,7 @@ struct UFConnectionPoolImpl
     
     bool addGroup(UFConnGroupInfo* stGroupInfo);
     void removeGroup(const std::string& name);
-    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true, TIME_IN_US connectTimeout = -1);
+    UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
     void releaseConnection(UFIO* ufIO, bool connOk = true);
     UFConnGroupInfo* addGroupImplicit(const std::string& groupName);
 
@@ -64,7 +45,7 @@ protected:
     time_t                      _timeToTimeoutIPAfterFailure;  ///How long should we time out an IP if there is a failure
 
     UFConnIPInfo* getIPInfo(const std::string& name);
-    bool createIPInfo(const std::string& groupName, UFConnGroupInfo* groupInfo, TIME_IN_US connectTimeout = -1);
+    bool createIPInfo(const std::string& groupName, UFConnGroupInfo* groupInfo);
 };
 inline void UFConnectionPoolImpl::setMaxSimulConnsPerHost(int input) { _maxSimulConnsPerHost = input; }
 inline int UFConnectionPoolImpl::getMaxSimulConnsPerHost() { return _maxSimulConnsPerHost; }
@@ -87,6 +68,8 @@ struct UFConnIPInfo
     unsigned int        getInProcessCount() const;
     bool                getPersistent() const;
     unsigned int        getTimeToFailOutIPAfterFailureInSecs() const;
+    unsigned int        getTimeToExpire() const;
+    void                setTimeToExpire(time_t input);
     TIME_IN_US          getConnectTimeout() const;
     time_t              getTimedOut() const;
     void                setTimedOut(time_t t);
@@ -108,6 +91,7 @@ protected:
     unsigned int        _maxSimultaneousConns;
     bool                _persistent; 
 
+    unsigned int        _timeToExpireDNSInfo;
     unsigned int        _timeToFailOutIPAfterFailureInSecs; ///how many s to try before considering the connect a failure
     TIME_IN_US          _connectTimeout; ///how many ms to try before considering the connect a failure
     time_t              _timedOut;
@@ -119,13 +103,15 @@ protected:
 };
 inline time_t UFConnIPInfo::getLastUsed() const { return _lastUsed; }
 inline struct sockaddr_in* UFConnIPInfo::getSin() { return &_sin; }
-inline const std::string& UFConnIPInfo::getIP() const { return _ip; }
+inline const string& UFConnIPInfo::getIP() const { return _ip; }
 inline unsigned int UFConnIPInfo::getMaxSimultaneousConns() const { return _maxSimultaneousConns; }
 inline unsigned int UFConnIPInfo::getInProcessCount() const { return _inProcessCount; }
 inline void UFConnIPInfo::incInProcessCount(int numToIncrement) { _inProcessCount += numToIncrement; }
 inline bool UFConnIPInfo::getPersistent() const { return _persistent; }
 inline unsigned int UFConnIPInfo::getTimeToFailOutIPAfterFailureInSecs() const { return _timeToFailOutIPAfterFailureInSecs; }
 inline TIME_IN_US UFConnIPInfo::getConnectTimeout() const { return _connectTimeout; }
+inline unsigned int UFConnIPInfo::getTimeToExpire() const { return _timeToExpireDNSInfo; }
+inline void UFConnIPInfo::setTimeToExpire(time_t input) { _timeToExpireDNSInfo = input; }
 inline time_t UFConnIPInfo::getTimedOut() const { return _timedOut; }
 inline void UFConnIPInfo::setTimedOut(time_t t) { _timedOut = t; }
 inline UFMutex* UFConnIPInfo::getMutexToCheckSomeConnection() { return &_someConnectionAvailable; }
@@ -156,5 +142,13 @@ inline void UFConnGroupInfo::setTimeToEx
 inline UFConnIPInfoList& UFConnGroupInfo::getIpInfoList() { return _ipInfoList; }
 
 
+struct UFConnectionPoolCleaner : public UF
+{
+    void run();
+    UFConnectionPoolCleaner (bool registerMe = false);
+    UF* createUF() { return new UFConnectionPoolCleaner(); }
+};
+inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ }
+
 
 #endif

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFIO.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFIO.C Fri Aug 13 22:14:31 2010
@@ -1,7 +1,7 @@
-#include <UFIO.H>
-#include <UFConnectionPool.H>
-#include <UFStatSystem.H>
-#include <UFStats.H>
+#include "UFIO.H"
+#include "UFConnectionPool.H"
+#include "UFStatSystem.H"
+#include "UFStats.H"
 #include <netdb.h>
 #include <sys/socket.h> 
 #include <sys/time.h> 
@@ -28,7 +28,7 @@ static int makeSocketNonBlocking(int fd)
     if ((flags = fcntl(fd, F_GETFL, 0)) < 0 ||
         fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
         return -1;
-    
+
     return fd;
 }
 
@@ -42,22 +42,15 @@ void UFIO::reset()
     _sleepInfo = 0;
     _markedActive = false;
     _active = true;
-    if (_readLineBuf)
-        free(_readLineBuf);
-    _readLineBufPos = 0;
-    _readLineBufSize = 0;
 }
 
 UFIO::~UFIO()
 {
     close();
-    if (_readLineBuf)
-        free(_readLineBuf);
 }
 
 UFIO::UFIO(UF* uf, int fd)
 { 
-    _readLineBuf = NULL;
     reset();
     _uf = (uf) ? uf : UFScheduler::getUF(pthread_self());
 
@@ -105,8 +98,6 @@ bool UFIO::isSetup(bool makeNonBlocking)
     return true;
 }
 
-int UFIO::RECV_SOCK_BUF = 57344;
-int UFIO::SEND_SOCK_BUF = 57344;
 int UFIO::setupConnectionToAccept(const char* i_a, 
                                   unsigned short int port, 
                                   unsigned short int backlog,
@@ -163,11 +154,6 @@ int UFIO::setupConnectionToAccept(const 
         return -1;
     }
 
-    //set the recv and send buffers
-    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*) &UFIO::RECV_SOCK_BUF, sizeof( UFIO::RECV_SOCK_BUF ));
-    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*) &UFIO::SEND_SOCK_BUF, sizeof( UFIO::SEND_SOCK_BUF ));
-
-
     if (listen(fd, backlog) != 0)
     {
         cerr<<"couldnt setup listen to "<<interface_addr<<" on port "<<port<<" - "<<strerror(errno)<<endl;
@@ -176,13 +162,11 @@ int UFIO::setupConnectionToAccept(const 
         return false;
     }
 
-    cerr << "setup listen socket at " << interface_addr << ':' << port << endl;
     return fd;
 }
 
 void UFIO::accept(UFIOAcceptThreadChooser* ufiotChooser,
                   unsigned short int ufLocation,
-                  unsigned short int port,
                   void* startingArgs,
                   void* stackPtr,
                   unsigned int stackSize)
@@ -213,12 +197,6 @@ void UFIO::accept(UFIOAcceptThreadChoose
         }
     }
 
-    stringstream ss;
-    ss<<"connections.accept.["<<port<<"]";
-    unsigned int connAccepted;
-    UFStatSystem::registerStat(ss.str().c_str(), &connAccepted);
-
-
     int acceptFd = 0;
     struct sockaddr_in cli_addr;
     int sizeof_cli_addr = sizeof(cli_addr);
@@ -260,7 +238,6 @@ void UFIO::accept(UFIOAcceptThreadChoose
             }
 
             //make the new socket non-blocking
-            UFStatSystem::increment(connAccepted, 1);
             if(makeSocketNonBlocking(acceptFd) < 1)
             {
                 cerr<<"couldnt make accepted socket "<<acceptFd<<" non-blocking"<<strerror(errno)<<endl;
@@ -349,124 +326,14 @@ UFIOScheduler* UFIOScheduler::getUFIOS(p
     return tmpUfios;
 }
 
-
-
-ssize_t UFIO::readLine(char* buf, size_t n, char delim)
-{
-    ssize_t res = 0;
-    size_t prev_len = 0;
-    if (!n) return 0;
-    if (_readLineBufPos > 0) {
-        prev_len = ((_readLineBufPos >= n) ? n-1 : _readLineBufPos);
-        char* pos = (char*)memchr(_readLineBuf, delim, prev_len);
-        if (pos) { //found the delim
-            prev_len = pos - _readLineBuf;
-            _readLineBufPos--;
-        }
-        memcpy(buf, _readLineBuf, prev_len);
-        _readLineBufPos -= prev_len;
-        if (pos)
-            memmove(_readLineBuf, _readLineBuf+prev_len+1, _readLineBufPos);
-        else
-            memmove(_readLineBuf, _readLineBuf+prev_len, _readLineBufPos);
-        
-        if (pos || prev_len >= n-1)
-            return prev_len;
-    }
-    _readLineBufPos = 0;
-    while (prev_len < n-1) {
-        res = read(buf+prev_len, n-prev_len, -1);
-        if (res < 0) {
-            return res;
-        } else if (res == 0) {
-            return prev_len;
-        } else {
-            char* pos = (char*)memchr(buf+prev_len, delim, res);
-            if (pos) { //found the delim
-                if (pos < buf+prev_len+res-1) {
-                    _readLineBufPos = buf+prev_len+res-1-pos;
-                    if (_readLineBufSize < _readLineBufPos) {
-                        if (!_readLineBufSize) 
-                            _readLineBufSize = 64;
-                        else 
-                            free(_readLineBuf);
-                        for ( ; _readLineBufSize < _readLineBufPos; _readLineBufSize <<= 1);
-                        _readLineBuf = (char*)malloc(_readLineBufSize);
-                    }
-                    memcpy(_readLineBuf, pos+1, _readLineBufPos);
-                }
-                prev_len = pos-buf;
-                buf[prev_len] = '\0';
-                break;
-            } else {
-                prev_len += res;
-            }
-            buf[prev_len] = '\0';
-        }
-    }
-    return prev_len;
-}
-
-ssize_t UFIO::readLine(std::string &out, size_t n, char delim)
-{
-    if (!n) return 0;
-    char tempbuf[n];
-    ssize_t r = readLine(tempbuf, n, delim);
-    if (r >= 0) 
-    {
-        out.assign(tempbuf, r);
-    }
-    return r;    
-}
-
-static inline TIME_IN_US setupTimeout(TIME_IN_US& timeout)
-{
-    TIME_IN_US now = 0;
-    if (timeout > -1) 
-    {
-        struct timeval now_tv;
-        gettimeofday(&now_tv, NULL);
-        now = timeInUS(now_tv);
-        timeout += now;
-    }
-    return now;
-}
-
-static inline bool calculateLoopedTimeout(TIME_IN_US& now, TIME_IN_US& timeout)
-{
-    if(now)
-    {
-        struct timeval now_tv;
-        gettimeofday(&now_tv, NULL);
-        now = timeInUS(now_tv);
-        if (now >= timeout)
-            return false;
-    }
-    return true;
-}
-
-ssize_t UFIO::read(void *buf, size_t totalBytes, TIME_IN_US timeout)
+ssize_t UFIO::read(void *buf, size_t nbyte, TIME_IN_US timeout)
 {
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    if (_readLineBufPos > 0) 
-    {
-        size_t prev_len = ((_readLineBufPos > totalBytes) ? totalBytes : _readLineBufPos);
-        memcpy(buf, _readLineBuf, prev_len);
-        _readLineBufPos -= prev_len;
-        memmove(_readLineBuf, _readLineBuf+prev_len, _readLineBufPos);
-        if (prev_len == totalBytes)
-            return prev_len;
-        totalBytes -= prev_len;
-    }
-
-    TIME_IN_US now = setupTimeout(timeout);
-    bool shouldCheckTimeout = false; //the flag ensures that we dont re-do gettimeofday right after the read/write call the first time
-
-    ssize_t n = 0;
+    ssize_t n = 0;;
     while(1)
     {
-        n = ::read(_fd, buf, totalBytes);
+        n = ::read(_fd, buf, nbyte);
         if(n > 0) 
         {
             UFStatSystem::increment(UFStats::bytesRead, n);
@@ -474,37 +341,20 @@ ssize_t UFIO::read(void *buf, size_t tot
         }
         else if(n < 0)
         {
-            if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+            if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
-                if(!timeout) //optimization for the case that the user doesnt want a timeout
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-
+                _errno = 0;
                 _markedActive = false;
-                while(!_markedActive)
+                while(!_markedActive) 
                 {
-                    if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
-                    {
-                        _errno = ETIMEDOUT;
-                        return -1;
-                    }
                     //wait for something to read first
-                    if(!tmpUfios->setupForRead(this, timeout-now))
+                    if(!tmpUfios->setupForRead(this, timeout))
                         return -1;
-                    if(_markedActive) //found some activity on fd, so read
-                        break;
                 }
+                continue;
             }
             else if(errno == EINTR)
-            {
-                if(!calculateLoopedTimeout(now, timeout))
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-            }
+                continue;
             else
             {
                 _errno = errno;
@@ -513,28 +363,28 @@ ssize_t UFIO::read(void *buf, size_t tot
         }
         else if(n == 0)
             break;
-        shouldCheckTimeout = true;
     }
 
+    // Increment stat for bytes written
+    if(n > 0)
+        UFStatSystem::increment(UFStats::bytesRead, n);
+
     return n;
 }
 
-ssize_t UFIO::write(const void *buf, size_t totalBytes, TIME_IN_US timeout)
+ssize_t UFIO::write(const void *buf, size_t nbyte, TIME_IN_US timeout)
 {
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    TIME_IN_US now = setupTimeout(timeout);
-    bool shouldCheckTimeout = false;
-
-    ssize_t n = 0;
-    size_t amtWritten = 0;
+    ssize_t n = 0;;
+    unsigned int amtWritten = 0;
     while(1)
     {
-        n = ::write(_fd, (char*)buf+amtWritten, totalBytes-amtWritten);
+        n = ::write(_fd, (char*)buf+amtWritten, nbyte-amtWritten);
         if(n > 0)
         {
             amtWritten += n;
-            if(amtWritten == totalBytes)
+            if(amtWritten == nbyte)
             {
                 UFStatSystem::increment(UFStats::bytesWritten, n);
                 return amtWritten;
@@ -544,36 +394,19 @@ ssize_t UFIO::write(const void *buf, siz
         }
         else if(n < 0)
         {
+            _errno = errno;
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
-                if (!timeout) //dont wait to write
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-
                 _markedActive = false;
+                _errno = 0;
                 while(!_markedActive)
                 {
-                    if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
-                    {
-                        _errno = ETIMEDOUT;
+                    if(!tmpUfios->setupForWrite(this, timeout))
                         return -1;
-                    }
-                    if(!tmpUfios->setupForWrite(this, timeout-now))
-                        return -1;
-                    if(_markedActive) //can write, so write
-                        break;
                 }
             }
             else if(errno == EINTR)
-            {
-                if(!calculateLoopedTimeout(now, timeout))
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-            }
+                continue;
             else
             {
                 _errno = errno;
@@ -582,7 +415,6 @@ ssize_t UFIO::write(const void *buf, siz
         }
         else if(n == 0)
             break;
-        shouldCheckTimeout = true;
     }
 
     // Increment stat for bytes written
@@ -593,8 +425,8 @@ ssize_t UFIO::write(const void *buf, siz
 }
 
 bool UFIO::connect(const struct sockaddr *addr, 
-                   socklen_t addrlen, 
-                   TIME_IN_US timeout)
+               int addrlen, 
+               TIME_IN_US timeout)
 {
     if(!isSetup()) //create the socket and make the socket non-blocking
     {
@@ -606,33 +438,17 @@ bool UFIO::connect(const struct sockaddr
     //find the scheduler for this request
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    TIME_IN_US now = setupTimeout(timeout);
-
-    setsockopt(_fd, SOL_SOCKET, SO_RCVBUF, (char*) &UFIO::RECV_SOCK_BUF, sizeof( UFIO::RECV_SOCK_BUF ));
-    setsockopt(_fd, SOL_SOCKET, SO_SNDBUF, (char*) &UFIO::SEND_SOCK_BUF, sizeof( UFIO::SEND_SOCK_BUF ));
-
     while(::connect(_fd, addr, addrlen) < 0)
     {
         if(errno == EINTR)
-        {
-            if(!calculateLoopedTimeout(now, timeout))
-            {
-                _errno = ETIMEDOUT;
-                return false;
-            }
-        }
+            continue;
         else if(errno == EINPROGRESS || errno == EAGAIN)
         {
-            if(!tmpUfios->setupForConnect(this, timeout-now))
+            if(!tmpUfios->setupForConnect(this, timeout))
             {
                 _errno = errno;
                 return false;
             }
-            if(!calculateLoopedTimeout(now, timeout))
-            {
-                _errno = ETIMEDOUT;
-                return false;
-            }
         }
         else
         {
@@ -644,58 +460,38 @@ bool UFIO::connect(const struct sockaddr
     return true;
 }
 
-int UFIO::sendto(const char *buf, size_t len, const struct sockaddr *to, socklen_t tolen, TIME_IN_US timeout)
+int UFIO::sendto(const char *buf, int len, const struct sockaddr *to, int tolen, TIME_IN_US timeout)
 {
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
-    
-    TIME_IN_US now = setupTimeout(timeout);
-    bool shouldCheckTimeout = false;
 
-    ssize_t n = 0;
-    size_t amtWritten = 0;
+    ssize_t n = 0;;
+    unsigned int amtWritten = 0;
     while(1)
     {
         n = ::sendto(_fd, buf+amtWritten, len-amtWritten, 0, to, tolen);
         if(n > 0)
         {
             amtWritten += n;
-            if(amtWritten == len)
+            if((int)amtWritten == len)
                 return amtWritten;
             else
                 continue;
         }
         else if(n < 0)
         {
+            _errno = errno;
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
-                if(!timeout)
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-
                 _markedActive = false;
+                _errno = 0;
                 while(!_markedActive)
                 {
-                    if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
-                    {
-                        _errno = ETIMEDOUT;
+                    if(!tmpUfios->setupForWrite(this, timeout))
                         return -1;
-                    }
-                    if(!tmpUfios->setupForWrite(this, timeout-now))
-                        return -1;
-                    if(_markedActive)
-                        break;
                 }
             }
             else if(errno == EINTR)
-            {
-                if(!calculateLoopedTimeout(now, timeout))
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-            }
+                continue;
             else
             {
                 _errno = errno;
@@ -704,21 +500,17 @@ int UFIO::sendto(const char *buf, size_t
         }
         else if(n == 0)
             break;
-        shouldCheckTimeout = true;
     }
     return n;
 }
 
 int UFIO::sendmsg(const struct msghdr *msg, 
                   int flags,
-                  TIME_IN_US timeout)
+	              TIME_IN_US timeout)
 {
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    TIME_IN_US now = setupTimeout(timeout);
-    bool shouldCheckTimeout = false;
-
-    ssize_t n = 0;
+    ssize_t n = 0;;
     while(1)
     {
         n = ::sendmsg(_fd, msg, flags); 
@@ -726,36 +518,19 @@ int UFIO::sendmsg(const struct msghdr *m
             continue;
         else if(n < 0)
         {
+            _errno = errno;
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
-                if (!timeout)
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-
                 _markedActive = false;
+                _errno = 0;
                 while(!_markedActive)
                 {
-                    if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
-                    {
-                        _errno = ETIMEDOUT;
-                        return -1;
-                    }
-                    if(!tmpUfios->setupForWrite(this, timeout-now))
+                    if(!tmpUfios->setupForWrite(this, timeout))
                         return -1;
-                    if(_markedActive) //can write, so write
-                        break;
                 }
             }
             else if(errno == EINTR)
-            {
-                if(!calculateLoopedTimeout(now, timeout))
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-            }
+                continue;
             else
             {
                 _errno = errno;
@@ -764,60 +539,39 @@ int UFIO::sendmsg(const struct msghdr *m
         }
         else if(n == 0)
             break;
-        shouldCheckTimeout = true;
     }
     return n;
 }
 
 int UFIO::recvfrom(char *buf, 
-                   size_t len, 
+                   int len, 
                    struct sockaddr *from,
-                   socklen_t *fromlen, 
+		           int *fromlen, 
                    TIME_IN_US timeout)
 {
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    TIME_IN_US now = setupTimeout(timeout);
-    bool shouldCheckTimeout = false;
-
-    ssize_t n = 0;
+    ssize_t n = 0;;
     while(1)
     {
-        n = ::recvfrom(_fd, buf, len, 0, from, fromlen);
+        n = ::recvfrom(_fd, buf, len, 0, from, (socklen_t *)fromlen);
         if(n > 0)
             return n;
         else if(n < 0)
         {
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
-                if (!timeout)
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-
+                _errno = 0;
                 _markedActive = false;
                 while(!_markedActive)
                 {
-                    if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
-                    {
-                        _errno = ETIMEDOUT;
-                        return -1;
-                    }
-                    if(!tmpUfios->setupForRead(this, timeout-now))
+                    if(!tmpUfios->setupForRead(this, timeout))
                         return -1;
-                    if(_markedActive) //can write, so write
-                        break;
                 }
+                continue;
             }
             else if(errno == EINTR)
-            {
-                if(!calculateLoopedTimeout(now, timeout))
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-            }
+                continue;
             else
             {
                 _errno = errno;
@@ -826,20 +580,16 @@ int UFIO::recvfrom(char *buf, 
         }
         else if(n == 0)
             break;
-        shouldCheckTimeout = true;
     }
     return n;
 }
 
 int UFIO::recvmsg(struct msghdr *msg, 
                   int flags,
-                  TIME_IN_US timeout)
+	              TIME_IN_US timeout)
 {
     UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
 
-    TIME_IN_US now = setupTimeout(timeout);
-    bool shouldCheckTimeout = false;
-
     ssize_t n = 0;
     while(1)
     {
@@ -850,35 +600,17 @@ int UFIO::recvmsg(struct msghdr *msg, 
         {
             if((errno == EAGAIN) || (errno == EWOULDBLOCK))
             {
-                if (!timeout)
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-
+                _errno = 0;
                 _markedActive = false;
                 while(!_markedActive)
                 {
-                    if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
-                    {
-                        _errno = ETIMEDOUT;
+                    if(!tmpUfios->setupForRead(this, timeout))
                         return -1;
-                    }
-                    if(!tmpUfios->setupForRead(this, timeout-now))
-                        return -1;
-                    if(_markedActive) //can write, so write
-                        break;
                 }
                 continue;
             }
             else if(errno == EINTR)
-            {
-                if(!calculateLoopedTimeout(now, timeout))
-                {
-                    _errno = ETIMEDOUT;
-                    return -1;
-                }
-            }
+                continue;
             else
             {
                 _errno = errno;
@@ -887,7 +619,6 @@ int UFIO::recvmsg(struct msghdr *msg, 
         }
         else if(n == 0)
             break;
-        shouldCheckTimeout = true;
     }
     return n;
 }
@@ -1001,7 +732,7 @@ bool EpollUFIOScheduler::addToScheduler(
     }
 
 
-    if(to > 0) //dont consider timeouts less than 1
+    if(to) //add to the sleep queue for the epoll queue TODO
     {
         struct timeval now;
         gettimeofday(&now, 0);
@@ -1015,7 +746,7 @@ bool EpollUFIOScheduler::addToScheduler(
         }
         ufsi->_ufio = ufio;
         ufio->_sleepInfo = ufsi;
-        TIME_IN_US timeToWakeUp = now.tv_sec*1000000+now.tv_usec + to;
+        unsigned long long int timeToWakeUp = now.tv_sec*1000000+now.tv_usec + to;
         if(_earliestWakeUpFromSleep > timeToWakeUp ||
            !_earliestWakeUpFromSleep)
             _earliestWakeUpFromSleep = timeToWakeUp;
@@ -1027,10 +758,9 @@ bool EpollUFIOScheduler::addToScheduler(
     ufio->_markedActive = false;
     if(!wait)
         return true;
-
     ufio->getUF()->block(); //switch context till someone wakes me up
 
-    if(to == -1) //nothing to do w/ no timeout
+    if(!to) //nothing to do w/ no timeout
         return true;
 
     if(ufio->_sleepInfo)
@@ -1039,7 +769,7 @@ bool EpollUFIOScheduler::addToScheduler(
         ufio->_sleepInfo = 0;
         return true;
     }
-    ufio->_errno = ETIMEDOUT;
+    //ufio->_errno = ETIMEDOUT
     return false;
 }
 
@@ -1180,7 +910,7 @@ static void* notifyEpollFunc(void* args)
     if(!args)
         return 0;
 #ifdef PIPE_NOT_EFD
-    if(write(*((int*)args), &eventFDChar, 1) > 0) {}
+    write(*((int*)args), &eventFDChar, 1);
 #else
     eventfd_write(*((int*)args), efdIncrementor); //TODO: deal w/ error case later
 #endif
@@ -1238,10 +968,13 @@ void EpollUFIOScheduler::waitForEvents(T
 
     int nfds;
     struct timeval now;
-    TIME_IN_US timeNow = 0;
+    unsigned long long int timeNow = 0;
     IntUFIOMap::iterator index;
     UFIO* ufio = 0;
     UF* uf = 0;
+    unsigned long long int amtToSleep = timeToWait;
+    int i = 0;
+    _interruptedByEventFd = false;
     UFScheduler* ufs = _uf->getParentScheduler();
     list<UF*> ufsToAddToScheduler;
     if(!ufs)
@@ -1249,11 +982,7 @@ void EpollUFIOScheduler::waitForEvents(T
         cerr<<"epoll scheduler has to be connected to some scheduler"<<endl;
         return;
     }
-
-    TIME_IN_US amtToSleep = timeToWait;
-    int i = 0;
-    _interruptedByEventFd = false;
-    int sleepMS = 0;
+    unsigned long long int amtToSleepFromUFS = 0;
     while(1)
     {
         if(_interruptedByEventFd) //this is so that the last interruption gets handled right away
@@ -1262,16 +991,16 @@ void EpollUFIOScheduler::waitForEvents(T
             _uf->yield();
         }
 
-        if(ufs->getActiveRunningListSize() > 1) //epoll is not the only fiber thats currently active
-            sleepMS = 0; //dont wait on epoll - since there are other ufs waiting to run
-        else
+        if(ufs->getActiveRunningListSize() < 2) //means that epoll is the only fiber thats currently active
         {
-            if(amtToSleep > ufs->getAmtToSleep())
-                amtToSleep = ufs->getAmtToSleep();
-            sleepMS = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+            if(amtToSleep > (amtToSleepFromUFS = ufs->getAmtToSleep()))
+                amtToSleep = (int)(amtToSleepFromUFS/1000);
+            else
+                amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
         }
-
-        nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, sleepMS);
+        else
+            amtToSleep = 0; //dont wait on epoll - since there are other ufs waiting to run
+        nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, amtToSleep);
         if(nfds > 0)
         {
             //for each of the fds that had activity activate them
@@ -1287,9 +1016,10 @@ void EpollUFIOScheduler::waitForEvents(T
                         exit(1);
                     }
                     ufio->_markedActive = true;
-                    if(ufio->_active) //activate the UF only if its being watched by some UF
+                    if(ufio->_active)
+                        //activate the fiber
                         ufs->addFiberToScheduler(uf, 0);
-                    //else must be the case that no one is watching this ufio - such as in the case where the conn. pool holding is onto the conn.
+                    //else must be the case that no one is watching this ufio - such as in the case where the conn. pool holding onto the conn.
                 }
                 else
                 {
@@ -1378,99 +1108,11 @@ void IORunner::run()
     ioRunner->waitForEvents(1000000); //TODO: allow to change the epoll interval later
 }
 
-void UFIO::ufCreateThreadWithIO(pthread_t* tid, UFList* ufsToStartWith)
+void UFIO::ufCreateThreadWithIO(pthread_t* tid, list<UF*>* ufsToStartWith)
 {
+    ufsToStartWith->push_front(new IORunner());
+
     // Add connection pool cleaner
     //ufsToStartWith->push_back(new UFConnectionPoolCleaner); //TODO: figure out how to deal w/ inactive connections
-    ufsToStartWith->push_back(new IORunner()); //we want this to run first and insertions happen in a LIFO manner so we add this uf to the end
     UFScheduler::ufCreateThread(tid, ufsToStartWith);
 }
-
-const unsigned int MAX_IOV = 16;
-ssize_t UFIO::writev(const struct iovec *iov, int iov_size, TIME_IN_US timeout)
-{
-    ssize_t n, retVal;
-    struct iovec* tmp_iov;
-    struct iovec local_iov[MAX_IOV];
-
-    size_t totalBytes = 0;
-    int index;
-    for (index = 0; index < iov_size; index++)
-        totalBytes += iov[index].iov_len;
-
-    retVal = (ssize_t)totalBytes;
-    size_t bytesRemaining = totalBytes;
-    tmp_iov = (struct iovec *) iov;
-    int iov_cnt = iov_size;
-
-    UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
-    _markedActive = false;
-    _errno = 0;
-    while (bytesRemaining > 0)
-    {
-        if (iov_cnt == 1)
-        {
-            if (write(tmp_iov[0].iov_base, bytesRemaining, timeout) != (ssize_t) bytesRemaining)
-                retVal = -1;
-            break;
-        }
-        if ((n = ::writev(_fd, tmp_iov, iov_cnt)) < 0)
-        {
-            if(errno == EINTR)
-                continue;
-            else if(errno == EAGAIN || errno == EWOULDBLOCK)
-            {
-                //TODO: subtract the remaining amt. from the timeout
-                if(!tmpUfios->setupForWrite(this, timeout))
-                {
-                    retVal = -1;
-                    break;
-                }
-            }
-            else
-            {
-                retVal = -1;
-                _errno = errno;
-                break;
-            }
-        }
-        else
-        {
-            if ((size_t) n == bytesRemaining)
-                break;
-            bytesRemaining -= n;
-            n = (ssize_t)(totalBytes - bytesRemaining);
-            for (index = 0; (size_t) n >= iov[index].iov_len; index++)
-                n -= iov[index].iov_len;
-
-            if (tmp_iov == iov)
-            {
-                if ((iov_size - index) <= (int) MAX_IOV)
-                    tmp_iov = local_iov;
-                else
-                {
-                    tmp_iov = (struct iovec*) calloc(1, (iov_size - index) * sizeof(struct iovec));
-                    if (tmp_iov == NULL)
-                    {
-                        _errno = errno;
-                        return -1;
-                    }
-                }
-            }
-
-            tmp_iov[0].iov_base = &(((char *)iov[index].iov_base)[n]);
-            tmp_iov[0].iov_len = iov[index].iov_len - n;
-            index++;
-            for (iov_cnt = 1; index < iov_size; iov_cnt++, index++)
-            {
-                tmp_iov[iov_cnt].iov_base = iov[index].iov_base;
-                tmp_iov[iov_cnt].iov_len = iov[index].iov_len;
-            }
-        }
-    }
-
-    if (tmp_iov != iov && tmp_iov != local_iov)
-        free(tmp_iov);
-
-    return retVal;
-}

Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
            ('svn:mergeinfo' removed)

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFPC.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFPC.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFPC.C Fri Aug 13 22:14:31 2010
@@ -1,9 +1,9 @@
-#include <UFPC.H>
+#include "UFPC.H"
 #include <stdio.h>
 
 using namespace std;
 
-size_t UFProducer::produceData(UFDataObject* data, int ufpcCode, bool freeDataOnExit, UF* uf)
+bool UFProducer::produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit, UF* uf)
 {
     if(!uf)
         uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
@@ -12,166 +12,77 @@ size_t UFProducer::produceData(UFDataObj
     //create the UFProducerData structure
     UFProducerData* ufpd = UFProducerData::getObj();
     ufpd->_data = data;
+    ufpd->_size = size;
     ufpd->_freeDataOnExit = freeDataOnExit;
     ufpd->_producerWhichInserted = this;
     ufpd->_ufpcCode = ufpcCode;
     ufpd->_lockToUpdate = _requireLockToUpdateConsumers;
 
-    size_t numConsUpdated = updateConsumers(ufpd, uf);
-    if(numConsUpdated)
-        return numConsUpdated;
-
-    delete ufpd;
-    return 0;
-}
-
-size_t UFProducer::produceData(UFProducerData* ufpd, UF* uf)
-{
-    if(!uf)
-        uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    _uf = uf;
-
-    return updateConsumers(ufpd, uf);
-}
-
-size_t UFJoinableProducer::updateConsumers(UFProducerData* ufpd, UF* uf)
-{
-    size_t consumerCount = getConsumerCount();
-    if(!consumerCount)
-        return 0;
-
     //increase the reference count
-    ufpd->addRef(consumerCount);
+    ufpd->addRef(_producersConsumerSetSize);
 
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    switch(consumerCount)
+    //the optimized case of no locking + there being only one consumer
+    if(!_requireLockToUpdateConsumers &&
+       _producersConsumerSetSize == 1 &&
+       _mostRecentConsumerAdded)
     {
-        case 1: //optimize there being only one consumer
-            {
-                _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
-                if(!_mostRecentConsumerAdded->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0)) //signal if necessary
-                {
-                    if(_requireLockToUpdateConsumers)
-                        _mostRecentConsumerAdded->_queueOfDataToConsumeLock.signal();
-                    else //wake up from a block
-                        UFScheduler::getUFScheduler()->addFiberToScheduler(_mostRecentConsumerAdded->getUF());
-                }
-                break;
-            }
-        default:
-            {
-                //for each of the consumers add it to their queue
-                for(deque<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
-                    beg != _producersConsumerSet.end(); ++beg)
-                {
-                    UFConsumer* ufc = *beg;
-                    //add to the consumer's queue
-                    if(_requireLockToUpdateConsumers)
-                    {
-                        ufc->_queueOfDataToConsumeLock.lock(uf);
-                        ufc->_queueOfDataToConsume.push_back(ufpd);
-                        if(!ufc->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0))
-                            ufc->_queueOfDataToConsumeLock.signal();
-                        ufc->_queueOfDataToConsumeLock.unlock(uf);
-                    }
-                    else
-                    {
-                        ufc->_queueOfDataToConsume.push_back(ufpd);
-                        if(!ufc->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0))
-                            UFScheduler::getUFScheduler()->addFiberToScheduler(ufc->getUF());
-                    }
-                }
-                break;
-            }
+        _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
+        UF* consUF = _mostRecentConsumerAdded->getUF();
+        if(consUF)
+            UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
+        return true;
     }
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-
-    return consumerCount;
-}
 
-size_t UFNonJoinableProducer::updateConsumers(UFProducerData* ufpd, UF* uf)
-{
-    size_t count = 0;
+    //for each of the consumers add it to their queue
     if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    if(_mostRecentConsumerAdded)
+    for(set<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
+        beg != _producersConsumerSet.end(); ++beg)
     {
-        //increase the reference count
-        ufpd->addRef((count = 1));
-
-        _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
-        if(!_mostRecentConsumerAdded->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0)) //signal if necessary
+        //add to the consumer's queue
+        if(_requireLockToUpdateConsumers)
         {
-            if(_requireLockToUpdateConsumers)
-                _mostRecentConsumerAdded->_queueOfDataToConsumeLock.signal();
-            else //wake up from a block
-                UFScheduler::getUFScheduler()->addFiberToScheduler(_mostRecentConsumerAdded->getUF());
+            (*beg)->_queueOfDataToConsumeLock.lock(uf);
+            (*beg)->_queueOfDataToConsume.push_back(ufpd);
+            if(!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0))
+                (*beg)->_queueOfDataToConsumeLock.signal();
+            (*beg)->_queueOfDataToConsumeLock.unlock(uf);
+        }
+        else
+        {
+            (*beg)->_queueOfDataToConsume.push_back(ufpd);
+            UF* consUF = (*beg)->getUF();
+            if(consUF && (!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0)))
+                UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
         }
     }
     if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
 
-    return count;
+    return true;
 }
 
-UFConsumer::UFConsumer()
+UFConsumer::UFConsumer(bool notifyOnExitOnly) : _notifyOnExitOnly(notifyOnExitOnly)
 { 
-    reset();
-}
-
-void UFConsumer::reset()
-{
     _currUF = 0; 
     _requireLockToWaitForUpdate = true;
 }
 
-UFJoinableConsumer::UFJoinableConsumer(bool notifyOnExitOnly)
+UFProducerData* UFConsumer::waitForData(UF* uf)
 {
-    _notifyOnExitOnly = notifyOnExitOnly;
-}
-
-void UFConsumer::clearDataToConsume()
-{
-    if(!_queueOfDataToConsume.empty())
-    {
-        deque<UFProducerData*>::iterator beg = _queueOfDataToConsume.begin();
-        for(; beg != _queueOfDataToConsume.end(); ++beg)
-            UFProducerData::releaseObj((*beg));
-    }
-}
-
-UFProducerData* UFConsumer::waitForData(UF* uf, size_t* numRemaining, TIME_IN_US timeToWait)
-{
-    if(numRemaining)
-        *numRemaining = 0;
-
     if(!uf)
         uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+
     _currUF = uf;
 
     if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
     while(_queueOfDataToConsume.empty())
     {
-        if(_requireLockToWaitForUpdate) 
-        {
-            if(!timeToWait)
-                _queueOfDataToConsumeLock.condWait(uf);
-            else
-            {
-                if(!_queueOfDataToConsumeLock.condTimedWait(uf, timeToWait))
-                {
-                    _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
-                    return 0;
-                }
-            }
-        }
-        else 
-            uf->block(); //wait for the producer to wake up the consumer
+        if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.condWait(uf); //TODO: change to condTimedWait
+        else uf->block(); //wait for the producer to wake up the consumer
     }
 
     //read the first element from the queue
     UFProducerData* result = _queueOfDataToConsume.front();
     _queueOfDataToConsume.pop_front();
-    if(numRemaining)
-        *numRemaining = _queueOfDataToConsume.size();
     if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
 
     return result;
@@ -179,32 +90,22 @@ UFProducerData* UFConsumer::waitForData(
 
 bool UFConsumer::hasData(UF* uf)
 {
-    if(!_requireLockToWaitForUpdate)
-        return !(_queueOfDataToConsume.empty());
-
     if(!uf)
         uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+
     bool result;
-    _queueOfDataToConsumeLock.lock(uf);
+    if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
     result = !(_queueOfDataToConsume.empty());
-    _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
+    if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
 
     return result;
 }
 
-bool UFJoinableConsumer::joinProducer(UFJoinableProducer* ufp)
+bool UFConsumer::joinProducer(UFProducer* ufp)
 {
-    if(!ufp)
-        return false;
-
-    if(_requireLockToWaitForUpdate)
-    {
-        _consumersProducerSetLock.getSpinLock();
-        _consumersProducerSet.push_back(ufp);
-        _consumersProducerSetLock.releaseSpinLock();
-    }
-    else
-        _consumersProducerSet.push_back(ufp);
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
+    _consumersProducerSet.insert(ufp);
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
 
     //notify the producer that we're adding this consumer
     if(!ufp->addConsumer(this))
@@ -213,106 +114,42 @@ bool UFJoinableConsumer::joinProducer(UF
     return true;
 }
 
-bool UFNonJoinableConsumer::removeProducer()
+bool UFConsumer::removeProducer(UFProducer* ufp)
 {
-    if(!_ufp || !_ufp->removeConsumer(this))
+    //notifying producer on exit
+    if(!ufp->removeConsumer(this))
         return false;
-    _ufp = 0; //no need to lock - since we cant add any other producer w/ this type of consumer
-    return true;
-}
 
-UFNonJoinableConsumer::UFNonJoinableConsumer(UFNonJoinableProducer* ufp, bool notifyOnExitOnly)
-{
-    _ufp = ufp;
-    _ufp->addConsumer(this);
-    _notifyOnExitOnly = notifyOnExitOnly;
-}
-
-void UFNonJoinableConsumer::resetMe()
-{
-    removeProducer();
-    clearDataToConsume();
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
+    _consumersProducerSet.erase(ufp);
+    if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
+    return true;
 }
 
-void UFJoinableConsumer::resetMe()
+void UFConsumer::reset()
 {
     //1. notify all the producers on exit
-    for(deque<UFJoinableProducer*>::iterator beg = _consumersProducerSet.begin(); beg != _consumersProducerSet.end(); )
+    for(set<UFProducer*>::iterator beg = _consumersProducerSet.begin(); beg != _consumersProducerSet.end(); )
     {
         removeProducer(*beg);
         beg = _consumersProducerSet.begin();
     }
 
     //2. clear out all the remaining entries in the queue
-    clearDataToConsume();
-}
-
-static bool eraseProducer(UFJoinableProducer* ufp, std::deque<UFJoinableProducer*>& coll)
-{
-    for(deque<UFJoinableProducer*>::iterator beg = coll.begin();
-        beg != coll.end();
-        ++beg)
-    {
-        if(*beg == ufp)
-        {
-            coll.erase(beg);
-            return true;
-        }
-    }
-    return false;
-}
-
-bool UFJoinableConsumer::removeProducer(UFJoinableProducer* ufp)
-{
-    //notifying producer on exit
-    if(!ufp || !ufp->removeConsumer(this))
-        return false;
-
-    if(_requireLockToWaitForUpdate)
+    if(!_queueOfDataToConsume.empty())
     {
-        _consumersProducerSetLock.getSpinLock();
-        //_consumersProducerSet.erase(ufp);
-        eraseProducer(ufp, _consumersProducerSet);
-        _consumersProducerSetLock.releaseSpinLock();
+        list<UFProducerData*>::iterator beg = _queueOfDataToConsume.begin();
+        for(; beg != _queueOfDataToConsume.end(); ++beg)
+            UFProducerData::releaseObj((*beg));
     }
-    else
-        eraseProducer(ufp, _consumersProducerSet);
-        //_consumersProducerSet.erase(ufp);
-
-    return true;
 }
 
 void UFProducer::reset()
 {
-    if(!getConsumerCount()) //dont do anything if there are no consumers remaining
-        return;
-
     //add the EOF indicator
-    if(_sendEOFAtEnd)
-        produceData(0, 0, 0/*exit*/, 0/*freeDataOnExit*/); //notify the consumers that the producer is bailing
+    produceData(0, 0, 0/*exit*/, 0/*freeDataOnExit*/); //notify the consumers that the producer is bailing
 
     //have to wait for all the consumers to acknowledge my death
-    removeAllConsumers();
-}
-
-bool UFJoinableProducer::addConsumer(UFConsumer* ufc)
-{
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    if(!_acceptNewConsumers)
-    {
-        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-        return false;
-    }
-    _producersConsumerSet.push_back(ufc); //check insertion
-    _mostRecentConsumerAdded = ufc;
-    _mostRecentConsumerAdded->_currUF = uf;
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-    return true;
-}
-
-void UFJoinableProducer::removeAllConsumers()
-{
     UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
     if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
     _acceptNewConsumers = false;
@@ -328,90 +165,5 @@ void UFJoinableProducer::removeAllConsum
     if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
 }
 
-bool UFJoinableProducer::removeConsumer(UFConsumer* ufc)
-{
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    std::deque<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
-    for(;beg != _producersConsumerSet.end(); ++beg) //TODO: optimize later - use set if there are too many consumers
-    {
-        if(*beg == ufc)
-        {
-            _producersConsumerSet.erase(beg);
-            break;
-        }
-    }
-    //_producersConsumerSet.erase(ufc);
-    if(_requireLockToUpdateConsumers)  //notify the uf listening for this producer that another consumer bailed
-        _producersConsumerSetLock.signal();
-    else
-    {
-        if(_uf)
-            UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
-    }
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-    return true;
-}
-
-bool UFNonJoinableProducer::addConsumer(UFConsumer* ufc)
-{
-    if(!ufc || _mostRecentConsumerAdded)
-        return false;
-
-    if(!_requireLockToUpdateConsumers)
-    {
-        _mostRecentConsumerAdded = ufc;
-        return true;
-    }
-
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    _producersConsumerSetLock.lock(uf);
-    _mostRecentConsumerAdded = ufc;
-    _producersConsumerSetLock.unlock(uf);
-    return true;
-}
-
-bool UFNonJoinableProducer::removeConsumer(UFConsumer* ufc)
-{
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    _acceptNewConsumers = false; //can only remove consumer on exit
-    if(!_mostRecentConsumerAdded)
-    {
-        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-        return true;
-    }
-
-    _mostRecentConsumerAdded = 0;
-    if(_requireLockToUpdateConsumers)  //notify the uf listening for this producer that another consumer bailed - needed if the producer is waiting for the consumers to die before being able to die on its own
-        _producersConsumerSetLock.signal();
-    else
-    {
-        if(_uf)
-            UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
-    }
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-
-    return true;
-}
-
-void UFNonJoinableProducer::removeAllConsumers()
-{
-    UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
-    _acceptNewConsumers = false; //can only remove consumer on exit
-    while(_mostRecentConsumerAdded)
-    {
-        if(_requireLockToUpdateConsumers) _producersConsumerSetLock.condTimedWait(uf, 1000000);
-        else 
-        {
-            _uf = uf;
-            _uf->block();
-        }
-    }
-    if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-}
-
 stack<UFProducerData*> UFProducerData::_objList;
 UFMutex UFProducerData::_objListMutex;
-