You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/08/14 00:14:32 UTC
svn commit: r985387 [2/3] - in /trafficserver/traffic/branches/UserFiber:
core/include/ core/src/ protocol/ samples/
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConf.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConf.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConf.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConf.C Fri Aug 13 22:14:31 2010
@@ -1,74 +1,63 @@
-#include <UFConf.H>
-#include <UFIO.H>
+#include "UFConf.H"
+
#include <fstream>
#include <iostream>
#include <sstream>
-#include <errno.h>
-#include <string.h>
-
-#include <fcntl.h>
-#include <sys/inotify.h>
-
-UFConf::UFConf(const string &conf) : conf_file(conf), _parent(NULL)
-{
-
-}
-
-void UFConf::init()
-{
- // Parse default config
- string conf_file_default = conf_file + ".default";
- parse(conf_file_default);
-
- // Parse overrides
- parse(conf_file);
-/*
- string *conf_file_parent = getString("parent");
- if(conf_file_parent != NULL)
- {
- _setParent(getConf(*conf_file_parent));
- }
-*/
-}
/** set string value in conf
* Converts the string value that is passed in to a ConfValue and stores it in the conf hash
*/
-void UFConf::setString(const string &key, const string &type, const string &value)
+void UFConf::setString(const string &type, const string &value)
{
- _set<string>(key, type, value);
+ ConfValueBase *existingValue = get(type);
+ if(existingValue != NULL) {
+ delete existingValue;
+ }
+ ConfValue<string> *sharedString = new ConfValue<string>;
+ sharedString->mElement = value;
+ _data[type] = sharedString;
}
/** set int value in conf
* Converts the int value that is passed in to a ConfValue and stores it in the conf hash
*/
-void UFConf::setInt(const string &key, const string &type, int value)
+void UFConf::setInt(const string &type, int value)
{
- _set<int>(key, type, value);
+ ConfValueBase *existingValue = get(type);
+ if(existingValue != NULL) {
+ delete existingValue;
+ }
+ ConfValue<int> *sharedInt = new ConfValue<int>;
+ sharedInt->mElement = value;
+ _data[type] = sharedInt;
}
/** set bool value in conf
* Converts the bool value that is passed in to a ConfValue and stores it in the conf hash
*/
-void UFConf::setBool(const string &key, const string &type, bool value)
+void UFConf::setBool(const string &type, bool value)
{
- _set<bool>(key, type, value);
+ ConfValueBase *existingValue = get(type);
+ if(existingValue != NULL) {
+ delete existingValue;
+ }
+ ConfValue<bool> *sharedBool = new ConfValue<bool>;
+ sharedBool->mElement = value;
+ _data[type] = sharedBool;
}
/** set double value in conf
* Converts double value that is passed in to a ConfValue and stores it in the conf hash
*/
-void UFConf::setDouble(const string &key, const string &type, double value)
+void UFConf::setDouble(const string &type, double value)
{
- _set<double>(key, type, value);
-}
-
-/** set vector<string> value in conf
- * Converts the vector<string> value that is passed in to a ConfValue and stores it in the conf hash
- */
-void UFConf::setStringVector(const string &key, const string &type, const vector<string> &value)
-{
- _set< vector<string> >(key, type, value);
+ ConfValueBase *existingValue = get(type);
+ if(existingValue != NULL) {
+ delete existingValue;
+ }
+ ConfValue<double> *sharedDouble = new ConfValue<double>;
+ sharedDouble->mElement = value;
+ _data[type] = sharedDouble;
}
/** Get the string value associated with the key
@@ -78,7 +67,13 @@ void UFConf::setStringVector(const strin
*/
string *UFConf::getString(const string &key)
{
- return _get<string>(key);
+ ConfValue<string> *sharedString = (ConfValue<string> *)get(key);
+ if(sharedString != NULL) {
+ return &sharedString->mElement;
+ }
+ if(_parent == NULL)
+ return NULL;
+ return _parent->getString(key);
}
/** Get the int value associated with the key
@@ -88,7 +83,13 @@ string *UFConf::getString(const string &
*/
int *UFConf::getInt(const string &key)
{
- return _get<int>(key);
+ ConfValue<int> *sharedInt = (ConfValue<int> *)get(key);
+ if(sharedInt != NULL) {
+ return &sharedInt->mElement;
+ }
+ if(_parent == NULL)
+ return NULL;
+ return _parent->getInt(key);
}
/** Get the bool value associated with the key
@@ -98,7 +99,13 @@ int *UFConf::getInt(const string &key)
*/
bool *UFConf::getBool(const string &key)
{
- return _get<bool>(key);
+ ConfValue<bool> *sharedBool = (ConfValue<bool> *)get(key);
+ if(sharedBool != NULL) {
+ return &sharedBool->mElement;
+ }
+ if(_parent == NULL)
+ return NULL;
+ return _parent->getBool(key);
}
/** Get the double value associated with the key
@@ -106,19 +113,15 @@ bool *UFConf::getBool(const string &key)
* If key is not found in the local conf, forwards request to parent conf
* If key is not found in either local or parent conf, NULL is returned
*/
-double *UFConf::getDouble(const string &key)
-{
- return _get<double>(key);
-}
-
-/** Get the vector<string> value associated with the key
- * Looks at local conf hash for the key
- * If key is not found in the local conf, forwards request to parent conf
- * If key is not found in either local or parent conf, NULL is returned
- */
-vector<string> *UFConf::getStringVector(const string &key)
+double *UFConf::getDouble(const string &type)
{
- return _get< vector<string> >(key);
+ ConfValue<double> *sharedDouble = (ConfValue<double> *)get(type);
+ if(sharedDouble != NULL) {
+ return &sharedDouble->mElement;
+ }
+ if(_parent == NULL)
+ return NULL;
+ return _parent->getDouble(type);
}
/** get value associated with the key that is passed in
@@ -136,21 +139,9 @@ ConfValueBase *UFConf::get(const string
return _parent->get(key);
}
-/** Set parent config
- * Also caches any config parameters that the child may be interested in
- */
-void UFConf::_setParent(UFConf *parent)
-{
- _parent = parent;
- for(hash_map<string, ConfValueBase *>::iterator it = parent->_data.begin();
- it != parent->_data.end(); it++)
- {
- cache(it->first, it->second);
- }
-}
-
/** Parse config file and store in conf hash
- * calls parseLine for each line in the conf file
+ * Looks for config values of type STRING, INT, DOUBLE and BOOL
+ * Skips over lines beginning with '#'
*/
bool UFConf::parse(const std::string &conf_file)
{
@@ -158,73 +149,50 @@ bool UFConf::parse(const std::string &co
infile.open(conf_file.c_str());
if(!infile.is_open())
return false; // Could not open file
- string line;
- bool ok = true;
- while(getline(infile, line))
- {
- // Ignore lines starting with '#'
- if(line[0] != '#')
- ok &= parseLine(line);
- }
- infile.close();
- afterParse();
- return ok;
-}
-/** Parses config line
- * Looks for config values of type STRING, INT, DOUBLE and BOOL
- * Skips over lines beginning with '#'
- */
-bool UFConf::parseLine(const std::string &line)
-{
+ string line;
istringstream instream;
- instream.str(line); // Use s as source of input.
- string conf_key, conf_key_type;
- if (instream >> conf_key >> conf_key_type)
+ while(getline(infile, line))
{
- // get type from config file, read into corresponding value and store
- string string_value;
- int int_value;
- double double_value;
- bool bool_value;
- if(conf_key_type == "STRING")
+ instream.clear(); // Reset from possible previous errors.
+ instream.str(line); // Use line as source of input.
+ string conf_key, conf_key_type;
+ if (instream >> conf_key >> conf_key_type)
{
- string string_val_temp;
- while(instream >> string_val_temp)
+ // skip lines starting with #
+ if(conf_key[0] == '#')
+ continue;
+
+ // get type from config file, read into corresponding value and store
+ string string_value;
+ int int_value;
+ double double_value;
+ bool bool_value;
+ if(conf_key_type == "STRING")
{
- if(string_value.length())
- string_value += " ";
- string_value += string_val_temp;
+ if(instream >> string_value)
+ setString(conf_key, string_value);
+ }
+ if(conf_key_type == "INT")
+ {
+ if(instream >> int_value)
+ setInt(conf_key, int_value);
+ }
+ if(conf_key_type == "DOUBLE")
+ {
+ if(instream >> double_value)
+ setDouble(conf_key, double_value);
+ }
+ if(conf_key_type == "BOOL")
+ {
+ if(instream >> bool_value)
+ setBool(conf_key, bool_value);
}
- setString(conf_key, conf_key_type, string_value);
- }
- if(conf_key_type == "INT")
- {
- if(instream >> int_value)
- setInt(conf_key, conf_key_type, int_value);
- }
- if(conf_key_type == "DOUBLE")
- {
- if(instream >> double_value)
- setDouble(conf_key, conf_key_type, double_value);
- }
- if(conf_key_type == "BOOL")
- {
- if(instream >> bool_value)
- setBool(conf_key, conf_key_type, bool_value);
}
}
- return true;
-}
-void UFConf::clear()
-{
- for(std::hash_map<std::string, ConfValueBase *>::iterator it = _data.begin(); it != _data.end(); it++)
- {
- if(it->second != NULL)
- delete it->second;
- }
- _data.clear();
+ infile.close();
+ return true;
}
/**
@@ -244,142 +212,7 @@ ostream& operator<<(ostream& output, con
return output;
}
-UFConfManager::UFConfManager() : _notify_fd(-1)
-{
- _notify_fd = inotify_init();
- if(_notify_fd >= 0)
- {
- int flags = ::fcntl(_notify_fd, F_GETFL, 0);
- if(flags != -1)
- ::fcntl(_notify_fd, F_SETFL, flags | O_NONBLOCK);
- }
-}
-
-// Default refresh time is 30 mins
-long UFConfManager::_refreshTime = 30000000;
-long UFConfManager::getRefreshTime()
-{
- return _refreshTime;
-}
-
-void UFConfManager::setRefreshTime(long rtime)
-{
- _refreshTime = rtime;
-}
-
-pthread_key_t UFConfManager::_createThreadKey()
-{
- if(pthread_key_create(&UFConfManager::threadSpecificKey, 0) != 0)
- {
- cerr<<"couldnt create thread specific key "<<strerror(errno)<<endl;
- exit(1);
- }
- return UFConfManager::threadSpecificKey;
-}
-pthread_key_t UFConfManager::threadSpecificKey = UFConfManager::_createThreadKey();
-
-UFConfManager* UFConfManager::getConfManager()
-{
- void * ret = pthread_getspecific(threadSpecificKey);
- return (UFConfManager*) ret;
-}
-
-// FIXME : Random number picked from example code
-const int BUFSZ = 1024;
-
-void UFConfManager::reload()
-{
-#ifdef __linux__
- // Wrap inotify fd in UFIO
- UFIO* ufio = new UFIO(UFScheduler::getUF());
- if(!ufio || !ufio->setFd(_notify_fd, false/*has already been made non-blocking*/))
- {
- cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup conf refresher" << endl;
- return;
- }
-
- // Check for changes from inotify
- while(1)
- {
- char buf[BUFSZ];
- int n = ufio->read(buf, sizeof(buf));
- if (n <= 0)
- {
- continue;
- }
-
- // Go over changes
- // Collect confs that were changed
- vector<UFConf *> files_changed;
- int i = 0;
- while (i < n)
- {
- struct inotify_event *ev;
- ev = (struct inotify_event *) &buf[i];
-
- if (ev && ev->wd && (ev->mask & IN_MODIFY))
- {
- // Check if the file that was modified is in the conf system
- std::hash_map<std::string, UFConf *>::iterator it = _configs.find(_watch_fd_map[ev->wd]);
- if(it == _configs.end())
- continue; // This should never happen
-
- // cerr <<getPrintableTime()<<" "<<getpid()<< "Reloading conf : " << it->first << endl;
-
- // Conf found in system. Reparse.
- it->second->clear();
- it->second->parse(it->first);
-
- files_changed.push_back(it->second);
- }
- i += sizeof(struct inotify_event) + ev->len;
- }
-
-
- // cerr <<getPrintableTime()<<" "<<getpid()<< "file_changed size : " << files_changed.size() << endl;
- for(vector<UFConf *>::iterator it = files_changed.begin(); it != files_changed.end(); it++)
- {
- _reloadChildren(*it);
- }
- }
-#else
- // system does not support inotify.
- // brute force.
- // reparse all configs.
- for(std::hash_map<std::string, UFConf *>::iterator it = _configs.begin(); it != _configs.end(); it++)
- {
- it->second->clear();
- it->second->parse(it->first);
- }
-
- // reload children for every conf object
- for(std::hash_map<std::string, UFConf *>::iterator it = _configs.begin(); it != _configs.end(); it++)
- {
- _reloadChildren(it->second);
- }
-#endif
-}
-
-void UFConfManager::_reloadChildren(UFConf *parent_conf)
-{
- // Get children
- hash_map< string, vector<string> >::iterator child_map_it = _child_map.find(parent_conf->conf_file);
- if(child_map_it == _child_map.end())
- return; // Leaf node
-
- vector<string> &children = child_map_it->second;
- // Go over children and make them pickup changes from parent
- for(vector<string>::iterator vit = children.begin(); vit != children.end(); vit++)
- {
- std::hash_map<std::string, UFConf *>::iterator conf_map_it = _configs.find(*vit);
- if(conf_map_it == _configs.end())
- continue;
- conf_map_it->second->_setParent(parent_conf);
-
- // Recursively reload children
- _reloadChildren(conf_map_it->second);
- }
-}
+std::hash_map<std::string, UFConf *> UFConfManager::_configs;
/** Add new child conf
* Create new conf and set parent to conf object corresponding to the parent_conf that is passed in
@@ -393,24 +226,17 @@ UFConf* UFConfManager::addChildConf(cons
return it->second;
}
- // Create conf
- UFConf *conf_created = addConf(conf_file);
-
// Check if parent config was created
it = _configs.find(parent_conf_file);
if(it == _configs.end())
- {
- cerr << "Warning : Parent config was not created. Not setting parent conf" << endl;
- }
- else
- {
- // Set parent conf
- if(conf_created != NULL)
- {
- conf_created->_setParent(it->second);
- _child_map[parent_conf_file].push_back(conf_file);
- }
- }
+ return NULL; // Parent config was not created
+
+ // Create conf
+ UFConf *conf_created = addConf(conf_file);
+
+ // Set parent conf
+ if(conf_created != NULL)
+ conf_created->setParent(it->second);
return conf_created;
}
@@ -427,46 +253,25 @@ UFConf* UFConfManager::addConf(const str
}
// Create new UFConf
- UFConf *conf = UFConfFactory<UFConf>(conf_file);
-
- // Store in conf map
- _configs[conf_file] = conf;
+ UFConf *conf = new UFConf;
- // inotify watch file for changes
- _addWatch(conf_file);
- return conf;
-}
+ // Parse default config
+ string conf_file_default = conf_file + ".default";
+ conf->parse(conf_file_default);
+
+ // Parse overrides
+ conf->parse(conf_file);
-/** Add new conf
- * Uses the conf that is passed in
- */
-bool UFConfManager::addConf(UFConf *conf, const string &conf_file)
-{
- hash_map<string, UFConf*>::iterator it = _configs.find(conf_file);
- if(it != _configs.end())
+ string *conf_file_parent = conf->getString("parent");
+ if(conf_file_parent != NULL)
{
- // Conf already exists
- return false;
+ conf->setParent(getConf(*conf_file_parent));
}
// Store in conf map
_configs[conf_file] = conf;
- // inotify watch file for changes
- _addWatch(conf_file);
- return true;
-}
-
-/** Add new child conf
- * Add conf object to manager and set parent
- */
-bool UFConfManager::addChildConf(UFConf *child_conf, const string &conf_file, const string &parent_conf_file)
-{
- if(!addConf(child_conf, conf_file))
- return false;
- _child_map[parent_conf_file].push_back(conf_file);
- child_conf->_setParent(getConf(parent_conf_file));
- return true;
+ return conf;
}
/**
@@ -494,19 +299,3 @@ void UFConfManager::dump()
cerr << "=============CONF " << it->first << " ENDS" << endl;
}
}
-
-void UFConfManager::_addWatch(const string &conf_file)
-{
- if(_notify_fd < 0)
- {
- cerr << "UFConfManager::_addWatch could not add watch on " << conf_file << endl;
- return;
- }
-
- int wd = inotify_add_watch(_notify_fd, conf_file.c_str(), IN_MODIFY);
- if(wd < 0)
- cerr << "UFConfManager::_addWatch error adding watch on " << conf_file << endl;
-
- _watch_fd_map[wd] = conf_file;
- // cerr << "UFConfManager::_addWatch watch on " << conf_file << endl;
-}
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.C Fri Aug 13 22:14:31 2010
@@ -1,9 +1,7 @@
-#include <UFConnectionPool.H>
+#include "UFConnectionPool.H"
#include "UFConnectionPoolImpl.H"
-#include <UFStatSystem.H>
-#include <UFStats.H>
-#include <UFIO.H>
+#include "UFIO.H"
#include <stdlib.h>
#include <fstream>
#include <iostream>
@@ -20,7 +18,7 @@
#include <string.h>
#ifdef USE_CARES
-#include <UFAres.H>
+#include "UFAres.H"
#endif
const unsigned short int PERCENT_LOGGING_SAMPLING = 5;
@@ -28,11 +26,11 @@ const unsigned short int PERCENT_LOGGING
using namespace std;
UFConnIPInfo::UFConnIPInfo(const string& ip,
- unsigned int port,
- bool persistent,
- int maxSimultaneousConns,
- TIME_IN_US connectTimeout,
- TIME_IN_US timeToFailOutIPAfterFailureInSecs)
+ unsigned int port,
+ bool persistent,
+ int maxSimultaneousConns,
+ TIME_IN_US connectTimeout,
+ TIME_IN_US timeToFailOutIPAfterFailureInSecs)
{
_ip = ip;
_port = port;
@@ -42,6 +40,7 @@ UFConnIPInfo::UFConnIPInfo(const string&
_connectTimeout = connectTimeout;
_timedOut = 0;
_inProcessCount = 0;
+ _timeToExpireDNSInfo = 0;
memset(&_sin, 0, sizeof(_sin));
_sin.sin_family = AF_INET;
@@ -120,30 +119,24 @@ UFConnGroupInfo* UFConnectionPoolImpl::a
return group;
}
-const unsigned int DEFAULT_TTL = 300;
-bool UFConnectionPoolImpl::createIPInfo(const string& groupName, UFConnGroupInfo* groupInfo, TIME_IN_US connectTimeout)
+bool UFConnectionPoolImpl::createIPInfo(const string& groupName, UFConnGroupInfo* groupInfo)
{
UFConnIPInfoList& ipInfoList = groupInfo->getIpInfoList();
size_t indexOfColon = groupName.find(':');
string hostName = groupName;
unsigned int port = 0;
- string portString = "";
if(indexOfColon != string::npos)
{
hostName = groupName.substr(0, indexOfColon);
- portString = groupName.substr(indexOfColon+1).c_str();
- port = atoi(portString.c_str());
+ port = atoi(groupName.substr(indexOfColon+1).c_str());
}
- else
- return false;
//have to figure out the hosts listed w/
//
int lowestTTL = 0;
UFConnIPInfo* ipInfo = 0;
string ipString;
- string ipStringToInsert;
#ifdef USE_CARES
UFAres ufares;
@@ -157,10 +150,8 @@ bool UFConnectionPoolImpl::createIPInfo(
{
ipString = inet_ntoa(results[i].ipaddr);
#else
- struct hostent hostInfo, *h;
- int hErrno = 0;
- char tmpHostBuf[1024];
- if(gethostbyname_r (hostName.c_str(), &hostInfo, tmpHostBuf, 1024, &h, &hErrno) || hErrno)
+ struct hostent* h = gethostbyname(hostName.c_str());
+ if(!h)
return false;
for(unsigned int i = 0; 1; i++)
@@ -169,14 +160,13 @@ bool UFConnectionPoolImpl::createIPInfo(
break;
ipString = inet_ntoa(*((in_addr*)h->h_addr_list[i]));
#endif
- ipStringToInsert = ipString + ":" + portString;
//check if the ip already exists in the system
- IPInfoStore::iterator index = _ipInfoStore.find(ipStringToInsert);
+ IPInfoStore::iterator index = _ipInfoStore.find(ipString);
if(index == _ipInfoStore.end())
{
- ipInfo = new UFConnIPInfo(ipString, port, true, 0, connectTimeout, 30);
- _ipInfoStore[ipStringToInsert] = ipInfo;
- index = _ipInfoStore.find(ipStringToInsert);
+ ipInfo = new UFConnIPInfo(ipString, port, true, 0, 0, 30);
+ _ipInfoStore[ipString] = ipInfo;
+ index = _ipInfoStore.find(ipString);
}
ipInfo = index->second;
@@ -184,9 +174,9 @@ bool UFConnectionPoolImpl::createIPInfo(
if(lowestTTL > results[i].ttl || !lowestTTL)
lowestTTL = results[i].ttl;
#else
- lowestTTL = DEFAULT_TTL;
+ lowestTTL = 300; //default to 300 secs
#endif
- ipInfoList.push_back(ipStringToInsert);
+ ipInfoList.push_back(ipString);
}
time_t timeToExpireAt = lowestTTL + time(0);
@@ -197,7 +187,7 @@ bool UFConnectionPoolImpl::createIPInfo(
}
//TODO: return a ResultStructure which includes the UFConnIPInfo* along w/ the UFIO* so that the map doesnt have to be looked up on every return back of the structure
-UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName, bool waitForConnection, TIME_IN_US connectTimeout)
+UFIO* UFConnectionPoolImpl::getConnection(const std::string& groupName, bool waitForConnection)
{
if(!groupName.length())
return 0;
@@ -219,12 +209,12 @@ UFIO* UFConnectionPoolImpl::getConnectio
if(!ipInfoList.empty()) //clear the existing set since the ttl expired
ipInfoList.clear();
- if(!createIPInfo(groupName, groupInfo, connectTimeout))
+ if(!createIPInfo(groupName, groupInfo))
return 0;
}
- set<unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
+ map<unsigned int,unsigned int> alreadySeenIPList; //this list will keep track of the ips that we've already seen
unsigned int groupIpSize = ipInfoList.size();
while(alreadySeenIPList.size() < groupIpSize) //bail out if we've seen all the ips already
{
@@ -239,7 +229,7 @@ UFIO* UFConnectionPoolImpl::getConnectio
if(ipInfo && !ipInfo->_currentlyAvailableConnections.empty())
{
elementNum = i;
- alreadySeenIPList.insert(elementNum);
+ alreadySeenIPList[elementNum] = 1;
break;
}
}
@@ -250,7 +240,7 @@ UFIO* UFConnectionPoolImpl::getConnectio
elementNum = random() % groupIpSize;
if(alreadySeenIPList.find(elementNum) != alreadySeenIPList.end()) //already seen this IP
continue;
- alreadySeenIPList.insert(elementNum);
+ alreadySeenIPList[elementNum] = 1;
}
UFConnIPInfo* ipInfo = getIPInfo(ipInfoList[elementNum]);
@@ -271,24 +261,8 @@ UFConnIPInfo* UFConnectionPoolImpl::getI
return ((index != _ipInfoStore.end()) ? index->second : 0);
}
-uint32_t statConnFromPoolLocation = 0;
-uint32_t statConnFromNewConnLocation = 0;
-uint32_t statConnMarkedInvalid = 0;
-static bool registerConnPoolStats()
-{
- UFStatSystem::registerStat("connPool.conn_from_pool", &statConnFromPoolLocation);
- UFStatSystem::registerStat("connPool.conn_from_new_connection", &statConnFromNewConnLocation);
- UFStatSystem::registerStat("connPool.conn_marked_invalid", &statConnMarkedInvalid);
-
- return true;
-}
-
UFIO* UFConnIPInfo::getConnection(UFConnIPInfoMap& _ufConnIPInfoMap)
{
- static bool statsRegistered = false;
- if(!statsRegistered)
- statsRegistered = registerConnPoolStats();
-
//2. while the host is timedout - pick another one (put into the list of already seen ips)
time_t currTime = time(0);
_lastUsed = currTime;
@@ -302,28 +276,28 @@ UFIO* UFConnIPInfo::getConnection(UFConn
if(!_currentlyAvailableConnections.empty())
{
//3. pick a connection from the currently available conns
- for(UFIOIntMap::iterator beg = _currentlyAvailableConnections.begin();
- beg != _currentlyAvailableConnections.end(); )
+ UFIOIntMap::iterator beg = _currentlyAvailableConnections.begin();
+ for(; beg != _currentlyAvailableConnections.end();
+ beg = _currentlyAvailableConnections.begin() // we're resetting to the beginning to avoid
+ // the case of two threads ending up getting
+ // the same connection
+ )
{
- returnConn = beg->second;
- _currentlyAvailableConnections.erase(beg++);
+ returnConn = beg->first;
+ _currentlyAvailableConnections.erase(beg);
if(!returnConn)
{
cerr<<time(0)<<" "<<__LINE__<<" "<<"found null conn - removing that from currentlyAvailable"<<endl;
continue;
}
-
if(returnConn->_markedActive) //this indicates that the conn. had some activity while sleeping - thats no good
{
- UFStatSystem::increment(statConnMarkedInvalid, 1);
UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(returnConn);
_ufConnIPInfoMap.erase(index);
delete returnConn;
continue;
}
-
- UFStatSystem::increment(statConnFromPoolLocation, 1);
returnConn->_active = true;
_currentlyUsedCount++;
return returnConn;
@@ -350,7 +324,6 @@ UFIO* UFConnIPInfo::getConnection(UFConn
incInProcessCount(-1);
if(returnConn)
{
- UFStatSystem::increment(statConnFromNewConnLocation, 1);
_currentlyUsedCount++;
_ufConnIPInfoMap[returnConn] = this;
}
@@ -412,36 +385,38 @@ void UFConnectionPoolImpl::clearUnusedCo
continue;
}
- //TODO: order _currentlyAvailableConnections by time
//walk the available connection list to see if any conn. hasnt been used for a while
- for(UFIOIntMap::iterator conBeg = ipInfo->_currentlyAvailableConnections.begin();
- conBeg != ipInfo->_currentlyAvailableConnections.end(); )
+ UFIOIntMap::iterator conBeg = ipInfo->_currentlyAvailableConnections.begin();
+ for(; conBeg != ipInfo->_currentlyAvailableConnections.end(); )
{
- UFIO* conn = conBeg->second;
- if(!conn)
+ UFIO* conn = conBeg->first;
+ if(!conn) //TODO
{
ipInfo->_currentlyAvailableConnections.erase(conBeg++);
continue;
}
-
- time_t connLastUsed = conBeg->first;
- if(((int)connLastUsed + (int)lastUsedTimeDiff) > (int)currTime) //the time hasnt expired yet
- break;
- //remove the conn from the _UFConnIPInfoMap list
- UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(conn);
- if(index != _ufConnIPInfoMap.end())
- _ufConnIPInfoMap.erase(index);
- delete conn; //delete the connection
- ipInfo->_currentlyAvailableConnections.erase(conBeg++);
+ if((int)(conBeg->second + lastUsedTimeDiff) < (int)currTime /*the conn. has expired*/ ||
+ (conBeg->first->_markedActive)/*somehow this conn. had some update*/)
+ {
+ //remove the conn from the _UFConnIPInfoMap list
+ UFConnIPInfoMap::iterator index = _ufConnIPInfoMap.find(conBeg->first);
+ if(index != _ufConnIPInfoMap.end())
+ _ufConnIPInfoMap.erase(index);
+ delete conBeg->first; //delete the connection
+ ipInfo->_currentlyAvailableConnections.erase(conBeg++);
+ continue;
+ }
+ ++conBeg;
}
-
- //check the last time this ipinfo was ever used - if its > than 600s remove it
- if(ipInfo->_currentlyAvailableConnections.empty() &&
- (ipInfo->getLastUsed() + DEFAULT_LAST_USED_TIME_INTERVAL_FOR_IP < (unsigned int) currTime))
+ if(ipInfo->_currentlyAvailableConnections.empty())
{
- _ipInfoStore.erase(beg++);
- continue;
+ //check the last time this ipinfo was ever used - if its > than 300s remove it
+ if(ipInfo->getLastUsed() + DEFAULT_LAST_USED_TIME_INTERVAL_FOR_IP < (unsigned int) currTime)
+ {
+ _ipInfoStore.erase(beg++);
+ continue;
+ }
}
++beg;
}
@@ -469,7 +444,8 @@ void UFConnectionPoolImpl::releaseConnec
//add to the available list
if(connOk && ipInfo->getPersistent())
{
- ipInfo->_currentlyAvailableConnections.insert(make_pair(time(0), ufIO));
+ time_t currTime = time(0);
+ ipInfo->_currentlyAvailableConnections[ufIO] = currTime;
ufIO->_markedActive = false;
ufIO->_active = false;
}
@@ -481,21 +457,17 @@ void UFConnectionPoolImpl::releaseConnec
}
//signal to all the waiting threads that there might be a connection available
- /*TODO: lock only if we move the conn. pool to support running on multiple threads
UF* this_user_fiber = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
ipInfo->getMutexToCheckSomeConnection()->lock(this_user_fiber);
- */
ipInfo->getMutexToCheckSomeConnection()->broadcast();
- /*
ipInfo->getMutexToCheckSomeConnection()->unlock(this_user_fiber);
- */
}
UFConnectionPoolImpl::~UFConnectionPoolImpl()
{
}
-const unsigned int DEFAULT_COVER_LIST_TIME_IN_US = 240*1000*1000;
+const unsigned int DEFAULT_COVER_LIST_TIME_IN_US = 60*1000*1000;
void UFConnectionPoolCleaner::run()
{
UF* this_uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
@@ -549,9 +521,9 @@ bool UFConnectionPool::addGroup(UFConnGr
return (_impl ? _impl->addGroup(groupInfo) : false);
}
-UFIO* UFConnectionPool::getConnection(const std::string& groupName, bool waitForConnection, TIME_IN_US connectTimeout)
+UFIO* UFConnectionPool::getConnection(const std::string& groupName, bool waitForConnection)
{
- return (_impl ? _impl->getConnection(groupName, waitForConnection, connectTimeout) : 0);
+ return (_impl ? _impl->getConnection(groupName, waitForConnection) : 0);
}
void UFConnectionPool::releaseConnection(UFIO* ufIO, bool connOk)
@@ -577,7 +549,7 @@ void UFConnectionPool::clearUnusedConnec
TIME_IN_US UFConnectionPool::getTimeToTimeoutIPAfterFailure()
{
- return (_impl ? _impl->getTimeToTimeoutIPAfterFailure() : -1);
+ return (_impl ? _impl->getTimeToTimeoutIPAfterFailure() : 0);
}
void UFConnectionPool::setMaxSimulConnsPerHost(int input)
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFConnectionPoolImpl.H Fri Aug 13 22:14:31 2010
@@ -2,37 +2,18 @@
#define UFCONNECTIONPOOLIMPL_H
#include <time.h>
-#include <UF.H>
-#include <UFConnectionPool.H>
-
-#include <ext/hash_map>
-using namespace std;
-namespace std { using namespace __gnu_cxx; }
-struct VoidPtrHash
-{
- size_t operator()(void* p) const
- {
- return reinterpret_cast< uintptr_t >(p);
- }
-};
-
-struct VoidPtrEqual
-{
- bool operator()(void* v1, void* v2) const
- {
- return (v1 == v2);
- }
-};
+#include "UF.H"
+#include "UFConnectionPool.H"
struct UFIO;
struct UFConnIPInfo;
struct UFConnGroupInfo;
-typedef std::map<std::string, UFConnGroupInfo*> GroupIPMap;
-typedef std::map<std::string, UFConnIPInfo*> IPInfoStore;
-typedef std::multimap<unsigned long long int, UFIO*> UFIOIntMap;
-typedef hash_map<void*, UFConnIPInfo*, VoidPtrHash, VoidPtrEqual> UFConnIPInfoMap;
+typedef std::map<std::string, UFConnGroupInfo*> GroupIPMap;
+typedef std::map<std::string, UFConnIPInfo*> IPInfoStore;
+typedef std::map<UFIO*, time_t> UFIOIntMap;
+typedef std::map<UFIO*, UFConnIPInfo*> UFConnIPInfoMap;
const time_t DEFAULT_TIMEOUT_OF_IP_ON_FAILURE = 10;
struct UFConnectionPoolImpl
@@ -45,7 +26,7 @@ struct UFConnectionPoolImpl
bool addGroup(UFConnGroupInfo* stGroupInfo);
void removeGroup(const std::string& name);
- UFIO* getConnection(const std::string& groupName, bool waitForConnection = true, TIME_IN_US connectTimeout = -1);
+ UFIO* getConnection(const std::string& groupName, bool waitForConnection = true);
void releaseConnection(UFIO* ufIO, bool connOk = true);
UFConnGroupInfo* addGroupImplicit(const std::string& groupName);
@@ -64,7 +45,7 @@ protected:
time_t _timeToTimeoutIPAfterFailure; ///How long should we time out an IP if there is a failure
UFConnIPInfo* getIPInfo(const std::string& name);
- bool createIPInfo(const std::string& groupName, UFConnGroupInfo* groupInfo, TIME_IN_US connectTimeout = -1);
+ bool createIPInfo(const std::string& groupName, UFConnGroupInfo* groupInfo);
};
inline void UFConnectionPoolImpl::setMaxSimulConnsPerHost(int input) { _maxSimulConnsPerHost = input; }
inline int UFConnectionPoolImpl::getMaxSimulConnsPerHost() { return _maxSimulConnsPerHost; }
@@ -87,6 +68,8 @@ struct UFConnIPInfo
unsigned int getInProcessCount() const;
bool getPersistent() const;
unsigned int getTimeToFailOutIPAfterFailureInSecs() const;
+ unsigned int getTimeToExpire() const;
+ void setTimeToExpire(time_t input);
TIME_IN_US getConnectTimeout() const;
time_t getTimedOut() const;
void setTimedOut(time_t t);
@@ -108,6 +91,7 @@ protected:
unsigned int _maxSimultaneousConns;
bool _persistent;
+ unsigned int _timeToExpireDNSInfo;
unsigned int _timeToFailOutIPAfterFailureInSecs; ///how many s to try before considering the connect a failure
TIME_IN_US _connectTimeout; ///how many ms to try before considering the connect a failure
time_t _timedOut;
@@ -119,13 +103,15 @@ protected:
};
inline time_t UFConnIPInfo::getLastUsed() const { return _lastUsed; }
inline struct sockaddr_in* UFConnIPInfo::getSin() { return &_sin; }
-inline const std::string& UFConnIPInfo::getIP() const { return _ip; }
+inline const string& UFConnIPInfo::getIP() const { return _ip; }
inline unsigned int UFConnIPInfo::getMaxSimultaneousConns() const { return _maxSimultaneousConns; }
inline unsigned int UFConnIPInfo::getInProcessCount() const { return _inProcessCount; }
inline void UFConnIPInfo::incInProcessCount(int numToIncrement) { _inProcessCount += numToIncrement; }
inline bool UFConnIPInfo::getPersistent() const { return _persistent; }
inline unsigned int UFConnIPInfo::getTimeToFailOutIPAfterFailureInSecs() const { return _timeToFailOutIPAfterFailureInSecs; }
inline TIME_IN_US UFConnIPInfo::getConnectTimeout() const { return _connectTimeout; }
+inline unsigned int UFConnIPInfo::getTimeToExpire() const { return _timeToExpireDNSInfo; }
+inline void UFConnIPInfo::setTimeToExpire(time_t input) { _timeToExpireDNSInfo = input; }
inline time_t UFConnIPInfo::getTimedOut() const { return _timedOut; }
inline void UFConnIPInfo::setTimedOut(time_t t) { _timedOut = t; }
inline UFMutex* UFConnIPInfo::getMutexToCheckSomeConnection() { return &_someConnectionAvailable; }
@@ -156,5 +142,13 @@ inline void UFConnGroupInfo::setTimeToEx
inline UFConnIPInfoList& UFConnGroupInfo::getIpInfoList() { return _ipInfoList; }
+struct UFConnectionPoolCleaner : public UF //UF subclass for the connection pool cleaner
+{
+ void run(); //declared only; the body is not part of this hunk
+ UFConnectionPoolCleaner (bool registerMe = false); //registerMe is currently ignored by the inline definition
+ UF* createUF() { return new UFConnectionPoolCleaner(); } //returns a newly allocated cleaner (registerMe defaults to false)
+};
+inline UFConnectionPoolCleaner::UFConnectionPoolCleaner (bool registerMe) { /*if(registerMe) _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);*/ } //empty body: the UFFactory registration is commented out, so registerMe has no effect
+
#endif
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFIO.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFIO.C Fri Aug 13 22:14:31 2010
@@ -1,7 +1,7 @@
-#include <UFIO.H>
-#include <UFConnectionPool.H>
-#include <UFStatSystem.H>
-#include <UFStats.H>
+#include "UFIO.H"
+#include "UFConnectionPool.H"
+#include "UFStatSystem.H"
+#include "UFStats.H"
#include <netdb.h>
#include <sys/socket.h>
#include <sys/time.h>
@@ -28,7 +28,7 @@ static int makeSocketNonBlocking(int fd)
if ((flags = fcntl(fd, F_GETFL, 0)) < 0 ||
fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
return -1;
-
+
return fd;
}
@@ -42,22 +42,15 @@ void UFIO::reset()
_sleepInfo = 0;
_markedActive = false;
_active = true;
- if (_readLineBuf)
- free(_readLineBuf);
- _readLineBufPos = 0;
- _readLineBufSize = 0;
}
UFIO::~UFIO()
{
close();
- if (_readLineBuf)
- free(_readLineBuf);
}
UFIO::UFIO(UF* uf, int fd)
{
- _readLineBuf = NULL;
reset();
_uf = (uf) ? uf : UFScheduler::getUF(pthread_self());
@@ -105,8 +98,6 @@ bool UFIO::isSetup(bool makeNonBlocking)
return true;
}
-int UFIO::RECV_SOCK_BUF = 57344;
-int UFIO::SEND_SOCK_BUF = 57344;
int UFIO::setupConnectionToAccept(const char* i_a,
unsigned short int port,
unsigned short int backlog,
@@ -163,11 +154,6 @@ int UFIO::setupConnectionToAccept(const
return -1;
}
- //set the recv and send buffers
- setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*) &UFIO::RECV_SOCK_BUF, sizeof( UFIO::RECV_SOCK_BUF ));
- setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*) &UFIO::SEND_SOCK_BUF, sizeof( UFIO::SEND_SOCK_BUF ));
-
-
if (listen(fd, backlog) != 0)
{
cerr<<"couldnt setup listen to "<<interface_addr<<" on port "<<port<<" - "<<strerror(errno)<<endl;
@@ -176,13 +162,11 @@ int UFIO::setupConnectionToAccept(const
return false;
}
- cerr << "setup listen socket at " << interface_addr << ':' << port << endl;
return fd;
}
void UFIO::accept(UFIOAcceptThreadChooser* ufiotChooser,
unsigned short int ufLocation,
- unsigned short int port,
void* startingArgs,
void* stackPtr,
unsigned int stackSize)
@@ -213,12 +197,6 @@ void UFIO::accept(UFIOAcceptThreadChoose
}
}
- stringstream ss;
- ss<<"connections.accept.["<<port<<"]";
- unsigned int connAccepted;
- UFStatSystem::registerStat(ss.str().c_str(), &connAccepted);
-
-
int acceptFd = 0;
struct sockaddr_in cli_addr;
int sizeof_cli_addr = sizeof(cli_addr);
@@ -260,7 +238,6 @@ void UFIO::accept(UFIOAcceptThreadChoose
}
//make the new socket non-blocking
- UFStatSystem::increment(connAccepted, 1);
if(makeSocketNonBlocking(acceptFd) < 1)
{
cerr<<"couldnt make accepted socket "<<acceptFd<<" non-blocking"<<strerror(errno)<<endl;
@@ -349,124 +326,14 @@ UFIOScheduler* UFIOScheduler::getUFIOS(p
return tmpUfios;
}
-
-
-ssize_t UFIO::readLine(char* buf, size_t n, char delim)
-{
- ssize_t res = 0;
- size_t prev_len = 0;
- if (!n) return 0;
- if (_readLineBufPos > 0) {
- prev_len = ((_readLineBufPos >= n) ? n-1 : _readLineBufPos);
- char* pos = (char*)memchr(_readLineBuf, delim, prev_len);
- if (pos) { //found the delim
- prev_len = pos - _readLineBuf;
- _readLineBufPos--;
- }
- memcpy(buf, _readLineBuf, prev_len);
- _readLineBufPos -= prev_len;
- if (pos)
- memmove(_readLineBuf, _readLineBuf+prev_len+1, _readLineBufPos);
- else
- memmove(_readLineBuf, _readLineBuf+prev_len, _readLineBufPos);
-
- if (pos || prev_len >= n-1)
- return prev_len;
- }
- _readLineBufPos = 0;
- while (prev_len < n-1) {
- res = read(buf+prev_len, n-prev_len, -1);
- if (res < 0) {
- return res;
- } else if (res == 0) {
- return prev_len;
- } else {
- char* pos = (char*)memchr(buf+prev_len, delim, res);
- if (pos) { //found the delim
- if (pos < buf+prev_len+res-1) {
- _readLineBufPos = buf+prev_len+res-1-pos;
- if (_readLineBufSize < _readLineBufPos) {
- if (!_readLineBufSize)
- _readLineBufSize = 64;
- else
- free(_readLineBuf);
- for ( ; _readLineBufSize < _readLineBufPos; _readLineBufSize <<= 1);
- _readLineBuf = (char*)malloc(_readLineBufSize);
- }
- memcpy(_readLineBuf, pos+1, _readLineBufPos);
- }
- prev_len = pos-buf;
- buf[prev_len] = '\0';
- break;
- } else {
- prev_len += res;
- }
- buf[prev_len] = '\0';
- }
- }
- return prev_len;
-}
-
-ssize_t UFIO::readLine(std::string &out, size_t n, char delim)
-{
- if (!n) return 0;
- char tempbuf[n];
- ssize_t r = readLine(tempbuf, n, delim);
- if (r >= 0)
- {
- out.assign(tempbuf, r);
- }
- return r;
-}
-
-static inline TIME_IN_US setupTimeout(TIME_IN_US& timeout)
-{
- TIME_IN_US now = 0;
- if (timeout > -1)
- {
- struct timeval now_tv;
- gettimeofday(&now_tv, NULL);
- now = timeInUS(now_tv);
- timeout += now;
- }
- return now;
-}
-
-static inline bool calculateLoopedTimeout(TIME_IN_US& now, TIME_IN_US& timeout)
-{
- if(now)
- {
- struct timeval now_tv;
- gettimeofday(&now_tv, NULL);
- now = timeInUS(now_tv);
- if (now >= timeout)
- return false;
- }
- return true;
-}
-
-ssize_t UFIO::read(void *buf, size_t totalBytes, TIME_IN_US timeout)
+ssize_t UFIO::read(void *buf, size_t nbyte, TIME_IN_US timeout)
{
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- if (_readLineBufPos > 0)
- {
- size_t prev_len = ((_readLineBufPos > totalBytes) ? totalBytes : _readLineBufPos);
- memcpy(buf, _readLineBuf, prev_len);
- _readLineBufPos -= prev_len;
- memmove(_readLineBuf, _readLineBuf+prev_len, _readLineBufPos);
- if (prev_len == totalBytes)
- return prev_len;
- totalBytes -= prev_len;
- }
-
- TIME_IN_US now = setupTimeout(timeout);
- bool shouldCheckTimeout = false; //the flag ensures that we dont re-do gettimeofday right after the read/write call the first time
-
- ssize_t n = 0;
+ ssize_t n = 0;;
while(1)
{
- n = ::read(_fd, buf, totalBytes);
+ n = ::read(_fd, buf, nbyte);
if(n > 0)
{
UFStatSystem::increment(UFStats::bytesRead, n);
@@ -474,37 +341,20 @@ ssize_t UFIO::read(void *buf, size_t tot
}
else if(n < 0)
{
- if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
+ if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
- if(!timeout) //optimization for the case that the user doesnt want a timeout
- {
- _errno = ETIMEDOUT;
- return -1;
- }
-
+ _errno = 0;
_markedActive = false;
- while(!_markedActive)
+ while(!_markedActive)
{
- if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
//wait for something to read first
- if(!tmpUfios->setupForRead(this, timeout-now))
+ if(!tmpUfios->setupForRead(this, timeout))
return -1;
- if(_markedActive) //found some activity on fd, so read
- break;
}
+ continue;
}
else if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- }
+ continue;
else
{
_errno = errno;
@@ -513,28 +363,28 @@ ssize_t UFIO::read(void *buf, size_t tot
}
else if(n == 0)
break;
- shouldCheckTimeout = true;
}
+ // Increment stat for bytes read
+ if(n > 0)
+ UFStatSystem::increment(UFStats::bytesRead, n);
+
return n;
}
-ssize_t UFIO::write(const void *buf, size_t totalBytes, TIME_IN_US timeout)
+ssize_t UFIO::write(const void *buf, size_t nbyte, TIME_IN_US timeout)
{
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- TIME_IN_US now = setupTimeout(timeout);
- bool shouldCheckTimeout = false;
-
- ssize_t n = 0;
- size_t amtWritten = 0;
+ ssize_t n = 0;;
+ unsigned int amtWritten = 0;
while(1)
{
- n = ::write(_fd, (char*)buf+amtWritten, totalBytes-amtWritten);
+ n = ::write(_fd, (char*)buf+amtWritten, nbyte-amtWritten);
if(n > 0)
{
amtWritten += n;
- if(amtWritten == totalBytes)
+ if(amtWritten == nbyte)
{
UFStatSystem::increment(UFStats::bytesWritten, n);
return amtWritten;
@@ -544,36 +394,19 @@ ssize_t UFIO::write(const void *buf, siz
}
else if(n < 0)
{
+ _errno = errno;
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
- if (!timeout) //dont wait to write
- {
- _errno = ETIMEDOUT;
- return -1;
- }
-
_markedActive = false;
+ _errno = 0;
while(!_markedActive)
{
- if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
+ if(!tmpUfios->setupForWrite(this, timeout))
return -1;
- }
- if(!tmpUfios->setupForWrite(this, timeout-now))
- return -1;
- if(_markedActive) //can write, so write
- break;
}
}
else if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- }
+ continue;
else
{
_errno = errno;
@@ -582,7 +415,6 @@ ssize_t UFIO::write(const void *buf, siz
}
else if(n == 0)
break;
- shouldCheckTimeout = true;
}
// Increment stat for bytes written
@@ -593,8 +425,8 @@ ssize_t UFIO::write(const void *buf, siz
}
bool UFIO::connect(const struct sockaddr *addr,
- socklen_t addrlen,
- TIME_IN_US timeout)
+ int addrlen,
+ TIME_IN_US timeout)
{
if(!isSetup()) //create the socket and make the socket non-blocking
{
@@ -606,33 +438,17 @@ bool UFIO::connect(const struct sockaddr
//find the scheduler for this request
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- TIME_IN_US now = setupTimeout(timeout);
-
- setsockopt(_fd, SOL_SOCKET, SO_RCVBUF, (char*) &UFIO::RECV_SOCK_BUF, sizeof( UFIO::RECV_SOCK_BUF ));
- setsockopt(_fd, SOL_SOCKET, SO_SNDBUF, (char*) &UFIO::SEND_SOCK_BUF, sizeof( UFIO::SEND_SOCK_BUF ));
-
while(::connect(_fd, addr, addrlen) < 0)
{
if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return false;
- }
- }
+ continue;
else if(errno == EINPROGRESS || errno == EAGAIN)
{
- if(!tmpUfios->setupForConnect(this, timeout-now))
+ if(!tmpUfios->setupForConnect(this, timeout))
{
_errno = errno;
return false;
}
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return false;
- }
}
else
{
@@ -644,58 +460,38 @@ bool UFIO::connect(const struct sockaddr
return true;
}
-int UFIO::sendto(const char *buf, size_t len, const struct sockaddr *to, socklen_t tolen, TIME_IN_US timeout)
+int UFIO::sendto(const char *buf, int len, const struct sockaddr *to, int tolen, TIME_IN_US timeout)
{
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
-
- TIME_IN_US now = setupTimeout(timeout);
- bool shouldCheckTimeout = false;
- ssize_t n = 0;
- size_t amtWritten = 0;
+ ssize_t n = 0;;
+ unsigned int amtWritten = 0;
while(1)
{
n = ::sendto(_fd, buf+amtWritten, len-amtWritten, 0, to, tolen);
if(n > 0)
{
amtWritten += n;
- if(amtWritten == len)
+ if((int)amtWritten == len)
return amtWritten;
else
continue;
}
else if(n < 0)
{
+ _errno = errno;
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
- if(!timeout)
- {
- _errno = ETIMEDOUT;
- return -1;
- }
-
_markedActive = false;
+ _errno = 0;
while(!_markedActive)
{
- if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
+ if(!tmpUfios->setupForWrite(this, timeout))
return -1;
- }
- if(!tmpUfios->setupForWrite(this, timeout-now))
- return -1;
- if(_markedActive)
- break;
}
}
else if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- }
+ continue;
else
{
_errno = errno;
@@ -704,21 +500,17 @@ int UFIO::sendto(const char *buf, size_t
}
else if(n == 0)
break;
- shouldCheckTimeout = true;
}
return n;
}
int UFIO::sendmsg(const struct msghdr *msg,
int flags,
- TIME_IN_US timeout)
+ TIME_IN_US timeout)
{
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- TIME_IN_US now = setupTimeout(timeout);
- bool shouldCheckTimeout = false;
-
- ssize_t n = 0;
+ ssize_t n = 0;;
while(1)
{
n = ::sendmsg(_fd, msg, flags);
@@ -726,36 +518,19 @@ int UFIO::sendmsg(const struct msghdr *m
continue;
else if(n < 0)
{
+ _errno = errno;
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
- if (!timeout)
- {
- _errno = ETIMEDOUT;
- return -1;
- }
-
_markedActive = false;
+ _errno = 0;
while(!_markedActive)
{
- if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- if(!tmpUfios->setupForWrite(this, timeout-now))
+ if(!tmpUfios->setupForWrite(this, timeout))
return -1;
- if(_markedActive) //can write, so write
- break;
}
}
else if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- }
+ continue;
else
{
_errno = errno;
@@ -764,60 +539,39 @@ int UFIO::sendmsg(const struct msghdr *m
}
else if(n == 0)
break;
- shouldCheckTimeout = true;
}
return n;
}
int UFIO::recvfrom(char *buf,
- size_t len,
+ int len,
struct sockaddr *from,
- socklen_t *fromlen,
+ int *fromlen,
TIME_IN_US timeout)
{
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- TIME_IN_US now = setupTimeout(timeout);
- bool shouldCheckTimeout = false;
-
- ssize_t n = 0;
+ ssize_t n = 0;;
while(1)
{
- n = ::recvfrom(_fd, buf, len, 0, from, fromlen);
+ n = ::recvfrom(_fd, buf, len, 0, from, (socklen_t *)fromlen);
if(n > 0)
return n;
else if(n < 0)
{
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
- if (!timeout)
- {
- _errno = ETIMEDOUT;
- return -1;
- }
-
+ _errno = 0;
_markedActive = false;
while(!_markedActive)
{
- if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- if(!tmpUfios->setupForRead(this, timeout-now))
+ if(!tmpUfios->setupForRead(this, timeout))
return -1;
- if(_markedActive) //can write, so write
- break;
}
+ continue;
}
else if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- }
+ continue;
else
{
_errno = errno;
@@ -826,20 +580,16 @@ int UFIO::recvfrom(char *buf,
}
else if(n == 0)
break;
- shouldCheckTimeout = true;
}
return n;
}
int UFIO::recvmsg(struct msghdr *msg,
int flags,
- TIME_IN_US timeout)
+ TIME_IN_US timeout)
{
UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- TIME_IN_US now = setupTimeout(timeout);
- bool shouldCheckTimeout = false;
-
ssize_t n = 0;
while(1)
{
@@ -850,35 +600,17 @@ int UFIO::recvmsg(struct msghdr *msg,
{
if((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
- if (!timeout)
- {
- _errno = ETIMEDOUT;
- return -1;
- }
-
+ _errno = 0;
_markedActive = false;
while(!_markedActive)
{
- if(shouldCheckTimeout && !calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
+ if(!tmpUfios->setupForRead(this, timeout))
return -1;
- }
- if(!tmpUfios->setupForRead(this, timeout-now))
- return -1;
- if(_markedActive) //can write, so write
- break;
}
continue;
}
else if(errno == EINTR)
- {
- if(!calculateLoopedTimeout(now, timeout))
- {
- _errno = ETIMEDOUT;
- return -1;
- }
- }
+ continue;
else
{
_errno = errno;
@@ -887,7 +619,6 @@ int UFIO::recvmsg(struct msghdr *msg,
}
else if(n == 0)
break;
- shouldCheckTimeout = true;
}
return n;
}
@@ -1001,7 +732,7 @@ bool EpollUFIOScheduler::addToScheduler(
}
- if(to > 0) //dont consider timeouts less than 1
+ if(to) //add to the sleep queue for the epoll queue TODO
{
struct timeval now;
gettimeofday(&now, 0);
@@ -1015,7 +746,7 @@ bool EpollUFIOScheduler::addToScheduler(
}
ufsi->_ufio = ufio;
ufio->_sleepInfo = ufsi;
- TIME_IN_US timeToWakeUp = now.tv_sec*1000000+now.tv_usec + to;
+ unsigned long long int timeToWakeUp = now.tv_sec*1000000+now.tv_usec + to;
if(_earliestWakeUpFromSleep > timeToWakeUp ||
!_earliestWakeUpFromSleep)
_earliestWakeUpFromSleep = timeToWakeUp;
@@ -1027,10 +758,9 @@ bool EpollUFIOScheduler::addToScheduler(
ufio->_markedActive = false;
if(!wait)
return true;
-
ufio->getUF()->block(); //switch context till someone wakes me up
- if(to == -1) //nothing to do w/ no timeout
+ if(!to) //nothing to do w/ no timeout
return true;
if(ufio->_sleepInfo)
@@ -1039,7 +769,7 @@ bool EpollUFIOScheduler::addToScheduler(
ufio->_sleepInfo = 0;
return true;
}
- ufio->_errno = ETIMEDOUT;
+ //ufio->_errno = ETIMEDOUT
return false;
}
@@ -1180,7 +910,7 @@ static void* notifyEpollFunc(void* args)
if(!args)
return 0;
#ifdef PIPE_NOT_EFD
- if(write(*((int*)args), &eventFDChar, 1) > 0) {}
+ write(*((int*)args), &eventFDChar, 1);
#else
eventfd_write(*((int*)args), efdIncrementor); //TODO: deal w/ error case later
#endif
@@ -1238,10 +968,13 @@ void EpollUFIOScheduler::waitForEvents(T
int nfds;
struct timeval now;
- TIME_IN_US timeNow = 0;
+ unsigned long long int timeNow = 0;
IntUFIOMap::iterator index;
UFIO* ufio = 0;
UF* uf = 0;
+ unsigned long long int amtToSleep = timeToWait;
+ int i = 0;
+ _interruptedByEventFd = false;
UFScheduler* ufs = _uf->getParentScheduler();
list<UF*> ufsToAddToScheduler;
if(!ufs)
@@ -1249,11 +982,7 @@ void EpollUFIOScheduler::waitForEvents(T
cerr<<"epoll scheduler has to be connected to some scheduler"<<endl;
return;
}
-
- TIME_IN_US amtToSleep = timeToWait;
- int i = 0;
- _interruptedByEventFd = false;
- int sleepMS = 0;
+ unsigned long long int amtToSleepFromUFS = 0;
while(1)
{
if(_interruptedByEventFd) //this is so that the last interruption gets handled right away
@@ -1262,16 +991,16 @@ void EpollUFIOScheduler::waitForEvents(T
_uf->yield();
}
- if(ufs->getActiveRunningListSize() > 1) //epoll is not the only fiber thats currently active
- sleepMS = 0; //dont wait on epoll - since there are other ufs waiting to run
- else
+ if(ufs->getActiveRunningListSize() < 2) //means that epoll is the only fiber thats currently active
{
- if(amtToSleep > ufs->getAmtToSleep())
- amtToSleep = ufs->getAmtToSleep();
- sleepMS = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for atleast 1ms
+ if(amtToSleep > (amtToSleepFromUFS = ufs->getAmtToSleep()))
+ amtToSleep = (int)(amtToSleepFromUFS/1000);
+ else
+ amtToSleep = (amtToSleep > 1000 ? (int)(amtToSleep/1000) : 1); //let epoll sleep for at least 1ms
}
-
- nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, sleepMS);
+ else
+ amtToSleep = 0; //dont wait on epoll - since there are other ufs waiting to run
+ nfds = ::epoll_wait(_epollFd, _epollEventStruct, _maxFds, amtToSleep);
if(nfds > 0)
{
//for each of the fds that had activity activate them
@@ -1287,9 +1016,10 @@ void EpollUFIOScheduler::waitForEvents(T
exit(1);
}
ufio->_markedActive = true;
- if(ufio->_active) //activate the UF only if its being watched by some UF
+ if(ufio->_active)
+ //activate the fiber
ufs->addFiberToScheduler(uf, 0);
- //else must be the case that no one is watching this ufio - such as in the case where the conn. pool holding is onto the conn.
+ //else must be the case that no one is watching this ufio - such as in the case where the conn. pool is holding onto the conn.
}
else
{
@@ -1378,99 +1108,11 @@ void IORunner::run()
ioRunner->waitForEvents(1000000); //TODO: allow to change the epoll interval later
}
-void UFIO::ufCreateThreadWithIO(pthread_t* tid, UFList* ufsToStartWith)
+void UFIO::ufCreateThreadWithIO(pthread_t* tid, list<UF*>* ufsToStartWith)
{
+ ufsToStartWith->push_front(new IORunner());
+
// Add connection pool cleaner
//ufsToStartWith->push_back(new UFConnectionPoolCleaner); //TODO: figure out how to deal w/ inactive connections
- ufsToStartWith->push_back(new IORunner()); //we want this to run first and insertions happen in a LIFO manner so we add this uf to the end
UFScheduler::ufCreateThread(tid, ufsToStartWith);
}
-
-const unsigned int MAX_IOV = 16;
-ssize_t UFIO::writev(const struct iovec *iov, int iov_size, TIME_IN_US timeout)
-{
- ssize_t n, retVal;
- struct iovec* tmp_iov;
- struct iovec local_iov[MAX_IOV];
-
- size_t totalBytes = 0;
- int index;
- for (index = 0; index < iov_size; index++)
- totalBytes += iov[index].iov_len;
-
- retVal = (ssize_t)totalBytes;
- size_t bytesRemaining = totalBytes;
- tmp_iov = (struct iovec *) iov;
- int iov_cnt = iov_size;
-
- UFIOScheduler* tmpUfios = _ufios ? _ufios : UFIOScheduler::getUFIOS();
- _markedActive = false;
- _errno = 0;
- while (bytesRemaining > 0)
- {
- if (iov_cnt == 1)
- {
- if (write(tmp_iov[0].iov_base, bytesRemaining, timeout) != (ssize_t) bytesRemaining)
- retVal = -1;
- break;
- }
- if ((n = ::writev(_fd, tmp_iov, iov_cnt)) < 0)
- {
- if(errno == EINTR)
- continue;
- else if(errno == EAGAIN || errno == EWOULDBLOCK)
- {
- //TODO: subtract the remaining amt. from the timeout
- if(!tmpUfios->setupForWrite(this, timeout))
- {
- retVal = -1;
- break;
- }
- }
- else
- {
- retVal = -1;
- _errno = errno;
- break;
- }
- }
- else
- {
- if ((size_t) n == bytesRemaining)
- break;
- bytesRemaining -= n;
- n = (ssize_t)(totalBytes - bytesRemaining);
- for (index = 0; (size_t) n >= iov[index].iov_len; index++)
- n -= iov[index].iov_len;
-
- if (tmp_iov == iov)
- {
- if ((iov_size - index) <= (int) MAX_IOV)
- tmp_iov = local_iov;
- else
- {
- tmp_iov = (struct iovec*) calloc(1, (iov_size - index) * sizeof(struct iovec));
- if (tmp_iov == NULL)
- {
- _errno = errno;
- return -1;
- }
- }
- }
-
- tmp_iov[0].iov_base = &(((char *)iov[index].iov_base)[n]);
- tmp_iov[0].iov_len = iov[index].iov_len - n;
- index++;
- for (iov_cnt = 1; index < iov_size; iov_cnt++, index++)
- {
- tmp_iov[iov_cnt].iov_base = iov[index].iov_base;
- tmp_iov[iov_cnt].iov_len = iov[index].iov_len;
- }
- }
- }
-
- if (tmp_iov != iov && tmp_iov != local_iov)
- free(tmp_iov);
-
- return retVal;
-}
Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFIO.C
('svn:mergeinfo' removed)
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFPC.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFPC.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFPC.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFPC.C Fri Aug 13 22:14:31 2010
@@ -1,9 +1,9 @@
-#include <UFPC.H>
+#include "UFPC.H"
#include <stdio.h>
using namespace std;
-size_t UFProducer::produceData(UFDataObject* data, int ufpcCode, bool freeDataOnExit, UF* uf)
+bool UFProducer::produceData(void* data, unsigned int size, int ufpcCode, bool freeDataOnExit, UF* uf)
{
if(!uf)
uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
@@ -12,166 +12,77 @@ size_t UFProducer::produceData(UFDataObj
//create the UFProducerData structure
UFProducerData* ufpd = UFProducerData::getObj();
ufpd->_data = data;
+ ufpd->_size = size;
ufpd->_freeDataOnExit = freeDataOnExit;
ufpd->_producerWhichInserted = this;
ufpd->_ufpcCode = ufpcCode;
ufpd->_lockToUpdate = _requireLockToUpdateConsumers;
- size_t numConsUpdated = updateConsumers(ufpd, uf);
- if(numConsUpdated)
- return numConsUpdated;
-
- delete ufpd;
- return 0;
-}
-
-size_t UFProducer::produceData(UFProducerData* ufpd, UF* uf)
-{
- if(!uf)
- uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- _uf = uf;
-
- return updateConsumers(ufpd, uf);
-}
-
-size_t UFJoinableProducer::updateConsumers(UFProducerData* ufpd, UF* uf)
-{
- size_t consumerCount = getConsumerCount();
- if(!consumerCount)
- return 0;
-
//increase the reference count
- ufpd->addRef(consumerCount);
+ ufpd->addRef(_producersConsumerSetSize);
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
- switch(consumerCount)
+ //the optimized case of no locking + there being only one consumer
+ if(!_requireLockToUpdateConsumers &&
+ _producersConsumerSetSize == 1 &&
+ _mostRecentConsumerAdded)
{
- case 1: //optimize there being only one consumer
- {
- _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
- if(!_mostRecentConsumerAdded->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0)) //signal if necessary
- {
- if(_requireLockToUpdateConsumers)
- _mostRecentConsumerAdded->_queueOfDataToConsumeLock.signal();
- else //wake up from a block
- UFScheduler::getUFScheduler()->addFiberToScheduler(_mostRecentConsumerAdded->getUF());
- }
- break;
- }
- default:
- {
- //for each of the consumers add it to their queue
- for(deque<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
- beg != _producersConsumerSet.end(); ++beg)
- {
- UFConsumer* ufc = *beg;
- //add to the consumer's queue
- if(_requireLockToUpdateConsumers)
- {
- ufc->_queueOfDataToConsumeLock.lock(uf);
- ufc->_queueOfDataToConsume.push_back(ufpd);
- if(!ufc->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0))
- ufc->_queueOfDataToConsumeLock.signal();
- ufc->_queueOfDataToConsumeLock.unlock(uf);
- }
- else
- {
- ufc->_queueOfDataToConsume.push_back(ufpd);
- if(!ufc->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0))
- UFScheduler::getUFScheduler()->addFiberToScheduler(ufc->getUF());
- }
- }
- break;
- }
+ _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
+ UF* consUF = _mostRecentConsumerAdded->getUF();
+ if(consUF)
+ UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
+ return true;
}
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-
- return consumerCount;
-}
-size_t UFNonJoinableProducer::updateConsumers(UFProducerData* ufpd, UF* uf)
-{
- size_t count = 0;
+ //for each of the consumers add it to their queue
if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
- if(_mostRecentConsumerAdded)
+ for(set<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
+ beg != _producersConsumerSet.end(); ++beg)
{
- //increase the reference count
- ufpd->addRef((count = 1));
-
- _mostRecentConsumerAdded->_queueOfDataToConsume.push_back(ufpd);
- if(!_mostRecentConsumerAdded->getNotifyOnExitOnly() || (ufpd->_ufpcCode == 0)) //signal if necessary
+ //add to the consumer's queue
+ if(_requireLockToUpdateConsumers)
{
- if(_requireLockToUpdateConsumers)
- _mostRecentConsumerAdded->_queueOfDataToConsumeLock.signal();
- else //wake up from a block
- UFScheduler::getUFScheduler()->addFiberToScheduler(_mostRecentConsumerAdded->getUF());
+ (*beg)->_queueOfDataToConsumeLock.lock(uf);
+ (*beg)->_queueOfDataToConsume.push_back(ufpd);
+ if(!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0))
+ (*beg)->_queueOfDataToConsumeLock.signal();
+ (*beg)->_queueOfDataToConsumeLock.unlock(uf);
+ }
+ else
+ {
+ (*beg)->_queueOfDataToConsume.push_back(ufpd);
+ UF* consUF = (*beg)->getUF();
+ if(consUF && (!(*beg)->getNotifyOnExitOnly() || (ufpcCode == 0)))
+ UFScheduler::getUFScheduler()->addFiberToScheduler(consUF);
}
}
if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
- return count;
+ return true;
}
-UFConsumer::UFConsumer()
+UFConsumer::UFConsumer(bool notifyOnExitOnly) : _notifyOnExitOnly(notifyOnExitOnly)
{
- reset();
-}
-
-void UFConsumer::reset()
-{
_currUF = 0;
_requireLockToWaitForUpdate = true;
}
-UFJoinableConsumer::UFJoinableConsumer(bool notifyOnExitOnly)
+UFProducerData* UFConsumer::waitForData(UF* uf)
{
- _notifyOnExitOnly = notifyOnExitOnly;
-}
-
-void UFConsumer::clearDataToConsume()
-{
- if(!_queueOfDataToConsume.empty())
- {
- deque<UFProducerData*>::iterator beg = _queueOfDataToConsume.begin();
- for(; beg != _queueOfDataToConsume.end(); ++beg)
- UFProducerData::releaseObj((*beg));
- }
-}
-
-UFProducerData* UFConsumer::waitForData(UF* uf, size_t* numRemaining, TIME_IN_US timeToWait)
-{
- if(numRemaining)
- *numRemaining = 0;
-
if(!uf)
uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+
_currUF = uf;
if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
while(_queueOfDataToConsume.empty())
{
- if(_requireLockToWaitForUpdate)
- {
- if(!timeToWait)
- _queueOfDataToConsumeLock.condWait(uf);
- else
- {
- if(!_queueOfDataToConsumeLock.condTimedWait(uf, timeToWait))
- {
- _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
- return 0;
- }
- }
- }
- else
- uf->block(); //wait for the producer to wake up the consumer
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.condWait(uf); //TODO: change to condTimedWait
+ else uf->block(); //wait for the producer to wake up the consumer
}
//read the first element from the queue
UFProducerData* result = _queueOfDataToConsume.front();
_queueOfDataToConsume.pop_front();
- if(numRemaining)
- *numRemaining = _queueOfDataToConsume.size();
if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
return result;
@@ -179,32 +90,22 @@ UFProducerData* UFConsumer::waitForData(
bool UFConsumer::hasData(UF* uf)
{
- if(!_requireLockToWaitForUpdate)
- return !(_queueOfDataToConsume.empty());
-
if(!uf)
uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
+
bool result;
- _queueOfDataToConsumeLock.lock(uf);
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.lock(uf);
result = !(_queueOfDataToConsume.empty());
- _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
+ if(_requireLockToWaitForUpdate) _queueOfDataToConsumeLock.unlock(uf); //release the lock gotten earlier
return result;
}
-bool UFJoinableConsumer::joinProducer(UFJoinableProducer* ufp)
+bool UFConsumer::joinProducer(UFProducer* ufp)
{
- if(!ufp)
- return false;
-
- if(_requireLockToWaitForUpdate)
- {
- _consumersProducerSetLock.getSpinLock();
- _consumersProducerSet.push_back(ufp);
- _consumersProducerSetLock.releaseSpinLock();
- }
- else
- _consumersProducerSet.push_back(ufp);
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
+ _consumersProducerSet.insert(ufp);
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
//notify the producer that we're adding this consumer
if(!ufp->addConsumer(this))
@@ -213,106 +114,42 @@ bool UFJoinableConsumer::joinProducer(UF
return true;
}
-bool UFNonJoinableConsumer::removeProducer()
+bool UFConsumer::removeProducer(UFProducer* ufp)
{
- if(!_ufp || !_ufp->removeConsumer(this))
+ //notifying producer on exit
+ if(!ufp->removeConsumer(this))
return false;
- _ufp = 0; //no need to lock - since we cant add any other producer w/ this type of consumer
- return true;
-}
-UFNonJoinableConsumer::UFNonJoinableConsumer(UFNonJoinableProducer* ufp, bool notifyOnExitOnly)
-{
- _ufp = ufp;
- _ufp->addConsumer(this);
- _notifyOnExitOnly = notifyOnExitOnly;
-}
-
-void UFNonJoinableConsumer::resetMe()
-{
- removeProducer();
- clearDataToConsume();
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.getSpinLock();
+ _consumersProducerSet.erase(ufp);
+ if(_requireLockToWaitForUpdate) _consumersProducerSetLock.releaseSpinLock();
+ return true;
}
-void UFJoinableConsumer::resetMe()
+void UFConsumer::reset()
{
//1. notify all the producers on exit
- for(deque<UFJoinableProducer*>::iterator beg = _consumersProducerSet.begin(); beg != _consumersProducerSet.end(); )
+ for(set<UFProducer*>::iterator beg = _consumersProducerSet.begin(); beg != _consumersProducerSet.end(); )
{
removeProducer(*beg);
beg = _consumersProducerSet.begin();
}
//2. clear out all the remaining entries in the queue
- clearDataToConsume();
-}
-
-static bool eraseProducer(UFJoinableProducer* ufp, std::deque<UFJoinableProducer*>& coll)
-{
- for(deque<UFJoinableProducer*>::iterator beg = coll.begin();
- beg != coll.end();
- ++beg)
- {
- if(*beg == ufp)
- {
- coll.erase(beg);
- return true;
- }
- }
- return false;
-}
-
-bool UFJoinableConsumer::removeProducer(UFJoinableProducer* ufp)
-{
- //notifying producer on exit
- if(!ufp || !ufp->removeConsumer(this))
- return false;
-
- if(_requireLockToWaitForUpdate)
+ if(!_queueOfDataToConsume.empty())
{
- _consumersProducerSetLock.getSpinLock();
- //_consumersProducerSet.erase(ufp);
- eraseProducer(ufp, _consumersProducerSet);
- _consumersProducerSetLock.releaseSpinLock();
+ list<UFProducerData*>::iterator beg = _queueOfDataToConsume.begin();
+ for(; beg != _queueOfDataToConsume.end(); ++beg)
+ UFProducerData::releaseObj((*beg));
}
- else
- eraseProducer(ufp, _consumersProducerSet);
- //_consumersProducerSet.erase(ufp);
-
- return true;
}
void UFProducer::reset()
{
- if(!getConsumerCount()) //dont do anything if there are no consumers remaining
- return;
-
//add the EOF indicator
- if(_sendEOFAtEnd)
- produceData(0, 0, 0/*exit*/, 0/*freeDataOnExit*/); //notify the consumers that the producer is bailing
+ produceData(0, 0, 0/*exit*/, 0/*freeDataOnExit*/); //notify the consumers that the producer is bailing
//have to wait for all the consumers to acknowledge my death
- removeAllConsumers();
-}
-
-bool UFJoinableProducer::addConsumer(UFConsumer* ufc)
-{
- UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
- if(!_acceptNewConsumers)
- {
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
- return false;
- }
- _producersConsumerSet.push_back(ufc); //check insertion
- _mostRecentConsumerAdded = ufc;
- _mostRecentConsumerAdded->_currUF = uf;
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
- return true;
-}
-
-void UFJoinableProducer::removeAllConsumers()
-{
UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
_acceptNewConsumers = false;
@@ -328,90 +165,5 @@ void UFJoinableProducer::removeAllConsum
if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
}
-bool UFJoinableProducer::removeConsumer(UFConsumer* ufc)
-{
- UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
- std::deque<UFConsumer*>::iterator beg = _producersConsumerSet.begin();
- for(;beg != _producersConsumerSet.end(); ++beg) //TODO: optimize later - use set if there are too many consumers
- {
- if(*beg == ufc)
- {
- _producersConsumerSet.erase(beg);
- break;
- }
- }
- //_producersConsumerSet.erase(ufc);
- if(_requireLockToUpdateConsumers) //notify the uf listening for this producer that another consumer bailed
- _producersConsumerSetLock.signal();
- else
- {
- if(_uf)
- UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
- }
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
- return true;
-}
-
-bool UFNonJoinableProducer::addConsumer(UFConsumer* ufc)
-{
- if(!ufc || _mostRecentConsumerAdded)
- return false;
-
- if(!_requireLockToUpdateConsumers)
- {
- _mostRecentConsumerAdded = ufc;
- return true;
- }
-
- UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- _producersConsumerSetLock.lock(uf);
- _mostRecentConsumerAdded = ufc;
- _producersConsumerSetLock.unlock(uf);
- return true;
-}
-
-bool UFNonJoinableProducer::removeConsumer(UFConsumer* ufc)
-{
- UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
- _acceptNewConsumers = false; //can only remove consumer on exit
- if(!_mostRecentConsumerAdded)
- {
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
- return true;
- }
-
- _mostRecentConsumerAdded = 0;
- if(_requireLockToUpdateConsumers) //notify the uf listening for this producer that another consumer bailed - needed if the producer is waiting for the consumers to die before being able to die on its own
- _producersConsumerSetLock.signal();
- else
- {
- if(_uf)
- UFScheduler::getUFScheduler()->addFiberToScheduler(_uf);
- }
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-
- return true;
-}
-
-void UFNonJoinableProducer::removeAllConsumers()
-{
- UF* uf = UFScheduler::getUFScheduler()->getRunningFiberOnThisThread();
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.lock(uf);
- _acceptNewConsumers = false; //can only remove consumer on exit
- while(_mostRecentConsumerAdded)
- {
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.condTimedWait(uf, 1000000);
- else
- {
- _uf = uf;
- _uf->block();
- }
- }
- if(_requireLockToUpdateConsumers) _producersConsumerSetLock.unlock(uf);
-}
-
stack<UFProducerData*> UFProducerData::_objList;
UFMutex UFProducerData::_objListMutex;
-