You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/08/13 21:43:10 UTC
svn commit: r985334 [3/3] - in /trafficserver/traffic/branches/UserFiber:
core/include/ core/src/ samples/
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Fri Aug 13 19:43:09 2010
@@ -1,13 +1,16 @@
#include <iostream>
#include <errno.h>
#include <string.h>
-#include "UFServer.H"
-#include "UFServer.H"
-
#include <sys/types.h>
#include <sys/wait.h>
-#include "UFStatSystem.H"
-#include "UFStats.H"
+#include <deque>
+#include <list>
+
+#include <UF.H>
+#include <UFStatSystem.H>
+#include <UFStats.H>
+#include <UFServer.H>
+#include <UFConf.H>
using namespace std;
@@ -15,31 +18,15 @@ using namespace std;
//TODO: create monitoring port later
//
//
-static string getPrintableTime()
-{
- char asctimeDate[32];
- asctimeDate[0] = '\0';
- time_t now = time(0);
- asctime_r(localtime(&now), asctimeDate);
-
- string response = asctimeDate;
- size_t loc = response.find('\n');
- if(loc != string::npos)
- response.replace(loc, 1, "");
- return response;
-}
-
void UFServer::reset()
{
_addressToBindTo = "0";
- _listenFd = -1;
- _port = 0;
+ _listenSockets.clear();
_creationTime = 0;
MAX_THREADS_ALLOWED = 8;
MAX_PROCESSES_ALLOWED = 1;
MAX_ACCEPT_THREADS_ALLOWED = 1;
- UF_STACK_SIZE = 8192;
_threadChooser = 0;
}
@@ -57,9 +44,13 @@ struct NewConnUF : public UF
return;
UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
- ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+
// increment connections handled stat
UFStatSystem::increment(UFStats::connectionsHandled);
+ // Keep track of current connections
+ UFStatSystem::increment(UFStats::currentConnections);
+ ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+ UFStatSystem::increment(UFStats::currentConnections, -1);
//clear the client connection
delete fiberStartingArgs->ufio;
@@ -89,21 +80,22 @@ struct AcceptRunner : public UF
//add the scheduler for this
UFIO* ufio = new UFIO(UFScheduler::getUF());
- int fd = ufserver->getListenFd();
- if(fd == -1)
- fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
- if(fd < 0)
+ if (socket.fd == -1)
+ {
+ socket.fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), socket.port /*, deal w/ backlog*/);
+ }
+ if (socket.fd < 0)
{
cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl;
exit(1);
}
- if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
+ if(!ufio || !ufio->setFd(socket.fd, false/*has already been made non-blocking*/))
{
cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl;
return;
}
- ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
+ ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, socket.port, ufserver, 0, 0);
}
AcceptRunner(bool registerMe = false)
{
@@ -113,10 +105,39 @@ struct AcceptRunner : public UF
UF* createUF() { return new AcceptRunner(); }
static AcceptRunner* _self;
static int _myLoc;
+ UFServer::ListenSocket socket;
};
int AcceptRunner::_myLoc = -1;
AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
+struct PerThreadInitializer : public UF
+{
+ void run()
+ {
+ if(!_startingArgs)
+ return;
+// Add conf manager for thread
+ UFConfManager *confManager = new UFConfManager;
+ int ret = pthread_setspecific(UFConfManager::threadSpecificKey, confManager);
+ cerr << getpid() << ":::Adding thread specific UFConfManager key " << UFConfManager::threadSpecificKey << " " << confManager << " " << ret << ", tid : " << pthread_self() << endl;
+
+ UFServer *_server = (UFServer *)_startingArgs;
+ _server->postThreadCreation();
+ }
+
+ UF* createUF() { return new PerThreadInitializer(); }
+
+ PerThreadInitializer(bool registerMe = false)
+ {
+ if(registerMe)
+ _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+ }
+ static PerThreadInitializer* _self;
+ static int _myLoc;
+};
+int PerThreadInitializer::_myLoc = -1;
+PerThreadInitializer* PerThreadInitializer::_self = new PerThreadInitializer(true);
+
void UFServer::startThreads()
{
preThreadCreation();
@@ -129,7 +150,14 @@ void UFServer::startThreads()
//start the IO threads
for(; i<MAX_THREADS_ALLOWED; i++)
{
- UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
+ list<UF*>* ufsToAdd = new list<UF*>();
+
+ PerThreadInitializer *pti = new PerThreadInitializer;
+ pti->_startingArgs = this;
+ ufsToAdd->push_back(pti);
+
+ UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
+
cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl;
usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
//add the io threads to the thread chooser
@@ -152,10 +180,19 @@ void UFServer::startThreads()
//start the accept thread
for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
{
- AcceptRunner* ar = new AcceptRunner();
- ar->_startingArgs = this;
list<UF*>* ufsToAdd = new list<UF*>();
- ufsToAdd->push_back(ar);
+ for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
+ {
+ AcceptRunner* ar = new AcceptRunner();
+ ar->_startingArgs = this;
+ ar->socket = *iter;
+ ufsToAdd->push_back(ar);
+ }
+
+ PerThreadInitializer *pti = new PerThreadInitializer();
+ pti->_startingArgs = this;
+ ufsToAdd->push_back(pti);
+
UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
usleep(5000); //TODO: let the thread finish initializing
addThread("ACCEPT", 0, thread[i]);
@@ -178,14 +215,17 @@ void UFServer::run()
if(!_threadChooser)
_threadChooser = new UFServerThreadChooser();
- //bind to the socket (before the fork
- _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
- if(_listenFd < 0)
+ for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
{
- cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
- exit(1);
+ //bind to the socket (before the fork)
+ iter->fd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), iter->port); //TODO:set the backlog
+ if(iter->fd < 0)
+ {
+ cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
+ exit(1);
+ }
}
-
+
if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode)
{
preThreadRun();
@@ -214,6 +254,7 @@ void UFServer::run()
postForkPreRun();
preThreadRun();
startThreads();
+ postThreadRun();
exit(0);
}
cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C Fri Aug 13 19:43:09 2010
@@ -1,15 +1,16 @@
#include <string.h>
#include <stdio.h>
-#include "UFStatSystem.H"
-#include "UF.H"
-#include "UFIO.H"
-#include "UFServer.H"
+#include <UFStatSystem.H>
+#include <UF.H>
+#include <UFIO.H>
+#include <UFServer.H>
#include <iostream>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
+using namespace std;
UFServer *UFStatSystem::server;
std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num;
@@ -126,9 +127,8 @@ bool UFStatSystem::get_current(uint32_t
bool UFStatSystem::get_current(const char *stat_name, long long *stat_val)
{
uint32_t stat_num;
- if(!getStatNum(stat_name, stat_num)) {
+ if(!getStatNum(stat_name, stat_num))
return false;
- }
return get_current(stat_num, stat_val);
}
@@ -349,8 +349,9 @@ void StatCommandProcessing::run()
char readbuf[1024];
std::string readData;
- while(1) {
- if((readbytes = ufio->read(readbuf, 1023, 0)) <= 0)
+ while(1)
+ {
+ if((readbytes = ufio->read(readbuf, 1023, 60000000)) <= 0)
break;
readData.append(readbuf, readbytes);
@@ -358,25 +359,26 @@ void StatCommandProcessing::run()
if(readData.find("\r\n") == string::npos)
continue;
- if(readData.find("stats_current") != string::npos) {
+ if(readData.find("stats_current") != string::npos)
+ {
// Force a collect before printing out the stats
UFStatSystem::collect();
std::stringstream printbuf;
UFStatCollector::printStats(printbuf);
- if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
+ if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
//failed write, break to close connection
break;
- }
}
- else if (readData.find("stats") != string::npos) {
+ else if (readData.find("stats") != string::npos)
+ {
std::stringstream printbuf;
UFStatCollector::printStats(printbuf);
- if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
+ if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
//failed write, break to close connection
break;
- }
}
- else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) {
+ else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos)
+ {
std::vector<std::string> stats;
char stat_name[MAX_STAT_NAME_LENGTH];
bzero(stat_name, MAX_STAT_NAME_LENGTH);
@@ -387,49 +389,49 @@ void StatCommandProcessing::run()
bool get_current = false;
if(readData.find("stat ") != string::npos)
start += strlen("stat ");
- else {
+ else
+ {
start += strlen("stat_current ");
get_current = true;
}
- while(sscanf(start, "%s%n", stat_name, &next) == 1) {
+ while(sscanf(start, "%s%n", stat_name, &next) == 1)
+ {
// Prefix support
char *prefix_end = strchr(start, '*');
- if(prefix_end != NULL) {
+ if(prefix_end != NULL)
+ {
std::string prefix;
prefix.assign(start, prefix_end-start);
// Get all stats with the prefix
UFStatCollector::getStatsWithPrefix(prefix, stats);
}
- else {
+ else
stats.push_back(stat_name);
- }
bzero(stat_name, MAX_STAT_NAME_LENGTH);
start+=next;
}
std::stringstream printbuf;
UFStatCollector::printStats(stats, printbuf, get_current);
- if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
+ if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
//failed write, break to close connection
break;
- }
}
- else if (readData.find("help") != string::npos) {
- if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1) {
+ else if (readData.find("help") != string::npos)
+ {
+ if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)
//failed write, break to close connection
break;
- }
}
- else if (readData.find("quit") != string::npos) {
+ else if (readData.find("quit") != string::npos)
break;
- }
- else {
+ else
+ {
if ((ufio->write(cmdUnrec, sizeof(cmdUnrec)-1) == -1) ||
- (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)) {
+ (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1))
//failed write, break to close connection
break;
- }
}
readData.clear();
} // END while loop
@@ -444,12 +446,24 @@ int StatCommandListenerRun::_myLoc = -1;
StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true);
void StatCommandListenerRun::run()
{
- int fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
- if(fd < 0)
+ int fd;
+ unsigned int counter = 0;
+ do
{
- cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
- return;
- }
+ UFStatCollector::_statCommandPort += counter;
+ fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
+ if(fd < 0)
+ {
+ cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
+ if(counter++ == 50) //try upto 50 times
+ return;
+ continue;
+ }
+ else
+ break;
+ } while(1);
+ cerr<<"setup stat command port at "<<UFStatCollector::_statCommandPort;
+
UFIO* ufio = new UFIO(UFScheduler::getUF());
if(!ufio)
@@ -461,7 +475,7 @@ void StatCommandListenerRun::run()
ufio->setFd(fd, false);
StatThreadChooser ufiotChooser;
- ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, 0, 0, 0);
+ ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, UFStatCollector::_statCommandPort, 0, 0);
}
void* UFStatCollector::scheduler(void *args)
@@ -469,70 +483,66 @@ void* UFStatCollector::scheduler(void *a
if(!args)
return 0;
+ cerr<<getPrintableTime()<<" "<<getpid()<<": created stats thread (with I/O) - "<<pthread_self()<<endl;
// add jobs to scheduler
UFScheduler ufs;
+ //insertion has to be done in a LIFO (stack) manner
// stat collector
ufs.addFiberToScheduler(new CollectorRunner());
-
- // set thread for stat command listener to run on
- StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
-
+ // stat command listener port
+ ufs.addFiberToScheduler(new StatCommandListenerRun());
+ ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
// io scheduler
ufs.addFiberToScheduler(new IORunner());
- // stat command listener
- ufs.addFiberToScheduler(new StatCommandListenerRun());
- ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
+ // set thread for stat command listener to run on
+ StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
+
ufs.runScheduler();
return 0;
}
//----------------------------------------------------------
-void UFStatCollector::printStats(std::stringstream &printbuf) {
- printbuf << "Cache stats: \n"
+void UFStatCollector::printStats(std::stringstream &printbuf)
+{
+ printbuf << "Cache stats: \n"
"-----------------------------------------------------------------------------\n";
-
- printbuf << "TIME " << _startTime <<"\n";
+ printbuf << "TIME " << _startTime <<"\n";
- UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
- UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
- statsMutex.lock(running_user_fiber);
+ UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+ UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+ statsMutex.lock(running_user_fiber);
- for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
- it != UFStatSystem::global_stats.end(); it++) {
- if(it->value != 0 ) {
- printbuf << "STAT " << it->name << " " << it->value << "\n";
- }
- }
- statsMutex.unlock(running_user_fiber);
+ for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
+ it != UFStatSystem::global_stats.end(); it++)
+ printbuf << "STAT " << it->name << " " << it->value << "\n";
+ statsMutex.unlock(running_user_fiber);
- printbuf << "END\n";
+ printbuf << "END\n";
}
-void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) {
- // Print only non zero stats
+void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current)
+{
long long stat_val = 0;
bool stat_get_status;
- if(current) {
+ if(current)
stat_get_status = UFStatSystem::get_current(stat_name, &stat_val);
- }
- else {
+ else
stat_get_status = UFStatSystem::get(stat_name, &stat_val);
- }
- if(stat_get_status && stat_val != 0) {
+ //if(stat_get_status && stat_val != 0) {
+ if(stat_get_status)
printbuf << "STAT " << stat_name << " " << stat_val << "\n";
- }
}
-void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) {
- printbuf << "TIME " << _startTime <<"\n";
+void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current)
+{
+ printbuf << "TIME " << _startTime <<"\n";
for(std::vector<std::string>::const_iterator it = stat_names.begin();
it != stat_names.end();
- it++) {
+ it++)
printStat(it->c_str(), printbuf, current);
- }
printbuf << "END\n";
}
@@ -544,11 +554,11 @@ UFStatCollector::getStatsWithPrefix(cons
statsMutex.lock(running_user_fiber);
// Get all stats which start with stat_prefix
for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
- it != UFStatSystem::global_stats.end(); it++) {
+ it != UFStatSystem::global_stats.end(); it++)
+ {
size_t found = it->name.find(stat_prefix);
- if(found == 0) {
+ if(!found)
stat_names.push_back(it->name);
- }
}
statsMutex.unlock(running_user_fiber);
}
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Fri Aug 13 19:43:09 2010
@@ -1,6 +1,7 @@
-#include "UFStats.H"
-#include "UFStatSystem.H"
+#include <UFStats.H>
+#include <UFStatSystem.H>
+uint32_t UFStats::currentConnections;
uint32_t UFStats::connectionsHandled;
uint32_t UFStats::txnSuccess;
uint32_t UFStats::txnFail;
@@ -12,6 +13,7 @@ namespace UFStats
{
void registerStats(bool lock_needed)
{
+ UFStatSystem::registerStat("connections.current", ¤tConnections, lock_needed);
UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed);
UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed);
UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed);
Modified: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (original)
+++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Fri Aug 13 19:43:09 2010
@@ -21,14 +21,15 @@
#include <string>
#include <stdio.h>
-#include "UF.H"
-#include "UFIO.H"
-#include "UFServer.H"
-#include "UFConnectionPool.H"
+#include <ufcore/UF.H>
+#include <ufcore/UFIO.H>
+#include <ufcore/UFServer.H>
+#include <ufcore/UFConnectionPool.H>
#include <vector>
using namespace std;
+unsigned int NUM_REQUESTS_TO_RUN = 0;
unsigned short int NUM_THREADS = 1;
unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN = 1;
unsigned int NUM_CONNECTIONS_PER_FIBER = 1;
@@ -79,10 +80,10 @@ ResponseInfoObject overallInfo;
pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_key_t threadUpdateOverallInfo;
-int GET_RESPONSE_TIMEOUT = 1*1000*1000;
+TIME_IN_US GET_RESPONSE_TIMEOUT = -1;
string DOUBLE_NEWLINE = "\r\n\r\n";
unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length();
-bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed)
+bool readData(UFIO* ufio, bool& connClosed)
{
string result;
char buf[4096];
@@ -92,7 +93,7 @@ bool readData(UFIO* ufio, unsigned int t
bool okToExitToEnd = false;
while(1)
{
- int num_bytes_read = ufio->read(buf, 4095, 0);
+ int num_bytes_read = ufio->read(buf, 4095, GET_RESPONSE_TIMEOUT);
if(num_bytes_read <= 0)
{
if(okToExitToEnd && (num_bytes_read == 0))
@@ -101,8 +102,13 @@ bool readData(UFIO* ufio, unsigned int t
connClosed = true;
return true;
}
+ cerr<<"bytes read = "<<num_bytes_read<<" with errno = "<<strerror(ufio->getErrno())<<endl;
return false;
}
+ /*
+ else
+ cerr<<"read"<<string(buf, num_bytes_read)<<endl;
+ */
result.append(buf, num_bytes_read);
@@ -147,6 +153,7 @@ bool readData(UFIO* ufio, unsigned int t
(result.length() > (endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength))) //dont support pipelining yet
{
cerr<<"read more than supposed to"<<endl;
+ cerr<<"read "<<result;
return false;
}
else if(result.length() == endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength)
@@ -158,10 +165,12 @@ bool readData(UFIO* ufio, unsigned int t
unsigned int SLEEP_BETWEEN_CONN_SETUP = 0;
-int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000;
+TIME_IN_US CONNECT_AND_REQUEST_TIMEOUT = -1;
bool writeData(UFIO* ufio, const string& data)
{
int amt_written = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT);
+ if(amt_written <= 0)
+ cerr<<"write failed "<<ufio->getErrno()<<endl;
return ((amt_written == (int)data.length()) ? true : false);
}
@@ -175,7 +184,7 @@ UFIO* getConn(ResponseInfoObject* rIO)
struct timeval start,finish;
gettimeofday(&start, 0);
- UFIO* ufio = cpool->getConnection(remote_addr);
+ UFIO* ufio = cpool->getConnection(remote_addr, true, CONNECT_AND_REQUEST_TIMEOUT);
if(!ufio)
{
if(random()%100 < 10)
@@ -221,7 +230,7 @@ void run_handler()
return;
//do the requests
- unsigned short int num_requests_run = 0;
+ unsigned int num_requests_run = 0;
while(num_requests_run++ < NUM_REQUESTS_PER_FIBER)
{
if(INTER_SEND_SLEEP)
@@ -258,7 +267,7 @@ void run_handler()
bool connClosed = false;
- if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed))
+ if(!readData(ufio, connClosed))
{
if(random()%100 < 10)
cerr<<"bailing since read data failed "<<strerror(errno)<<endl;
@@ -300,7 +309,7 @@ void ClientUF::run()
ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
if(!rIO)
return;
- unsigned short int num_requests_run = 0;
+ unsigned int num_requests_run = 0;
while(num_requests_run++ < NUM_CONNECTIONS_PER_FIBER)
{
//wait if told to do so
@@ -405,16 +414,13 @@ void SetupClientUF::run()
{
if(!rIO.num_user_fibers_running)
break;
- else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) &&
- (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
- )
- {
- cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
- break;
- }
UF::gusleep(5000000);
- cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" ("<<(rIO.num_attempt*100/(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN))<<"%)"<<endl;
+ unsigned short int threadCompletionPercent = (rIO.num_attempt*100)/NUM_REQUESTS_TO_RUN;
+ cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<NUM_REQUESTS_TO_RUN<<" ("<<threadCompletionPercent<<"%)"<<endl;
+
+ if(threadCompletionPercent > THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
+ break;
}
}
@@ -551,6 +557,7 @@ void print_usage()
int main(int argc, char** argv)
{
+ unsigned long long int DELAY_BETWEEN_STARTING_THREADS_IN_US = 0;
if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0)
{
cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl;
@@ -560,10 +567,13 @@ int main(int argc, char** argv)
string rem_port = "80";
string rem_addr = "127.0.0.1";
char ch;
- while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1)
+ while ((ch = getopt(argc, argv, "M:Z:U:x:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1)
{
switch (ch)
{
+ case'x':
+ DELAY_BETWEEN_STARTING_THREADS_IN_US = atoi(optarg);
+ break;
case 'Z':
sleepShouldBeRandom = atoi(optarg);
break;
@@ -609,14 +619,10 @@ int main(int argc, char** argv)
HTTP_BASE_REQ_STRING = optarg;
break;
case 'c':
- CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000;
- if(CONNECT_AND_REQUEST_TIMEOUT <= 0)
- CONNECT_AND_REQUEST_TIMEOUT = -1;
+ CONNECT_AND_REQUEST_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
break;
case 'd':
- GET_RESPONSE_TIMEOUT = atoi(optarg)*1000;
- if(GET_RESPONSE_TIMEOUT <= 0)
- GET_RESPONSE_TIMEOUT = -1;
+ GET_RESPONSE_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
break;
case 's':
INTER_SEND_SLEEP = atoi(optarg)*1000;
@@ -642,6 +648,7 @@ int main(int argc, char** argv)
remote_addr = rem_addr + ":" + rem_port;
print_info();
+ NUM_REQUESTS_TO_RUN = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN;
//create the threads
pthread_t* thread = new pthread_t[NUM_THREADS];
@@ -651,6 +658,11 @@ int main(int argc, char** argv)
list<UF*>* ufList = new list<UF*>();
ufList->push_back(new SetupClientUF());
UFIO::ufCreateThreadWithIO(&thread[i], ufList);
+ if(DELAY_BETWEEN_STARTING_THREADS_IN_US)
+ {
+ cerr<<"sleeping for "<<DELAY_BETWEEN_STARTING_THREADS_IN_US<<endl;
+ usleep(DELAY_BETWEEN_STARTING_THREADS_IN_US);
+ }
}