You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/08/14 00:14:32 UTC
svn commit: r985387 [3/3] - in /trafficserver/traffic/branches/UserFiber:
core/include/ core/src/ protocol/ samples/
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Fri Aug 13 22:14:31 2010
@@ -1,16 +1,13 @@
#include <iostream>
#include <errno.h>
#include <string.h>
+#include "UFServer.H"
+#include "UFServer.H"
+
#include <sys/types.h>
#include <sys/wait.h>
-#include <deque>
-#include <list>
-
-#include <UF.H>
-#include <UFStatSystem.H>
-#include <UFStats.H>
-#include <UFServer.H>
-#include <UFConf.H>
+#include "UFStatSystem.H"
+#include "UFStats.H"
using namespace std;
@@ -18,15 +15,31 @@ using namespace std;
//TODO: create monitoring port later
//
//
+static string getPrintableTime()
+{
+ char asctimeDate[32];
+ asctimeDate[0] = '\0';
+ time_t now = time(0);
+ asctime_r(localtime(&now), asctimeDate);
+
+ string response = asctimeDate;
+ size_t loc = response.find('\n');
+ if(loc != string::npos)
+ response.replace(loc, 1, "");
+ return response;
+}
+
void UFServer::reset()
{
_addressToBindTo = "0";
- _listenSockets.clear();
+ _listenFd = -1;
+ _port = 0;
_creationTime = 0;
MAX_THREADS_ALLOWED = 8;
MAX_PROCESSES_ALLOWED = 1;
MAX_ACCEPT_THREADS_ALLOWED = 1;
+ UF_STACK_SIZE = 8192;
_threadChooser = 0;
}
@@ -44,13 +57,9 @@ struct NewConnUF : public UF
return;
UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
-
+ ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
// increment connections handled stat
UFStatSystem::increment(UFStats::connectionsHandled);
- // Keep track of current connections
- UFStatSystem::increment(UFStats::currentConnections);
- ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
- UFStatSystem::increment(UFStats::currentConnections, -1);
//clear the client connection
delete fiberStartingArgs->ufio;
@@ -80,22 +89,21 @@ struct AcceptRunner : public UF
//add the scheduler for this
UFIO* ufio = new UFIO(UFScheduler::getUF());
- if (socket.fd == -1)
- {
- socket.fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), socket.port /*, deal w/ backlog*/);
- }
- if (socket.fd < 0)
+ int fd = ufserver->getListenFd();
+ if(fd == -1)
+ fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
+ if(fd < 0)
{
cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl;
exit(1);
}
- if(!ufio || !ufio->setFd(socket.fd, false/*has already been made non-blocking*/))
+ if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
{
cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl;
return;
}
- ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, socket.port, ufserver, 0, 0);
+ ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
}
AcceptRunner(bool registerMe = false)
{
@@ -105,39 +113,10 @@ struct AcceptRunner : public UF
UF* createUF() { return new AcceptRunner(); }
static AcceptRunner* _self;
static int _myLoc;
- UFServer::ListenSocket socket;
};
int AcceptRunner::_myLoc = -1;
AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
-struct PerThreadInitializer : public UF
-{
- void run()
- {
- if(!_startingArgs)
- return;
-// Add conf manager for thread
- UFConfManager *confManager = new UFConfManager;
- int ret = pthread_setspecific(UFConfManager::threadSpecificKey, confManager);
- cerr << getpid() << ":::Adding thread specific UFConfManager key " << UFConfManager::threadSpecificKey << " " << confManager << " " << ret << ", tid : " << pthread_self() << endl;
-
- UFServer *_server = (UFServer *)_startingArgs;
- _server->postThreadCreation();
- }
-
- UF* createUF() { return new PerThreadInitializer(); }
-
- PerThreadInitializer(bool registerMe = false)
- {
- if(registerMe)
- _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
- }
- static PerThreadInitializer* _self;
- static int _myLoc;
-};
-int PerThreadInitializer::_myLoc = -1;
-PerThreadInitializer* PerThreadInitializer::_self = new PerThreadInitializer(true);
-
void UFServer::startThreads()
{
preThreadCreation();
@@ -150,14 +129,7 @@ void UFServer::startThreads()
//start the IO threads
for(; i<MAX_THREADS_ALLOWED; i++)
{
- list<UF*>* ufsToAdd = new list<UF*>();
-
- PerThreadInitializer *pti = new PerThreadInitializer;
- pti->_startingArgs = this;
- ufsToAdd->push_back(pti);
-
- UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
-
+ UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl;
usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
//add the io threads to the thread chooser
@@ -180,19 +152,10 @@ void UFServer::startThreads()
//start the accept thread
for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
{
+ AcceptRunner* ar = new AcceptRunner();
+ ar->_startingArgs = this;
list<UF*>* ufsToAdd = new list<UF*>();
- for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
- {
- AcceptRunner* ar = new AcceptRunner();
- ar->_startingArgs = this;
- ar->socket = *iter;
- ufsToAdd->push_back(ar);
- }
-
- PerThreadInitializer *pti = new PerThreadInitializer();
- pti->_startingArgs = this;
- ufsToAdd->push_back(pti);
-
+ ufsToAdd->push_back(ar);
UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
usleep(5000); //TODO: let the thread finish initializing
addThread("ACCEPT", 0, thread[i]);
@@ -215,17 +178,14 @@ void UFServer::run()
if(!_threadChooser)
_threadChooser = new UFServerThreadChooser();
- for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
+ //bind to the socket (before the fork
+ _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
+ if(_listenFd < 0)
{
- //bind to the socket (before the fork)
- iter->fd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), iter->port); //TODO:set the backlog
- if(iter->fd < 0)
- {
- cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
- exit(1);
- }
+ cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
+ exit(1);
}
-
+
if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode)
{
preThreadRun();
@@ -254,7 +214,6 @@ void UFServer::run()
postForkPreRun();
preThreadRun();
startThreads();
- postThreadRun();
exit(0);
}
cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;
Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
('svn:mergeinfo' removed)
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C Fri Aug 13 22:14:31 2010
@@ -1,16 +1,15 @@
#include <string.h>
#include <stdio.h>
-#include <UFStatSystem.H>
-#include <UF.H>
-#include <UFIO.H>
-#include <UFServer.H>
+#include "UFStatSystem.H"
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
#include <iostream>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
-using namespace std;
UFServer *UFStatSystem::server;
std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num;
@@ -127,8 +126,9 @@ bool UFStatSystem::get_current(uint32_t
bool UFStatSystem::get_current(const char *stat_name, long long *stat_val)
{
uint32_t stat_num;
- if(!getStatNum(stat_name, stat_num))
+ if(!getStatNum(stat_name, stat_num)) {
return false;
+ }
return get_current(stat_num, stat_val);
}
@@ -349,9 +349,8 @@ void StatCommandProcessing::run()
char readbuf[1024];
std::string readData;
- while(1)
- {
- if((readbytes = ufio->read(readbuf, 1023, 60000000)) <= 0)
+ while(1) {
+ if((readbytes = ufio->read(readbuf, 1023, 0)) <= 0)
break;
readData.append(readbuf, readbytes);
@@ -359,26 +358,25 @@ void StatCommandProcessing::run()
if(readData.find("\r\n") == string::npos)
continue;
- if(readData.find("stats_current") != string::npos)
- {
+ if(readData.find("stats_current") != string::npos) {
// Force a collect before printing out the stats
UFStatSystem::collect();
std::stringstream printbuf;
UFStatCollector::printStats(printbuf);
- if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
+ if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
//failed write, break to close connection
break;
+ }
}
- else if (readData.find("stats") != string::npos)
- {
+ else if (readData.find("stats") != string::npos) {
std::stringstream printbuf;
UFStatCollector::printStats(printbuf);
- if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
+ if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
//failed write, break to close connection
break;
+ }
}
- else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos)
- {
+ else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) {
std::vector<std::string> stats;
char stat_name[MAX_STAT_NAME_LENGTH];
bzero(stat_name, MAX_STAT_NAME_LENGTH);
@@ -389,49 +387,49 @@ void StatCommandProcessing::run()
bool get_current = false;
if(readData.find("stat ") != string::npos)
start += strlen("stat ");
- else
- {
+ else {
start += strlen("stat_current ");
get_current = true;
}
- while(sscanf(start, "%s%n", stat_name, &next) == 1)
- {
+ while(sscanf(start, "%s%n", stat_name, &next) == 1) {
// Prefix support
char *prefix_end = strchr(start, '*');
- if(prefix_end != NULL)
- {
+ if(prefix_end != NULL) {
std::string prefix;
prefix.assign(start, prefix_end-start);
// Get all stats with the prefix
UFStatCollector::getStatsWithPrefix(prefix, stats);
}
- else
+ else {
stats.push_back(stat_name);
+ }
bzero(stat_name, MAX_STAT_NAME_LENGTH);
start+=next;
}
std::stringstream printbuf;
UFStatCollector::printStats(stats, printbuf, get_current);
- if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
+ if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
//failed write, break to close connection
break;
+ }
}
- else if (readData.find("help") != string::npos)
- {
- if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)
+ else if (readData.find("help") != string::npos) {
+ if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1) {
//failed write, break to close connection
break;
+ }
}
- else if (readData.find("quit") != string::npos)
+ else if (readData.find("quit") != string::npos) {
break;
- else
- {
+ }
+ else {
if ((ufio->write(cmdUnrec, sizeof(cmdUnrec)-1) == -1) ||
- (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1))
+ (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)) {
//failed write, break to close connection
break;
+ }
}
readData.clear();
} // END while loop
@@ -446,24 +444,12 @@ int StatCommandListenerRun::_myLoc = -1;
StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true);
void StatCommandListenerRun::run()
{
- int fd;
- unsigned int counter = 0;
- do
+ int fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
+ if(fd < 0)
{
- UFStatCollector::_statCommandPort += counter;
- fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
- if(fd < 0)
- {
- cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
- if(counter++ == 50) //try upto 50 times
- return;
- continue;
- }
- else
- break;
- } while(1);
- cerr<<"setup stat command port at "<<UFStatCollector::_statCommandPort;
-
+ cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
+ return;
+ }
UFIO* ufio = new UFIO(UFScheduler::getUF());
if(!ufio)
@@ -475,7 +461,7 @@ void StatCommandListenerRun::run()
ufio->setFd(fd, false);
StatThreadChooser ufiotChooser;
- ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, UFStatCollector::_statCommandPort, 0, 0);
+ ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, 0, 0, 0);
}
void* UFStatCollector::scheduler(void *args)
@@ -483,66 +469,70 @@ void* UFStatCollector::scheduler(void *a
if(!args)
return 0;
- cerr<<getPrintableTime()<<" "<<getpid()<<": created stats thread (with I/O) - "<<pthread_self()<<endl;
// add jobs to scheduler
UFScheduler ufs;
- //insertion has to be done in a LIFO (stack) manner
// stat collector
ufs.addFiberToScheduler(new CollectorRunner());
- // stat command listener port
- ufs.addFiberToScheduler(new StatCommandListenerRun());
- ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
- // io scheduler
- ufs.addFiberToScheduler(new IORunner());
-
+
// set thread for stat command listener to run on
StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
+ // io scheduler
+ ufs.addFiberToScheduler(new IORunner());
+
+ // stat command listener
+ ufs.addFiberToScheduler(new StatCommandListenerRun());
+ ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
ufs.runScheduler();
return 0;
}
//----------------------------------------------------------
-void UFStatCollector::printStats(std::stringstream &printbuf)
-{
- printbuf << "Cache stats: \n"
+void UFStatCollector::printStats(std::stringstream &printbuf) {
+ printbuf << "Cache stats: \n"
"-----------------------------------------------------------------------------\n";
- printbuf << "TIME " << _startTime <<"\n";
+
+ printbuf << "TIME " << _startTime <<"\n";
- UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
- UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
- statsMutex.lock(running_user_fiber);
+ UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+ UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+ statsMutex.lock(running_user_fiber);
- for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
- it != UFStatSystem::global_stats.end(); it++)
- printbuf << "STAT " << it->name << " " << it->value << "\n";
- statsMutex.unlock(running_user_fiber);
+ for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
+ it != UFStatSystem::global_stats.end(); it++) {
+ if(it->value != 0 ) {
+ printbuf << "STAT " << it->name << " " << it->value << "\n";
+ }
+ }
+ statsMutex.unlock(running_user_fiber);
- printbuf << "END\n";
+ printbuf << "END\n";
}
-void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current)
-{
+void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) {
+ // Print only non zero stats
long long stat_val = 0;
bool stat_get_status;
- if(current)
+ if(current) {
stat_get_status = UFStatSystem::get_current(stat_name, &stat_val);
- else
+ }
+ else {
stat_get_status = UFStatSystem::get(stat_name, &stat_val);
+ }
- //if(stat_get_status && stat_val != 0) {
- if(stat_get_status)
+ if(stat_get_status && stat_val != 0) {
printbuf << "STAT " << stat_name << " " << stat_val << "\n";
+ }
}
-void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current)
-{
- printbuf << "TIME " << _startTime <<"\n";
+void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) {
+ printbuf << "TIME " << _startTime <<"\n";
for(std::vector<std::string>::const_iterator it = stat_names.begin();
it != stat_names.end();
- it++)
+ it++) {
printStat(it->c_str(), printbuf, current);
+ }
printbuf << "END\n";
}
@@ -554,11 +544,11 @@ UFStatCollector::getStatsWithPrefix(cons
statsMutex.lock(running_user_fiber);
// Get all stats which start with stat_prefix
for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
- it != UFStatSystem::global_stats.end(); it++)
- {
+ it != UFStatSystem::global_stats.end(); it++) {
size_t found = it->name.find(stat_prefix);
- if(!found)
+ if(found == 0) {
stat_names.push_back(it->name);
+ }
}
statsMutex.unlock(running_user_fiber);
}
Propchange: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
('svn:mergeinfo' removed)
Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Fri Aug 13 22:14:31 2010
@@ -1,7 +1,6 @@
-#include <UFStats.H>
-#include <UFStatSystem.H>
+#include "UFStats.H"
+#include "UFStatSystem.H"
-uint32_t UFStats::currentConnections;
uint32_t UFStats::connectionsHandled;
uint32_t UFStats::txnSuccess;
uint32_t UFStats::txnFail;
@@ -13,7 +12,6 @@ namespace UFStats
{
void registerStats(bool lock_needed)
{
- UFStatSystem::registerStat("connections.current", &currentConnections, lock_needed);
UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed);
UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed);
UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed);
Propchange: trafficserver/traffic/branches/UserFiber/protocol/
('svn:mergeinfo' removed)
Modified: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=985387&r1=985386&r2=985387&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (original)
+++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Fri Aug 13 22:14:31 2010
@@ -21,15 +21,14 @@
#include <string>
#include <stdio.h>
-#include <ufcore/UF.H>
-#include <ufcore/UFIO.H>
-#include <ufcore/UFServer.H>
-#include <ufcore/UFConnectionPool.H>
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+#include "UFConnectionPool.H"
#include <vector>
using namespace std;
-unsigned int NUM_REQUESTS_TO_RUN = 0;
unsigned short int NUM_THREADS = 1;
unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN = 1;
unsigned int NUM_CONNECTIONS_PER_FIBER = 1;
@@ -80,10 +79,10 @@ ResponseInfoObject overallInfo;
pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_key_t threadUpdateOverallInfo;
-TIME_IN_US GET_RESPONSE_TIMEOUT = -1;
+int GET_RESPONSE_TIMEOUT = 1*1000*1000;
string DOUBLE_NEWLINE = "\r\n\r\n";
unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length();
-bool readData(UFIO* ufio, bool& connClosed)
+bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed)
{
string result;
char buf[4096];
@@ -93,7 +92,7 @@ bool readData(UFIO* ufio, bool& connClos
bool okToExitToEnd = false;
while(1)
{
- int num_bytes_read = ufio->read(buf, 4095, GET_RESPONSE_TIMEOUT);
+ int num_bytes_read = ufio->read(buf, 4095, 0);
if(num_bytes_read <= 0)
{
if(okToExitToEnd && (num_bytes_read == 0))
@@ -102,13 +101,8 @@ bool readData(UFIO* ufio, bool& connClos
connClosed = true;
return true;
}
- cerr<<"bytes read = "<<num_bytes_read<<" with errno = "<<strerror(ufio->getErrno())<<endl;
return false;
}
- /*
- else
- cerr<<"read"<<string(buf, num_bytes_read)<<endl;
- */
result.append(buf, num_bytes_read);
@@ -153,7 +147,6 @@ bool readData(UFIO* ufio, bool& connClos
(result.length() > (endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength))) //dont support pipelining yet
{
cerr<<"read more than supposed to"<<endl;
- cerr<<"read "<<result;
return false;
}
else if(result.length() == endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength)
@@ -165,12 +158,10 @@ bool readData(UFIO* ufio, bool& connClos
unsigned int SLEEP_BETWEEN_CONN_SETUP = 0;
-TIME_IN_US CONNECT_AND_REQUEST_TIMEOUT = -1;
+int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000;
bool writeData(UFIO* ufio, const string& data)
{
int amt_written = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT);
- if(amt_written <= 0)
- cerr<<"write failed "<<ufio->getErrno()<<endl;
return ((amt_written == (int)data.length()) ? true : false);
}
@@ -184,7 +175,7 @@ UFIO* getConn(ResponseInfoObject* rIO)
struct timeval start,finish;
gettimeofday(&start, 0);
- UFIO* ufio = cpool->getConnection(remote_addr, true, CONNECT_AND_REQUEST_TIMEOUT);
+ UFIO* ufio = cpool->getConnection(remote_addr);
if(!ufio)
{
if(random()%100 < 10)
@@ -230,7 +221,7 @@ void run_handler()
return;
//do the requests
- unsigned int num_requests_run = 0;
+ unsigned short int num_requests_run = 0;
while(num_requests_run++ < NUM_REQUESTS_PER_FIBER)
{
if(INTER_SEND_SLEEP)
@@ -267,7 +258,7 @@ void run_handler()
bool connClosed = false;
- if(!readData(ufio, connClosed))
+ if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed))
{
if(random()%100 < 10)
cerr<<"bailing since read data failed "<<strerror(errno)<<endl;
@@ -309,7 +300,7 @@ void ClientUF::run()
ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
if(!rIO)
return;
- unsigned int num_requests_run = 0;
+ unsigned short int num_requests_run = 0;
while(num_requests_run++ < NUM_CONNECTIONS_PER_FIBER)
{
//wait if told to do so
@@ -414,13 +405,16 @@ void SetupClientUF::run()
{
if(!rIO.num_user_fibers_running)
break;
+ else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) &&
+ (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
+ )
+ {
+ cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
+ break;
+ }
UF::gusleep(5000000);
- unsigned short int threadCompletionPercent = (rIO.num_attempt*100)/NUM_REQUESTS_TO_RUN;
- cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<NUM_REQUESTS_TO_RUN<<" ("<<threadCompletionPercent<<"%)"<<endl;
-
- if(threadCompletionPercent > THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
- break;
+ cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" ("<<(rIO.num_attempt*100/(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN))<<"%)"<<endl;
}
}
@@ -557,7 +551,6 @@ void print_usage()
int main(int argc, char** argv)
{
- unsigned long long int DELAY_BETWEEN_STARTING_THREADS_IN_US = 0;
if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0)
{
cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl;
@@ -567,13 +560,10 @@ int main(int argc, char** argv)
string rem_port = "80";
string rem_addr = "127.0.0.1";
char ch;
- while ((ch = getopt(argc, argv, "M:Z:U:x:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1)
+ while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1)
{
switch (ch)
{
- case'x':
- DELAY_BETWEEN_STARTING_THREADS_IN_US = atoi(optarg);
- break;
case 'Z':
sleepShouldBeRandom = atoi(optarg);
break;
@@ -619,10 +609,14 @@ int main(int argc, char** argv)
HTTP_BASE_REQ_STRING = optarg;
break;
case 'c':
- CONNECT_AND_REQUEST_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
+ CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000;
+ if(CONNECT_AND_REQUEST_TIMEOUT <= 0)
+ CONNECT_AND_REQUEST_TIMEOUT = -1;
break;
case 'd':
- GET_RESPONSE_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
+ GET_RESPONSE_TIMEOUT = atoi(optarg)*1000;
+ if(GET_RESPONSE_TIMEOUT <= 0)
+ GET_RESPONSE_TIMEOUT = -1;
break;
case 's':
INTER_SEND_SLEEP = atoi(optarg)*1000;
@@ -648,7 +642,6 @@ int main(int argc, char** argv)
remote_addr = rem_addr + ":" + rem_port;
print_info();
- NUM_REQUESTS_TO_RUN = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN;
//create the threads
pthread_t* thread = new pthread_t[NUM_THREADS];
@@ -658,11 +651,6 @@ int main(int argc, char** argv)
list<UF*>* ufList = new list<UF*>();
ufList->push_back(new SetupClientUF());
UFIO::ufCreateThreadWithIO(&thread[i], ufList);
- if(DELAY_BETWEEN_STARTING_THREADS_IN_US)
- {
- cerr<<"sleeping for "<<DELAY_BETWEEN_STARTING_THREADS_IN_US<<endl;
- usleep(DELAY_BETWEEN_STARTING_THREADS_IN_US);
- }
}