You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/08/13 21:43:10 UTC

svn commit: r985334 [3/3] - in /trafficserver/traffic/branches/UserFiber: core/include/ core/src/ samples/

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFServer.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFServer.C Fri Aug 13 19:43:09 2010
@@ -1,13 +1,16 @@
 #include <iostream>
 #include <errno.h>
 #include <string.h>
-#include "UFServer.H"
-#include "UFServer.H"
-
 #include <sys/types.h>
 #include <sys/wait.h>
-#include "UFStatSystem.H"
-#include "UFStats.H"
+#include <deque>
+#include <list>
+
+#include <UF.H>
+#include <UFStatSystem.H>
+#include <UFStats.H>
+#include <UFServer.H>
+#include <UFConf.H>
 
 using namespace std;
 
@@ -15,31 +18,15 @@ using namespace std;
 //TODO: create monitoring port later
 //
 //
-static string getPrintableTime()
-{
-    char asctimeDate[32];
-    asctimeDate[0] = '\0';
-    time_t now = time(0);
-    asctime_r(localtime(&now), asctimeDate);
-
-    string response = asctimeDate;
-    size_t loc = response.find('\n');
-    if(loc != string::npos)
-        response.replace(loc, 1, "");
-    return response;
-}
-
 void UFServer::reset()
 {
     _addressToBindTo = "0";
-    _listenFd = -1;
-    _port = 0;
+    _listenSockets.clear();
     _creationTime = 0;
 
     MAX_THREADS_ALLOWED = 8;
     MAX_PROCESSES_ALLOWED = 1;
     MAX_ACCEPT_THREADS_ALLOWED = 1;
-    UF_STACK_SIZE = 8192;
 
     _threadChooser = 0;
 }
@@ -57,9 +44,13 @@ struct NewConnUF : public UF
             return;
 
         UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
-        ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+
         // increment connections handled stat
         UFStatSystem::increment(UFStats::connectionsHandled);
+        // Keep track of current connections
+        UFStatSystem::increment(UFStats::currentConnections);
+        ((UFServer*) fiberStartingArgs->args)->handleNewConnection(fiberStartingArgs->ufio);
+        UFStatSystem::increment(UFStats::currentConnections, -1);
 
         //clear the client connection
         delete fiberStartingArgs->ufio;
@@ -89,21 +80,22 @@ struct AcceptRunner : public UF
 
         //add the scheduler for this 
         UFIO* ufio = new UFIO(UFScheduler::getUF());
-        int fd = ufserver->getListenFd();
-        if(fd == -1)
-            fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), ufserver->getPort() /*, deal w/ backlog*/);
-        if(fd < 0)
+        if (socket.fd == -1)
+        {
+            socket.fd = UFIO::setupConnectionToAccept(ufserver->getBindingInterface(), socket.port /*, deal w/ backlog*/);
+        }
+        if (socket.fd < 0)
         {
             cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup listen socket"<<endl;
             exit(1);
         }
-        if(!ufio || !ufio->setFd(fd, false/*has already been made non-blocking*/))
+        if(!ufio || !ufio->setFd(socket.fd, false/*has already been made non-blocking*/))
         {
             cerr<<getPrintableTime()<<" "<<getpid()<<":couldnt setup accept thread"<<endl;
             return;
         }
 
-        ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, ufserver, 0, 0);
+        ufio->accept(ufserver->_threadChooser, NewConnUF::_myLoc, socket.port, ufserver, 0, 0);
     }
     AcceptRunner(bool registerMe = false)
     {
@@ -113,10 +105,39 @@ struct AcceptRunner : public UF
     UF* createUF() { return new AcceptRunner(); }
     static AcceptRunner* _self;
     static int _myLoc;
+    UFServer::ListenSocket socket;
 };
 int AcceptRunner::_myLoc = -1;
 AcceptRunner* AcceptRunner::_self = new AcceptRunner(true);
 
+struct PerThreadInitializer : public UF
+{
+    void run()
+        {
+            if(!_startingArgs)
+                return;
+// Create this thread's UFConfManager and store it under UFConfManager::threadSpecificKey
+            UFConfManager *confManager = new UFConfManager;
+            int ret = pthread_setspecific(UFConfManager::threadSpecificKey, confManager);
+            cerr << getpid() << ":::Adding thread specific UFConfManager key " << UFConfManager::threadSpecificKey << " " << confManager << " " << ret << ", tid : " << pthread_self() << endl;
+
+            UFServer *_server = (UFServer *)_startingArgs;
+            _server->postThreadCreation();
+        }
+
+    UF* createUF() { return new PerThreadInitializer(); }
+
+    PerThreadInitializer(bool registerMe = false)
+    {
+        if(registerMe)
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+    static PerThreadInitializer* _self;
+    static int _myLoc;
+};
+int PerThreadInitializer::_myLoc = -1;
+PerThreadInitializer* PerThreadInitializer::_self = new PerThreadInitializer(true);
+
 void UFServer::startThreads()
 {
     preThreadCreation();
@@ -129,7 +150,14 @@ void UFServer::startThreads()
     //start the IO threads
     for(; i<MAX_THREADS_ALLOWED; i++)
     {
-        UFIO::ufCreateThreadWithIO(&(thread[i]), new list<UF*>());
+        list<UF*>* ufsToAdd = new list<UF*>();
+        
+        PerThreadInitializer *pti = new PerThreadInitializer;
+        pti->_startingArgs = this;
+        ufsToAdd->push_back(pti);
+        
+        UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
+
         cerr<<getPrintableTime()<<" "<<getpid()<<": created thread (with I/O) - "<<thread[i]<<endl;
         usleep(5000); //TODO: avoid the need for threadChooser to have a mutex - change to cond. var later
         //add the io threads to the thread chooser
@@ -152,10 +180,19 @@ void UFServer::startThreads()
     //start the accept thread
     for(; i<MAX_ACCEPT_THREADS_ALLOWED+MAX_THREADS_ALLOWED; i++)
     {
-        AcceptRunner* ar = new AcceptRunner();
-        ar->_startingArgs = this;
         list<UF*>* ufsToAdd = new list<UF*>();
-        ufsToAdd->push_back(ar);
+        for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
+        {
+            AcceptRunner* ar = new AcceptRunner();
+            ar->_startingArgs = this;
+            ar->socket = *iter;
+            ufsToAdd->push_back(ar);
+        }
+            
+        PerThreadInitializer *pti = new PerThreadInitializer();
+        pti->_startingArgs = this;
+        ufsToAdd->push_back(pti);
+        
         UFIO::ufCreateThreadWithIO(&(thread[i]), ufsToAdd);
         usleep(5000); //TODO: let the thread finish initializing 
         addThread("ACCEPT", 0, thread[i]);
@@ -178,14 +215,17 @@ void UFServer::run()
     if(!_threadChooser)
         _threadChooser = new UFServerThreadChooser();
 
-    //bind to the socket (before the fork
-    _listenFd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), _port); //TODO:set the backlog
-    if(_listenFd < 0)
+    for (ListenSocketList::iterator iter = _listenSockets.begin(); iter != _listenSockets.end(); ++iter)
     {
-        cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
-        exit(1);
+        //bind to the socket (before the fork)
+        iter->fd = UFIO::setupConnectionToAccept(_addressToBindTo.c_str(), iter->port); //TODO:set the backlog
+        if(iter->fd < 0)
+        {
+            cerr<<getPrintableTime()<<" "<<getpid()<<": couldnt setup listen socket "<<strerror(errno)<<endl;
+            exit(1);
+        }
     }
-
+    
     if(!MAX_PROCESSES_ALLOWED) //an option to easily debug processes (or to only run in threaded mode)
     {
         preThreadRun();
@@ -214,6 +254,7 @@ void UFServer::run()
                 postForkPreRun();
                 preThreadRun();
                 startThreads();
+                postThreadRun();
                 exit(0);
             }
             cerr<<getPrintableTime()<<" "<<getpid()<<": (P): started child process: "<<pid<<endl;

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStatSystem.C Fri Aug 13 19:43:09 2010
@@ -1,15 +1,16 @@
 #include <string.h>
 #include <stdio.h>
-#include "UFStatSystem.H"
-#include "UF.H"
-#include "UFIO.H"
-#include "UFServer.H"
+#include <UFStatSystem.H>
+#include <UF.H>
+#include <UFIO.H>
+#include <UFServer.H>
 #include <iostream>
 #include <errno.h>
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <arpa/inet.h>
 
+using namespace std;
 UFServer *UFStatSystem::server;
 
 std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num;
@@ -126,9 +127,8 @@ bool UFStatSystem::get_current(uint32_t 
 bool UFStatSystem::get_current(const char *stat_name, long long *stat_val)
 {
     uint32_t stat_num;
-    if(!getStatNum(stat_name, stat_num)) {
+    if(!getStatNum(stat_name, stat_num))
         return false;
-    }
     return get_current(stat_num, stat_val);
 }
 
@@ -349,8 +349,9 @@ void StatCommandProcessing::run()
     char readbuf[1024];
     std::string readData;
     
-    while(1) {
-        if((readbytes = ufio->read(readbuf, 1023, 0)) <= 0)
+    while(1) 
+    {
+        if((readbytes = ufio->read(readbuf, 1023, 60000000)) <= 0)
             break;
 
         readData.append(readbuf, readbytes);
@@ -358,25 +359,26 @@ void StatCommandProcessing::run()
         if(readData.find("\r\n") == string::npos)
            continue;
            
-        if(readData.find("stats_current") != string::npos) {
+        if(readData.find("stats_current") != string::npos) 
+        {
             // Force a collect before printing out the stats
             UFStatSystem::collect();
             std::stringstream printbuf;
             UFStatCollector::printStats(printbuf);
-            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
+            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
                 //failed write, break to close connection
                 break;
-            }
         }
-        else if (readData.find("stats") != string::npos) {
+        else if (readData.find("stats") != string::npos) 
+        {
             std::stringstream printbuf;
             UFStatCollector::printStats(printbuf);
-            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
+            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
                     //failed write, break to close connection
                 break;
-            }
         }
-        else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) {
+        else if (readData.find("stat ") != string::npos || readData.find("stat_current ") != string::npos) 
+        {
             std::vector<std::string> stats;
             char stat_name[MAX_STAT_NAME_LENGTH];
             bzero(stat_name, MAX_STAT_NAME_LENGTH);
@@ -387,49 +389,49 @@ void StatCommandProcessing::run()
             bool get_current = false;
             if(readData.find("stat ") != string::npos)
                 start += strlen("stat ");
-            else {
+            else 
+            {
                 start += strlen("stat_current ");
                 get_current = true;
             }
             
-            while(sscanf(start, "%s%n", stat_name, &next) == 1) {
+            while(sscanf(start, "%s%n", stat_name, &next) == 1) 
+            {
                 // Prefix support
                 char *prefix_end = strchr(start, '*');
-                if(prefix_end != NULL) {
+                if(prefix_end != NULL) 
+                {
                     std::string prefix;
                     prefix.assign(start, prefix_end-start);
                     // Get all stats with the prefix
                     UFStatCollector::getStatsWithPrefix(prefix, stats);
                 }
-                else {
+                else
                     stats.push_back(stat_name);
-                }
                 bzero(stat_name, MAX_STAT_NAME_LENGTH);
                 start+=next;
             }
             std::stringstream printbuf;
             
             UFStatCollector::printStats(stats, printbuf, get_current);
-            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1) {
+            if (ufio->write(printbuf.str().data(), printbuf.str().length()) == -1)
                 //failed write, break to close connection
                 break;
-            }
         }
-        else if (readData.find("help") != string::npos) {
-            if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1) {
+        else if (readData.find("help") != string::npos) 
+        {
+            if (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)
                 //failed write, break to close connection
                 break;
-            }
         }
-        else if (readData.find("quit") != string::npos) {
+        else if (readData.find("quit") != string::npos) 
             break;
-        }
-        else {
+        else 
+        {
             if ((ufio->write(cmdUnrec, sizeof(cmdUnrec)-1) == -1) ||
-                (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1)) {
+                (ufio->write(cmdHelp, sizeof(cmdHelp)-1) == -1))
                 //failed write, break to close connection
                 break;
-            }
         }
         readData.clear();
     } // END while loop
@@ -444,12 +446,24 @@ int StatCommandListenerRun::_myLoc = -1;
 StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true);
 void StatCommandListenerRun::run()
 {
-    int fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
-    if(fd < 0)
+    int fd;
+    unsigned int counter = 0;
+    do
     {
-        cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
-        return;
-    }
+        UFStatCollector::_statCommandPort += counter;
+        fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
+        if(fd < 0)
+        {
+            cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
+            if(counter++ == 50) //retry up to 50 times before giving up
+                return;
+            continue;
+        }
+        else
+            break;
+    } while(1);
+    cerr<<"setup stat command port at "<<UFStatCollector::_statCommandPort;
+
 
     UFIO* ufio = new UFIO(UFScheduler::getUF());
     if(!ufio)
@@ -461,7 +475,7 @@ void StatCommandListenerRun::run()
     ufio->setFd(fd, false);
 
     StatThreadChooser ufiotChooser;
-    ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, 0, 0, 0);
+    ufio->accept(&ufiotChooser, StatCommandProcessing::_myLoc, UFStatCollector::_statCommandPort, 0, 0);
 }
 
 void* UFStatCollector::scheduler(void *args)
@@ -469,70 +483,66 @@ void* UFStatCollector::scheduler(void *a
     if(!args)
         return 0;
 
+    cerr<<getPrintableTime()<<" "<<getpid()<<": created stats thread (with I/O) - "<<pthread_self()<<endl;
     // add jobs to scheduler
     UFScheduler ufs;
 
+    //insertion has to be done in a LIFO (stack) manner
     // stat collector
     ufs.addFiberToScheduler(new CollectorRunner());
-
-    // set thread for stat command listener to run on
-    StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
-    
+    // stat command listener port
+    ufs.addFiberToScheduler(new StatCommandListenerRun());
+    ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
     // io scheduler
     ufs.addFiberToScheduler(new IORunner());
     
-    // stat command listener
-    ufs.addFiberToScheduler(new StatCommandListenerRun());
-    ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
+    // set thread for stat command listener to run on
+    StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
+    
     ufs.runScheduler();
     return 0;
 }
 
 //----------------------------------------------------------
-void UFStatCollector::printStats(std::stringstream &printbuf) {
-   printbuf <<  "Cache stats: \n"
+void UFStatCollector::printStats(std::stringstream &printbuf) 
+{
+    printbuf <<  "Cache stats: \n"
                 "-----------------------------------------------------------------------------\n";
-   
-  printbuf << "TIME " << _startTime <<"\n";
+    printbuf << "TIME " << _startTime <<"\n";
 
-  UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
-  UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
-  statsMutex.lock(running_user_fiber);
+    UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+    statsMutex.lock(running_user_fiber);
   
-  for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
-      it != UFStatSystem::global_stats.end(); it++) {
-      if(it->value != 0 ) {
-          printbuf << "STAT " << it->name << " " << it->value << "\n";
-      }
-  }
-  statsMutex.unlock(running_user_fiber);
+    for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
+        it != UFStatSystem::global_stats.end(); it++) 
+        printbuf << "STAT " << it->name << " " << it->value << "\n";
+    statsMutex.unlock(running_user_fiber);
 
-  printbuf << "END\n";
+    printbuf << "END\n";
 }
 
-void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) {
-    // Print only non zero stats
+void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) 
+{
     long long stat_val = 0;
     bool stat_get_status;
-    if(current) {
+    if(current)
         stat_get_status = UFStatSystem::get_current(stat_name, &stat_val);
-    }
-    else {
+    else
         stat_get_status = UFStatSystem::get(stat_name, &stat_val);
-    }
     
-    if(stat_get_status && stat_val != 0) {
+    //if(stat_get_status && stat_val != 0) {
+    if(stat_get_status)
         printbuf << "STAT " << stat_name << " " << stat_val << "\n";
-    }
 }
 
-void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) {
-  printbuf << "TIME " << _startTime <<"\n";
+void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current)
+{
+    printbuf << "TIME " << _startTime <<"\n";
     for(std::vector<std::string>::const_iterator it = stat_names.begin();
         it != stat_names.end();
-        it++) {
+        it++)
         printStat(it->c_str(), printbuf, current);
-    }
    printbuf << "END\n";
 }
 
@@ -544,11 +554,11 @@ UFStatCollector::getStatsWithPrefix(cons
     statsMutex.lock(running_user_fiber);
     // Get all stats which start with stat_prefix
     for(std::vector< Stat >::const_iterator it = UFStatSystem::global_stats.begin();
-        it != UFStatSystem::global_stats.end(); it++) {
+        it != UFStatSystem::global_stats.end(); it++) 
+    {
         size_t found = it->name.find(stat_prefix);
-        if(found == 0) {
+        if(!found)
             stat_names.push_back(it->name);
-        }
     }
     statsMutex.unlock(running_user_fiber);
 }

Modified: trafficserver/traffic/branches/UserFiber/core/src/UFStats.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/core/src/UFStats.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/core/src/UFStats.C (original)
+++ trafficserver/traffic/branches/UserFiber/core/src/UFStats.C Fri Aug 13 19:43:09 2010
@@ -1,6 +1,7 @@
-#include "UFStats.H"
-#include "UFStatSystem.H"
+#include <UFStats.H>
+#include <UFStatSystem.H>
 
+uint32_t UFStats::currentConnections;
 uint32_t UFStats::connectionsHandled;
 uint32_t UFStats::txnSuccess;
 uint32_t UFStats::txnFail;
@@ -12,6 +13,7 @@ namespace UFStats
 {
     void registerStats(bool lock_needed)
     {
+        UFStatSystem::registerStat("connections.current", &currentConnections, lock_needed);
         UFStatSystem::registerStat("connections.handled", &connectionsHandled, lock_needed);
         UFStatSystem::registerStat("txn.success", &txnSuccess, lock_needed);
         UFStatSystem::registerStat("txn.fail", &txnFail, lock_needed);

Modified: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=985334&r1=985333&r2=985334&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (original)
+++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Fri Aug 13 19:43:09 2010
@@ -21,14 +21,15 @@
 #include <string>
 #include <stdio.h>
 
-#include "UF.H"
-#include "UFIO.H"
-#include "UFServer.H"
-#include "UFConnectionPool.H"
+#include <ufcore/UF.H>
+#include <ufcore/UFIO.H>
+#include <ufcore/UFServer.H>
+#include <ufcore/UFConnectionPool.H>
 #include <vector>
 
 using namespace std;
 
+unsigned int NUM_REQUESTS_TO_RUN                        = 0;
 unsigned short int NUM_THREADS                          = 1;
 unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN             = 1;
 unsigned int NUM_CONNECTIONS_PER_FIBER                  = 1;
@@ -79,10 +80,10 @@ ResponseInfoObject overallInfo;
 pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_key_t threadUpdateOverallInfo;
 
-int GET_RESPONSE_TIMEOUT = 1*1000*1000;
+TIME_IN_US GET_RESPONSE_TIMEOUT = -1;
 string DOUBLE_NEWLINE = "\r\n\r\n";
 unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length();
-bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed)
+bool readData(UFIO* ufio, bool& connClosed)
 {
     string result;
     char buf[4096];
@@ -92,7 +93,7 @@ bool readData(UFIO* ufio, unsigned int t
     bool okToExitToEnd = false;
     while(1)
     {
-        int num_bytes_read = ufio->read(buf, 4095, 0);
+        int num_bytes_read = ufio->read(buf, 4095, GET_RESPONSE_TIMEOUT);
         if(num_bytes_read <= 0)
         {
             if(okToExitToEnd && (num_bytes_read == 0))
@@ -101,8 +102,13 @@ bool readData(UFIO* ufio, unsigned int t
                 connClosed = true;
                 return true;
             }
+            cerr<<"bytes read = "<<num_bytes_read<<" with errno = "<<strerror(ufio->getErrno())<<endl;
             return false;
         }
+        /*
+        else
+            cerr<<"read"<<string(buf, num_bytes_read)<<endl;
+            */
         result.append(buf, num_bytes_read);
 
 
@@ -147,6 +153,7 @@ bool readData(UFIO* ufio, unsigned int t
            (result.length() > (endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength))) //dont support pipelining yet
         {
             cerr<<"read more than supposed to"<<endl;
+            cerr<<"read "<<result;
             return false;
         }
         else if(result.length() == endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength)
@@ -158,10 +165,12 @@ bool readData(UFIO* ufio, unsigned int t
 
 
 unsigned int SLEEP_BETWEEN_CONN_SETUP = 0;
-int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000;
+TIME_IN_US CONNECT_AND_REQUEST_TIMEOUT = -1;
 bool writeData(UFIO* ufio, const string& data)
 {
     int amt_written = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT);
+    if(amt_written <= 0)
+        cerr<<"write failed "<<ufio->getErrno()<<endl;
     return ((amt_written == (int)data.length()) ? true : false);
 }
 
@@ -175,7 +184,7 @@ UFIO* getConn(ResponseInfoObject* rIO)
 
     struct timeval start,finish;
     gettimeofday(&start, 0);
-    UFIO* ufio = cpool->getConnection(remote_addr);
+    UFIO* ufio = cpool->getConnection(remote_addr, true, CONNECT_AND_REQUEST_TIMEOUT);
     if(!ufio)
     {
         if(random()%100 < 10)
@@ -221,7 +230,7 @@ void run_handler()
         return;
 
     //do the requests
-    unsigned short int num_requests_run = 0;
+    unsigned int num_requests_run = 0;
     while(num_requests_run++ < NUM_REQUESTS_PER_FIBER)
     {
         if(INTER_SEND_SLEEP)
@@ -258,7 +267,7 @@ void run_handler()
 
 
         bool connClosed = false;
-        if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed))
+        if(!readData(ufio, connClosed))
         {
             if(random()%100 < 10)
                 cerr<<"bailing since read data failed "<<strerror(errno)<<endl;
@@ -300,7 +309,7 @@ void ClientUF::run()
     ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
     if(!rIO)
         return;
-    unsigned short int num_requests_run = 0;
+    unsigned int num_requests_run = 0;
     while(num_requests_run++ < NUM_CONNECTIONS_PER_FIBER)
     {
         //wait if told to do so
@@ -405,16 +414,13 @@ void SetupClientUF::run()
         {
             if(!rIO.num_user_fibers_running)
                 break;
-            else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) && 
-                     (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
-                )
-            {
-                cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
-                break;
-            }
 
             UF::gusleep(5000000);
-            cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" ("<<(rIO.num_attempt*100/(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN))<<"%)"<<endl;
+            unsigned short int threadCompletionPercent = (rIO.num_attempt*100)/NUM_REQUESTS_TO_RUN;
+            cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<NUM_REQUESTS_TO_RUN<<" ("<<threadCompletionPercent<<"%)"<<endl;
+
+            if(threadCompletionPercent > THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
+                break;
         }
     }
 
@@ -551,6 +557,7 @@ void print_usage()
 
 int main(int argc, char** argv)
 {
+    unsigned long long int DELAY_BETWEEN_STARTING_THREADS_IN_US = 0;
     if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0)
     {
         cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl;
@@ -560,10 +567,13 @@ int main(int argc, char** argv)
     string rem_port = "80";
     string rem_addr = "127.0.0.1";
     char ch;
-	while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1) 
+	while ((ch = getopt(argc, argv, "M:Z:U:x:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1) 
     {
 		switch (ch) 
         {
+            case'x':
+                DELAY_BETWEEN_STARTING_THREADS_IN_US = atoi(optarg);
+                break;
             case 'Z':
                 sleepShouldBeRandom = atoi(optarg);
                 break;
@@ -609,14 +619,10 @@ int main(int argc, char** argv)
                 HTTP_BASE_REQ_STRING = optarg;
                 break;
             case 'c':
-                CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000;
-                if(CONNECT_AND_REQUEST_TIMEOUT <= 0)
-                    CONNECT_AND_REQUEST_TIMEOUT = -1;
+                CONNECT_AND_REQUEST_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
                 break;
             case 'd':
-                GET_RESPONSE_TIMEOUT = atoi(optarg)*1000;
-                if(GET_RESPONSE_TIMEOUT <= 0)
-                    GET_RESPONSE_TIMEOUT = -1;
+                GET_RESPONSE_TIMEOUT = (atoi(optarg) > 0) ? atoi(optarg)*1000 : -1;
                 break;
             case 's':
                 INTER_SEND_SLEEP = atoi(optarg)*1000;
@@ -642,6 +648,7 @@ int main(int argc, char** argv)
     remote_addr = rem_addr + ":" + rem_port;
     print_info();
 
+    NUM_REQUESTS_TO_RUN = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN;
 
     //create the threads
     pthread_t* thread = new pthread_t[NUM_THREADS];
@@ -651,6 +658,11 @@ int main(int argc, char** argv)
         list<UF*>* ufList = new list<UF*>();
         ufList->push_back(new SetupClientUF());
         UFIO::ufCreateThreadWithIO(&thread[i], ufList);
+        if(DELAY_BETWEEN_STARTING_THREADS_IN_US)
+        {
+            cerr<<"sleeping for "<<DELAY_BETWEEN_STARTING_THREADS_IN_US<<endl;
+            usleep(DELAY_BETWEEN_STARTING_THREADS_IN_US);
+        }
     }