You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/12 15:00:50 UTC

svn commit: r943474 - in /trafficserver/traffic/branches/UserFiber: ./ include/ lib/ samples/ src/

Author: akundu
Date: Wed May 12 13:00:49 2010
New Revision: 943474

URL: http://svn.apache.org/viewvc?rev=943474&view=rev
Log:
1. added the ConnectionPool code
2. rearranged code location (into include/src/lib format)
3. updated connect code


Added:
    trafficserver/traffic/branches/UserFiber/include/
    trafficserver/traffic/branches/UserFiber/include/UF.H   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UF.H
    trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
    trafficserver/traffic/branches/UserFiber/include/UFIO.H   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFIO.H
    trafficserver/traffic/branches/UserFiber/include/UFServer.H   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFServer.H
    trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H   (props changed)
      - copied unchanged from r943471, trafficserver/traffic/branches/UserFiber/UFStatSystem.H
    trafficserver/traffic/branches/UserFiber/lib/
    trafficserver/traffic/branches/UserFiber/samples/Makefile
    trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
    trafficserver/traffic/branches/UserFiber/src/
    trafficserver/traffic/branches/UserFiber/src/UF.C   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UF.C
    trafficserver/traffic/branches/UserFiber/src/UFIO.C   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFIO.C
    trafficserver/traffic/branches/UserFiber/src/UFServer.C   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFServer.C
    trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C   (contents, props changed)
      - copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFStatSystem.C
Removed:
    trafficserver/traffic/branches/UserFiber/UF.C
    trafficserver/traffic/branches/UserFiber/UF.H
    trafficserver/traffic/branches/UserFiber/UFIO.C
    trafficserver/traffic/branches/UserFiber/UFIO.H
    trafficserver/traffic/branches/UserFiber/UFServer.C
    trafficserver/traffic/branches/UserFiber/UFServer.H
    trafficserver/traffic/branches/UserFiber/UFStatSystem.C
    trafficserver/traffic/branches/UserFiber/UFStatSystem.H
Modified:
    trafficserver/traffic/branches/UserFiber/Makefile
    trafficserver/traffic/branches/UserFiber/samples/echoServer.C

Modified: trafficserver/traffic/branches/UserFiber/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/Makefile?rev=943474&r1=943473&r2=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/Makefile Wed May 12 13:00:49 2010
@@ -4,23 +4,11 @@ ARCH=x86-64
 
 all:	ufHTTPServer
 
-UF.o:	UF.C UF.H
-	$(CPP) $(BUILD_FLAGS) -c -o UF.o UF.C -march=$(ARCH) 
-
-UFIO.o:	UFIO.C UFIO.H
-	$(CPP) $(BUILD_FLAGS) -c -o UFIO.o UFIO.C -march=$(ARCH)
-
-UFStatSystem.o: UFStatSystem.C UFStatSystem.H
-	$(CPP) $(BUILD_FLAGS) -c -o UFStatSystem.o UFStatSystem.C -march=$(ARCH)
-
-UFServer.o: UFServer.C UFServer.H
-	$(CPP) $(BUILD_FLAGS) -c -o UFServer.o UFServer.C -march=$(ARCH)
-
-ufHTTPServer.o:	ufHTTPServer.C UF.o UFIO.o UFStatSystem.o UFServer.o
-	$(CPP) $(BUILD_FLAGS) -c -o ufHTTPServer.o ufHTTPServer.C -march=$(ARCH)
+ufHTTPServer.o:	ufHTTPServer.C
+	$(CPP) $(BUILD_FLAGS) -c -I./include -o ufHTTPServer.o ufHTTPServer.C -march=$(ARCH)
 
 ufHTTPServer:	ufHTTPServer.o
-	$(CPP) $(BUILD_FLAGS) -o ufHTTPServer UF.o UFIO.o UFStatSystem.o UFServer.o ufHTTPServer.o -lpthread -march=$(ARCH)
+	$(CPP) $(BUILD_FLAGS) -o ufHTTPServer ufHTTPServer.o -L./lib -lUF -lpthread -march=$(ARCH)
 
 clean: 
 	rm *.o ufHTTPServer

Copied: trafficserver/traffic/branches/UserFiber/include/UF.H (from r943471, trafficserver/traffic/branches/UserFiber/UF.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UF.H?p2=trafficserver/traffic/branches/UserFiber/include/UF.H&p1=trafficserver/traffic/branches/UserFiber/UF.H&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UF.H Wed May 12 13:00:49 2010
@@ -50,6 +50,7 @@ struct UF
     void                 yield();
     ///must be called after the fiber is added to a scheduler
     void                 usleep(unsigned long long int sleepAmtInUs);
+    static void          gusleep(unsigned long long int sleepAmtInUs);
     ///simply block the fiber
     void                 block();
 
@@ -168,18 +169,21 @@ struct UFScheduler
     const ucontext_t& getMainContext() const;
     void setSpecific(void* args);
     void* getSpecific() const;
-    void setExit(bool exit = true);
+    static void setExit(bool exit = true);
+    void setExitJustMe(bool exit = true);
 
     ///the variable that says whether the scheduler should be handling the sleep or
     //if its handled w/in the UserFabrics
     void*                       (*_notifyFunc)(void*);
     void*                       _notifyArgs;
 
-
     //to allow to identify the thread running now
     static pthread_key_t        _specific_key;
 
+    static void ufCreateThread(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+
     static bool                 _exit;
+    bool                        _exitJustMe;
 protected:
     UF*                         _currentFiber;
     ucontext_t                  _mainContext;
@@ -214,11 +218,17 @@ inline const ucontext_t& UFScheduler::ge
 inline void UFScheduler::setSpecific(void* args) { _specific = args; }
 inline void* UFScheduler::getSpecific() const { return _specific; }
 inline void UFScheduler::setExit(bool exit) { _exit = exit; }
+inline void UFScheduler::setExitJustMe(bool exit) { _exitJustMe = exit; }
 
 inline UFScheduler* UF::getParentScheduler() const { return _parentScheduler; }
 
 inline void UF::waitOnLock() { block(); }
 
+inline void UF::gusleep(unsigned long long int sleepAmtInUs)
+{
+    UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
+}
+
 inline void UF::usleep(unsigned long long int sleepAmtInUs)
 {
     if(!sleepAmtInUs)

Propchange: trafficserver/traffic/branches/UserFiber/include/UF.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H?rev=943474&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H (added)
+++ trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H Wed May 12 13:00:49 2010
@@ -0,0 +1,91 @@
+#ifndef UFCONNECTIONPOOL_H
+#define UFCONNECTIONPOOL_H
+
+#include <time.h>
+
+#include <string>
+#include <vector>
+#include <map>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+
+class StIpInfo;
+class UFConnectionGroupInfo;
+class UFIO;
+class UFConnectionPoolImpl;
+class UFConnectionPool
+{
+public:
+    UFConnectionPool();
+
+    // Call init before thread creation
+    static void init();
+    
+    unsigned int loadConfigFile(const std::string& fileName);
+    unsigned int loadConfigFile(const std::string& fileName, int maxSimultaneousConns);
+
+    bool addGroup(UFConnectionGroupInfo* stGroupInfo);
+    UFConnectionGroupInfo* removeGroup(const std::string& name);
+    UFIO* getConnection(const std::string& groupName);
+    UFIO* getConnection(const std::string& groupName, bool waitForConnection);
+    void releaseConnection(UFIO* ufIO, bool connOk = true);
+    void setTimeoutIP(int timeoutIn); ///how long to timeout an ip that we cant connect to (is not responding)
+    std::string fillInfo(std::string& data, bool detailed = false) const;
+    double getGroupAvailability(const std::string& name) const;
+
+    //TODO - get stats & detailed stats info
+protected:
+
+    UFConnectionPoolImpl*           impl;
+
+};
+
+namespace StringUtil
+{
+    typedef std::vector<std::string> StringVector;
+    typedef std::vector<std::string>::iterator StringVectorItr;
+    unsigned int split(const std::string& input, const std::string& splitOn, StringVector& output);
+    std::string trim_ws(const std::string& input);
+}
+
+struct UFIO;
+typedef std::map<UFIO*, int> UFIOIntMap;
+struct UFConnectionIpInfo
+{
+    UFConnectionIpInfo(const std::string& ip, bool persistent = true, int maxSimultaneousConns = -1, int _timeOutPerTransaction = 90);
+    ~UFConnectionIpInfo() {} //TODO: remove all the conns w/in currently available and currently used
+    std::string         _ip;
+    struct sockaddr_in  _sin;
+    int                 _maxSimultaneousConns;
+    bool                _persistent; 
+
+    int                 _timeOutPerTransaction; ///how many ms to try before considering the connect a failure
+    int                 _timedOut;
+
+    UFIOIntMap          _currentlyAvailableConnections;
+    UFIOIntMap          _currentlyUsedConnections;
+    unsigned int        _inProcessCount;
+};
+
+
+typedef std::vector<UFConnectionIpInfo*> UFConnectionIpInfoList;
+struct UFConnectionGroupInfo
+{
+    UFConnectionGroupInfo(const std::string& name);
+    ~UFConnectionGroupInfo();
+
+    bool addIP(UFConnectionIpInfo* UFConnectionIpInfo); 
+    UFConnectionIpInfo* removeIP(const std::string& ip);
+    double getAvailability() const ;
+
+    std::string         _name;
+
+    UFConnectionIpInfoList          _ipInfoList;
+};
+
+
+#endif

Copied: trafficserver/traffic/branches/UserFiber/include/UFIO.H (from r943471, trafficserver/traffic/branches/UserFiber/UFIO.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFIO.H?p2=trafficserver/traffic/branches/UserFiber/include/UFIO.H&p1=trafficserver/traffic/branches/UserFiber/UFIO.H&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFIO.H Wed May 12 13:00:49 2010
@@ -112,6 +112,9 @@ struct UFIO
     void setUFIOScheduler(UFIOScheduler* ufios);
 
     UFSleepInfo*                _sleepInfo;
+
+    static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+
 protected:
     int                         _fd;
     unsigned int                _errno;
@@ -221,4 +224,17 @@ inline void EpollUFIOScheduler::releaseS
     _availableSleepInfo.push_back(&ufsi);
 }
 
+struct IORunner : public UF
+{
+    void run();
+    IORunner(bool registerMe = false)
+    {
+        if(registerMe)
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+    UF* createUF() { return new IORunner(); }
+    static IORunner* _self;
+    static int _myLoc;
+};
+
 #endif

Propchange: trafficserver/traffic/branches/UserFiber/include/UFIO.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: trafficserver/traffic/branches/UserFiber/include/UFServer.H (from r943471, trafficserver/traffic/branches/UserFiber/UFServer.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFServer.H?p2=trafficserver/traffic/branches/UserFiber/include/UFServer.H&p1=trafficserver/traffic/branches/UserFiber/UFServer.H&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFServer.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFServer.H Wed May 12 13:00:49 2010
@@ -102,17 +102,4 @@ inline std::pair<UFScheduler*, pthread_t
     return _threadList[lastLocUsed++%(_threadList.size())];
 }
 
-struct IORunner : public UF
-{
-    void run();
-    IORunner(bool registerMe = false)
-    {
-        if(registerMe)
-            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
-    }
-    UF* createUF() { return new IORunner(); }
-    static IORunner* _self;
-    static int _myLoc;
-};
-
 #endif

Propchange: trafficserver/traffic/branches/UserFiber/include/UFServer.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Propchange: trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: trafficserver/traffic/branches/UserFiber/samples/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/Makefile?rev=943474&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/Makefile (added)
+++ trafficserver/traffic/branches/UserFiber/samples/Makefile Wed May 12 13:00:49 2010
@@ -0,0 +1,22 @@
+CPP=c++
+BUILD_FLAGS=-g -O3 -Wall -DPIPE_NOT_EFD -Wno-deprecated
+ARCH=x86-64
+INCLUDE=-I../include
+INCLUDE_DIR=../include
+
+all:	UFHTTPLoader echoServer
+
+UFHTTPLoader.o:	UFHTTPLoader.C
+	$(CPP) $(BUILD_FLAGS) -c -o UFHTTPLoader.o UFHTTPLoader.C $(INCLUDE) -march=$(ARCH)
+
+UFHTTPLoader:	UFHTTPLoader.o
+	$(CPP) $(BUILD_FLAGS) -o UFHTTPLoader UFHTTPLoader.o -L../lib/ -lUF -lpthread -march=$(ARCH)
+
+echoServer.o:	echoServer.C
+	$(CPP) $(BUILD_FLAGS) -c -o echoServer.o echoServer.C $(INCLUDE) -march=$(ARCH)
+
+echoServer:	echoServer.o
+	$(CPP) $(BUILD_FLAGS) -o echoServer echoServer.o -L../lib/ -lUF -lpthread -march=$(ARCH)
+
+clean: 
+	rm *.o UFHTTPLoader

Added: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=943474&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (added)
+++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Wed May 12 13:00:49 2010
@@ -0,0 +1,711 @@
+#include <algorithm>
+#include <errno.h>
+#include <vector>
+#include <time.h>
+#include <sstream>
+#include <iostream>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <string>
+
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+
+using namespace std;
+
+unsigned short int NUM_THREADS                          = 1;
+unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN             = 1;
+unsigned int NUM_CONNECTIONS_PER_FIBER                  = 1;
+unsigned int NUM_REQUESTS_PER_FIBER                     = 1;
+unsigned int RATE_TO_CREATE_FIBERS                      = 0;
+int RATE_TO_CREATE_FIBERS_TIME                          = -1;
+unsigned short int THREAD_COMPLETION_PERCENT_TO_BAIL_ON = 100;
+
+struct ResponseInfoObject
+{
+    ResponseInfoObject()
+    {
+        total_time                  = 0;
+        connect_time                = 0;
+        num_attempt                 = 0;
+        num_success                 = 0;
+        total_success_time          = 0;
+
+        thread_create_error         = 0;
+        socket_creation_error       = 0;
+        error_response              = 0;
+        invalid_response            = 0;
+        connect_error               = 0;
+        connect_success             = 0;
+        write_error                 = 0;
+        read_error                  = 0;
+        num_user_fibers_running     = 0;
+    }
+
+    unsigned long long int total_time;
+    unsigned long long int connect_time;
+    unsigned int num_attempt;
+    unsigned int num_success;
+    unsigned long long int total_success_time;
+
+    unsigned int thread_create_error;
+    unsigned int socket_creation_error;
+    unsigned int error_response;
+    unsigned int invalid_response;
+    unsigned int connect_success;
+    unsigned int connect_error;
+    unsigned int write_error;
+    unsigned int read_error;
+    unsigned int num_user_fibers_running;
+    vector <unsigned long long int> results;
+};
+ResponseInfoObject overallInfo;
+pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_key_t threadUpdateOverallInfo;
+struct sockaddr_in rmt_addr;
+
+int GET_RESPONSE_TIMEOUT = 1*1000*1000;
+string DOUBLE_NEWLINE = "\r\n\r\n";
+unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length();
+bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed)
+{
+    string result;
+    char buf[4096];
+    unsigned int searchStartPos;
+    size_t endOfHeaders = string::npos;
+    unsigned int contentLength = 0;
+    bool okToExitToEnd = false;
+    while(1)
+    {
+        int num_bytes_read = ufio->read(buf, 4095, 0);
+        if(num_bytes_read <= 0)
+        {
+            if(okToExitToEnd && (num_bytes_read == 0))
+            {
+                connClosed = true;
+                return true;
+            }
+            return false;
+        }
+        result.append(buf, num_bytes_read);
+
+
+
+        if(endOfHeaders == string::npos)
+        {
+            if(result.length() > (num_bytes_read + DOUBLE_NEWLINE.length()))
+               searchStartPos = result.length() - (num_bytes_read + DOUBLE_NEWLINE.length());
+            else
+                searchStartPos = 0;
+            endOfHeaders = result.find(DOUBLE_NEWLINE, searchStartPos);
+            if(endOfHeaders != string::npos)
+            {
+                //search for the content length;
+                bool foundContentLength = false;
+                const char* indexOfCL = strstr(result.c_str(), "Content-Length: ");
+                if(indexOfCL)
+                {
+                    sscanf(indexOfCL, "Content-Length: %d", &contentLength);
+                    if(!contentLength)
+                        cerr<<"found content length but not bytes = "<<result.c_str()+(indexOfCL-result.data())<<endl;
+                    else
+                        foundContentLength = true;
+                }
+                else
+                {
+                    indexOfCL = strstr(result.c_str(), "Content-Length:");
+                    if(indexOfCL)
+                    {
+                        sscanf(indexOfCL, "Content-Length:%d", &contentLength);
+                        if(!contentLength)
+                            cerr<<"found content length but not bytes = "<<result.c_str()+(indexOfCL-result.data())<<endl;
+                        else
+                            foundContentLength = true;
+                    }
+                }
+                if(!foundContentLength)
+                    okToExitToEnd = true;
+            }
+        }
+        if(!okToExitToEnd &&
+           (result.length() > (endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength))) //dont support pipelining yet
+        {
+            cerr<<"read more than supposed to"<<endl;
+            return false;
+        }
+        else if(result.length() == endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength)
+            return true;
+    }
+
+    return false;
+}
+
+
+unsigned int SLEEP_BETWEEN_CONN_SETUP = 0;
+int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000;
+bool writeData(UFIO* ufio, const string& data)
+{
+    int amt_written = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT);
+    return ((amt_written == (int)data.length()) ? true : false);
+}
+
+int sockBuf = 0;
+UFIO* getConn(ResponseInfoObject* rIO)
+{
+    //create the socket to build the connection on
+    UFIO* ufio = new UFIO(UFScheduler::getUFScheduler()->getRunningFiberOnThisThread());
+    if(!ufio)
+        return 0;
+
+    struct timeval start,finish;
+    gettimeofday(&start, 0);
+    if(ufio->connect((struct sockaddr*)&rmt_addr, sizeof(rmt_addr), CONNECT_AND_REQUEST_TIMEOUT) != 0)
+    {
+        if(random()%100 < 10)
+            cerr<<"connect error: "<<strerror(errno)<<endl;
+        rIO->connect_error++;
+        return 0;
+    }
+    gettimeofday(&finish, 0);
+    rIO->connect_success++;
+    rIO->connect_time += ((finish.tv_sec - start.tv_sec) * 1000000) + (finish.tv_usec - start.tv_usec);
+
+
+    if(sockBuf)
+    {
+        setsockopt(ufio->getFd(), SOL_SOCKET, SO_RCVBUF, (char*) &sockBuf, sizeof( sockBuf ));
+        setsockopt(ufio->getFd(), SOL_SOCKET, SO_SNDBUF, (char*) &sockBuf, sizeof( sockBuf ));
+    }
+
+    return ufio;
+}
+
+string host_header = "";
+bool GENERATE_RANDOM_STRING = false;
+string HTTP_BASE_REQ_STRING = "/index.html";
+string MSG_STRING = "";
+unsigned int INTER_SEND_SLEEP = 0;
+unsigned int global_counter = 0;
+
+
+
+unsigned long long int GENERATE_RANDOM_STRING_CONSTRAINT = 2000000000;
+bool sleepShouldBeRandom = true;
+void run_handler()
+{
+    ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
+    if(!rIO)
+        return;
+
+    //create the socket to build the connection on
+    struct timeval start,finish;
+    UFIO* ufio = getConn(rIO);
+    if(!ufio)
+        return;
+
+    //do the requests
+    unsigned short int num_requests_run = 0;
+    while(num_requests_run++ < NUM_REQUESTS_PER_FIBER)
+    {
+        if(INTER_SEND_SLEEP)
+        {
+            if(sleepShouldBeRandom)
+                UF::gusleep((random()%INTER_SEND_SLEEP)*1000);
+            else
+                UF::gusleep(INTER_SEND_SLEEP*1000);
+        }
+
+        rIO->num_attempt++;
+
+        if(GENERATE_RANDOM_STRING)
+        {
+            stringstream ss;
+            ss<<"GET "<<"/test"<<random()%10<<"/test"<<random()%10<<"/index.html/"<<(random()%GENERATE_RANDOM_STRING_CONSTRAINT)<<" HTTP/1.0\r\nHost: "<<host_header<<"\r\nConnection: Keep-Alive\r\n\r\n";
+            gettimeofday(&start, 0);
+            if(!writeData(ufio, ss.str()))
+                goto run_handler_done;
+        }
+        else
+        {
+            if(!MSG_STRING.length())
+                MSG_STRING = "GET " + HTTP_BASE_REQ_STRING + " HTTP/1.0\r\nHost: " + host_header + "\r\nConnection: Keep-Alive\r\n\r\n";
+            gettimeofday(&start, 0);
+            if(!writeData(ufio, MSG_STRING))
+            {
+                if(random()%100 < 10)
+                    cerr<<"error on write = "<<strerror(errno)<<endl;
+                rIO->write_error++;
+                goto run_handler_done;
+            }
+        }
+
+
+        bool connClosed = false;
+        if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed))
+        {
+            if(random()%100 < 10)
+                cerr<<"bailing since read data failed "<<strerror(errno)<<endl;
+            rIO->read_error++;
+            goto run_handler_done;
+        }
+
+        gettimeofday(&finish, 0);
+        //save the diff_time
+        unsigned long long int diff_time = ((finish.tv_sec - start.tv_sec) * 1000000) + (finish.tv_usec - start.tv_usec);
+        rIO->results.push_back(diff_time);
+        rIO->total_success_time += diff_time;
+
+
+        rIO->num_success++;
+
+        if(connClosed)
+        {
+            delete ufio;
+            ufio = getConn(rIO);
+            if(!ufio)
+                return;
+        }
+    }
+
+run_handler_done:
+    delete ufio;
+}
+
+
+
+void read_address(const char *str, struct sockaddr_in *sin)
+{
+    char host[128], *p;
+    struct hostent *hp;
+    short port;
+
+    strcpy(host, str);
+    if ((p = strchr(host, ':')) == NULL)
+    {
+        cerr<<"invalid host: "<<host<<endl;
+        exit(1);
+    }
+    *p++ = '\0';
+    port = (short) atoi(p);
+    if (port < 1)
+    {
+
+        cerr<<"invalid port: "<<port<<endl;
+        exit(1);
+    }
+
+    memset(sin, 0, sizeof(struct sockaddr_in));
+    sin->sin_family = AF_INET;
+    sin->sin_port = htons(port);
+    if (host[0] == '\0')
+    {
+        sin->sin_addr.s_addr = INADDR_ANY;
+        return;
+    }
+    sin->sin_addr.s_addr = inet_addr(host);
+    if (sin->sin_addr.s_addr == INADDR_NONE)
+    {
+        /* not dotted-decimal */
+        if ((hp = gethostbyname(host)) == NULL)
+        {
+            cerr<<"cant resolve address "<<host<<endl;
+            exit(1);
+        }
+        memcpy(&sin->sin_addr, hp->h_addr, hp->h_length);
+    }
+}
+
+struct ClientUF : public UF
+{
+    void run();
+    UF* createUF() { return new ClientUF(); }
+};
+
+void ClientUF::run()
+{
+    ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
+    if(!rIO)
+        return;
+    unsigned short int num_requests_run = 0;
+    while(num_requests_run++ < NUM_CONNECTIONS_PER_FIBER)
+    {
+        //wait if told to do so
+        if(SLEEP_BETWEEN_CONN_SETUP)
+        {
+            if(sleepShouldBeRandom)
+                UF::gusleep((random()%SLEEP_BETWEEN_CONN_SETUP)*1000);
+            else
+                UF::gusleep(SLEEP_BETWEEN_CONN_SETUP*1000);
+        }
+
+        //now start sending the data
+        run_handler();
+    }
+
+    rIO->num_user_fibers_running--;
+    return;
+}
+
+struct SetupClientUF : public UF
+{
+    void run();
+    UF* createUF() { return new SetupClientUF(); }
+};
+
+void SetupClientUF::run()
+{
+    ResponseInfoObject rIO;
+    pthread_setspecific(threadUpdateOverallInfo, &rIO);
+
+    UFScheduler* ufs = UFScheduler::getUFScheduler();
+    if(!ufs)
+        return;
+
+    unsigned short int num_contiguous_thread_creation_failures = 0;
+    struct timeval start,finish;
+    gettimeofday(&start, 0);
+
+
+    while(rIO.num_user_fibers_running < NUM_USER_FIBERS_ALLOWED_TO_RUN)
+    {
+        if (!ufs->addFiberToScheduler(new ClientUF()))
+        {
+            rIO.thread_create_error++;
+            cerr<<"thread_create error with errno = "<<strerror(errno)<<endl;
+                                                  //only 5 contiguous failues allowed at a time
+            if (num_contiguous_thread_creation_failures++ == 5)
+            {
+                cerr<<"had "<<num_contiguous_thread_creation_failures<<" failures in creating threads - so bailing"<<endl;
+                break;
+            }
+        }
+        else
+        {
+            rIO.num_user_fibers_running++;
+            num_contiguous_thread_creation_failures = 0;
+        }
+    }
+
+    //create the RATE_TO_CREATE_FIBERS based threads
+    if(RATE_TO_CREATE_FIBERS)
+    {
+        time_t start_time = time(0);
+        num_contiguous_thread_creation_failures = 0;
+        bool breakFromRateCreationThreads = false;
+        unsigned int totalCountRate = 0;
+        while(!breakFromRateCreationThreads)
+        {
+            struct timeval start_rate,finish_rate;
+            gettimeofday(&start_rate, 0);
+            cerr<<time(0)<<": "<<getpid()<<" opening "<<RATE_TO_CREATE_FIBERS<<" connections with current count = "<<totalCountRate<<" and num active threads = "<<rIO.num_user_fibers_running<<endl;
+            for(unsigned int i = 0; i < RATE_TO_CREATE_FIBERS; ++i)
+            {
+                if (!ufs->addFiberToScheduler(new ClientUF()))
+                {
+                    rIO.thread_create_error++;
+                    cerr<<"thread_create error with errno = "<<strerror(errno)<<endl;
+                                                      //only 5 contiguous failues allowed at a time
+                    if (num_contiguous_thread_creation_failures++ == 5)
+                    {
+                        cerr<<"had "<<num_contiguous_thread_creation_failures<<" failures in creating threads - so bailing"<<endl;
+                        breakFromRateCreationThreads = true;
+                        break;
+                    }
+                }
+                else
+                    rIO.num_user_fibers_running++;
+
+                UF::gusleep((int)(1000000/RATE_TO_CREATE_FIBERS));
+
+                totalCountRate++;
+            }
+            gettimeofday(&finish_rate, 0);
+
+            if((RATE_TO_CREATE_FIBERS_TIME != -1) && (RATE_TO_CREATE_FIBERS_TIME + start_time) < time(0))
+                break;
+        }
+    }
+    else
+    {
+        while(1)
+        {
+            if(!rIO.num_user_fibers_running)
+                break;
+            else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) && 
+                     (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
+                )
+            {
+                cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
+                break;
+            }
+
+            UF::gusleep(5000000);
+            cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" ("<<(rIO.num_attempt*100/(NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN))<<"%)"<<endl;
+        }
+    }
+
+
+    gettimeofday(&finish, 0);
+    rIO.total_time += (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec-start.tv_usec);
+
+
+    //TODO:add the total info into the total pool
+    pthread_mutex_lock(&overallInfoTrackMutex);
+    if(rIO.total_time > overallInfo.total_time)
+        overallInfo.total_time = rIO.total_time;
+    overallInfo.connect_time += rIO.connect_time;
+    overallInfo.num_attempt += rIO.num_attempt;
+    overallInfo.num_success += rIO.num_success;
+    overallInfo.total_success_time += rIO.total_success_time;
+    overallInfo.thread_create_error += rIO.thread_create_error;
+    overallInfo.socket_creation_error += rIO.socket_creation_error;
+    overallInfo.error_response += rIO.error_response;
+    overallInfo.invalid_response += rIO.invalid_response;
+    overallInfo.connect_error += rIO.connect_error;
+    overallInfo.connect_success += rIO.connect_success;
+    overallInfo.write_error += rIO.write_error;
+    overallInfo.read_error += rIO.read_error;
+    overallInfo.num_user_fibers_running += rIO.num_user_fibers_running;
+    for(unsigned int i =0; i < rIO.results.size(); ++i)
+        overallInfo.results.push_back(rIO.results[i]);
+    pthread_mutex_unlock(&overallInfoTrackMutex);
+
+
+    ufs->setExitJustMe(true);
+}
+
+void printResults()
+{
+    unsigned int num_fail = overallInfo.read_error+overallInfo.write_error+overallInfo.connect_error+overallInfo.error_response+overallInfo.invalid_response;
+    unsigned int total_should_run = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN*NUM_THREADS;
+    double avg_req_sec = (overallInfo.total_time) ? overallInfo.num_success/((double)overallInfo.total_time/1000000) : 0;
+    double avg_tm_suc_req = (overallInfo.num_success) ? (double) (overallInfo.total_success_time) / (double) overallInfo.num_success : 0;
+    double avg_suc_req_sec = (overallInfo.total_success_time) ? overallInfo.num_success/((double)overallInfo.total_success_time/1000000) : 0;
+
+
+    float estimatedSuccess = (total_should_run) ? (overallInfo.num_success*100/total_should_run) : 0;
+    float successPercent = (overallInfo.num_success+num_fail) ? (overallInfo.num_success*100/(overallInfo.num_success+num_fail)) : 0;
+    cout<<"estimated success % = "<<estimatedSuccess
+        <<" , real success % = "<<successPercent
+        <<" , avg tm/suc req = "<<avg_tm_suc_req<<" us"
+        <<" , avg succ req/sec = "<< avg_suc_req_sec
+        <<" , avg req/sec = "<< avg_req_sec
+        <<" , tot time = " <<overallInfo.total_time<<" us"
+        <<" , tot shd run = "<<total_should_run
+        <<" , num att = "<<overallInfo.num_attempt
+        <<" , suc cnt = "<<overallInfo.num_success
+        <<" , estimated fail cnt = "<<total_should_run-overallInfo.num_success
+        <<" , real fail cnt = "<<num_fail
+        <<" , thrd_creat_err = "<<overallInfo.thread_create_error
+        <<" , sokt_creat_err = "<<overallInfo.socket_creation_error
+        <<" , connect_error = "<<overallInfo.connect_error
+        <<" , err_resp = "<<overallInfo.error_response
+        <<" , invalid_resp = "<<overallInfo.invalid_response
+        <<" , read_err = "<<overallInfo.read_error
+        <<" , write_err = "<<overallInfo.write_error<<endl;
+
+    if(!overallInfo.results.size())
+        return;
+
+    sort (overallInfo.results.begin(), overallInfo.results.end());
+    unsigned int lastDump = 0;
+    unsigned currLocation = 0;
+    int counter = 0;
+    cout<<"percentile breakdown w/ size = "<<overallInfo.results.size()<<endl;
+    unsigned int lastValue = *(overallInfo.results.begin());
+    for (vector<unsigned long long int>::iterator it=overallInfo.results.begin();
+         it!=overallInfo.results.end(); 
+         ++it)
+    {
+        currLocation = 100*counter++/overallInfo.results.size();
+        if(lastDump == currLocation)
+            continue;
+        if((currLocation % 10 ) == 0 || (currLocation) >= 95)
+        {
+            cout<<" "<<currLocation<<"%"<<" <= "<< *it<<"us"<<endl;
+            lastDump = currLocation;
+        }
+        lastValue = *it;
+    }
+    cout<<"100%"<<" <= "<< lastValue <<"us"<<endl;
+
+    cout<<"min = "<<*(overallInfo.results.begin())<<"us"<<endl;
+    cout<<"max = "<<lastValue<<"us"<<endl;
+}
+
+string remote_addr = "";
+void print_info()
+{
+    cerr<<"connecting to               = "<<remote_addr<<endl;
+    cerr<<"NUM_USER_FIBERS_ALLOWED_TO_RUN  = "<<NUM_USER_FIBERS_ALLOWED_TO_RUN<<endl;
+    cerr<<"NUM_CONNECTIONS_PER_FIBER  = "<<NUM_CONNECTIONS_PER_FIBER<<endl;
+    cerr<<"NUM_REQUESTS_PER_FIBER = "<<NUM_REQUESTS_PER_FIBER<<endl;
+    cerr<<"CONNECT_AND_REQUEST_TIMEOUT = "<<CONNECT_AND_REQUEST_TIMEOUT<<endl;
+    cerr<<"GET_RESPONSE_TIMEOUT        = "<<GET_RESPONSE_TIMEOUT<<endl;
+    cerr<<"INTER_SEND_SLEEP            = "<<INTER_SEND_SLEEP<<endl;
+    cerr<<"SLEEP_BETWEEN_CONN_SETUP    = "<<SLEEP_BETWEEN_CONN_SETUP<<endl;
+    cerr<<"THREAD_COMPLETION_PERCENT_TO_BAIL_ON = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
+
+}
+
+void print_usage()
+{
+    cerr<<"StHTTPLoader: "<<endl
+        <<"\t[-f <num_threads>]"<<endl
+        <<"\t[-H <host>]"<<endl
+        <<"\t[-o <host header>]"<<endl
+        <<"\t[-P <port to connect to> (80)]"<<endl
+        <<"\t[-a <rate_to_create_conns/sec>]"<<endl
+        <<"\t[-A <how long to run rate request [-1]"<<endl
+        <<"\t[-t <num_user_threads_to_start>]"<<endl
+        <<"\t[-C <NUM_CONNECTIONS_PER_FIBER>]"<<endl
+        <<"\t[-R <NUM_REQUESTS_PER_FIBER>]"<<endl
+        <<"\t[-c <CONNECT_AND_REQUEST_TIMEOUT in ms (1000)]"<<endl
+        <<"\t[-d <GET_RESPONSE_TIMEOUT in ms        (1000)>]"<<endl
+        <<"\t[-s INTER_SEND_SLEEP (in msec)]"<<endl
+        <<"\t[-r request string (get ip:1.2.3.4)]"<<endl
+        <<"\t[-e END STRING (END)]"<<endl
+        <<"\t[-S SLEEP_BETWEEN_CONN_SETUP (in msec)]"<<endl
+        <<"\t[-Z set the sleep to be random"<<endl
+        <<"\t[-b thread completion percent at which to bail"<<endl
+        <<"\t[-m generate random url"<<endl
+        <<"\t[-M constraint on the number of urls to generate"<<endl
+        <<"\t[-U sock buf sizes for snd + recv [default = 0/none]"<<endl
+        <<"\t[-? this help screen]"<<endl
+        <<"\t[-h this help screen]"<<endl;
+    exit(0);
+}
+
+int main(int argc, char** argv)
+{
+    if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0)
+    {
+        cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl;
+        return 1;
+    }
+
+    string rem_port = "80";
+    string rem_addr = "127.0.0.1";
+    char ch;
+	while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1) 
+    {
+		switch (ch) 
+        {
+            case 'Z':
+                sleepShouldBeRandom = atoi(optarg);
+                break;
+            case 'm':
+                GENERATE_RANDOM_STRING = atoi(optarg);
+                break;
+            case 'M':
+                GENERATE_RANDOM_STRING_CONSTRAINT = atoi(optarg);
+                break;
+            case 'o':
+                host_header = optarg;
+                break;
+            case 'P':
+                rem_port = optarg;
+                break;
+		    case 'H':
+                rem_addr = optarg;
+			    break;
+		    case 't':
+			    NUM_USER_FIBERS_ALLOWED_TO_RUN = atoi(optarg);
+			    break;
+            case 'a':
+                RATE_TO_CREATE_FIBERS = atoi(optarg);
+                break;
+            case 'A':
+                RATE_TO_CREATE_FIBERS_TIME = atoi(optarg);
+                break;
+		    case 'C':
+                NUM_CONNECTIONS_PER_FIBER = atoi(optarg);
+			    break;
+		    case 'R':
+                NUM_REQUESTS_PER_FIBER = atoi(optarg);
+			    break;
+		    case 'f':
+			    NUM_THREADS = atoi(optarg);
+			    break;
+            case 'S':
+                SLEEP_BETWEEN_CONN_SETUP = atoi(optarg)*1000;
+                if(SLEEP_BETWEEN_CONN_SETUP < 0)
+                    SLEEP_BETWEEN_CONN_SETUP = 0;
+                break;
+            case 'r':
+                HTTP_BASE_REQ_STRING = optarg;
+                break;
+            case 'c':
+                CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000;
+                if(CONNECT_AND_REQUEST_TIMEOUT <= 0)
+                    CONNECT_AND_REQUEST_TIMEOUT = -1;
+                break;
+            case 'd':
+                GET_RESPONSE_TIMEOUT = atoi(optarg)*1000;
+                if(GET_RESPONSE_TIMEOUT <= 0)
+                    GET_RESPONSE_TIMEOUT = -1;
+                break;
+            case 's':
+                INTER_SEND_SLEEP = atoi(optarg)*1000;
+                break;
+		    case 'b':
+                THREAD_COMPLETION_PERCENT_TO_BAIL_ON = atoi(optarg);
+                break;
+		    case 'U':
+                sockBuf = atoi(optarg);
+                break;
+		    case 'h':
+		    case '?':
+		    default:
+			    print_usage();
+			    break;
+		}
+	}
+
+    if(!host_header.length())
+        host_header = rem_addr;
+
+
+    remote_addr = rem_addr + ":" + rem_port;
+    print_info();
+
+    read_address(remote_addr.c_str(), &rmt_addr);
+    if (rmt_addr.sin_addr.s_addr == INADDR_ANY)
+    {
+        cerr<<"invalid remote address: "<<remote_addr<<endl;
+        exit(1);
+    }
+
+
+
+    //create the threads
+    pthread_t* thread = new pthread_t[NUM_THREADS];
+    unsigned int i = 0;
+    for(; i<NUM_THREADS; i++)
+    {
+        list<UF*>* ufList = new list<UF*>();
+        ufList->push_back(new SetupClientUF());
+        UFIO::ufCreateThreadWithIO(&thread[i], ufList);
+    }
+
+
+    //wait for kids
+    void* status;
+    for(i=0; i<NUM_THREADS; i++)
+        pthread_join(thread[i], &status);
+    delete [] thread;
+
+    printResults();
+}

Modified: trafficserver/traffic/branches/UserFiber/samples/echoServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/echoServer.C?rev=943474&r1=943473&r2=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/echoServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/samples/echoServer.C Wed May 12 13:00:49 2010
@@ -26,6 +26,7 @@ void EchoServer::handleNewConnection(UFI
     }
 
     char buf[256];
+    int n = 0;
     while ( ((n = ufio->read(buf, 255, readTimeout)) > 0) && ((n = ufio->write(buf, 255)) > 0) ) {}
 }
 

Copied: trafficserver/traffic/branches/UserFiber/src/UF.C (from r943471, trafficserver/traffic/branches/UserFiber/UF.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UF.C?p2=trafficserver/traffic/branches/UserFiber/src/UF.C&p1=trafficserver/traffic/branches/UserFiber/UF.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UF.C Wed May 12 13:00:49 2010
@@ -107,9 +107,10 @@ static pthread_key_t getThreadKey()
 pthread_key_t UFScheduler::_specific_key = getThreadKey();
 UFScheduler::UFScheduler()
 {
+    _exitJustMe = false;
     _specific = 0;
     _currentFiber = 0;
-    //_exit = false;
+
     if(_inThreadedMode)
     {
         pthread_mutex_init(&_mutexToNominateToActiveList, NULL);
@@ -255,7 +256,7 @@ void UFScheduler::runScheduler()
     struct timeval now;
     struct timeval start,finish;
     gettimeofday(&start, 0);
-    while(!_exit)
+    while(!_exitJustMe && !_exit)
     {
         UFList::iterator beg = _activeRunningList.begin();
         for(; beg != _activeRunningList.end(); )
@@ -423,3 +424,35 @@ int UFFactory::registerFunc(UF* uf)
     return _size++;
 }
 
+void* setupThread(void* args)
+{
+    if(!args)
+        return 0;
+
+    list<UF*>* ufsToStartWith = (list<UF*>*) args;
+    UFScheduler ufs;
+    for(list<UF*>::iterator beg = ufsToStartWith->begin();
+        beg != ufsToStartWith->end();
+        ++beg)
+        ufs.addFiberToScheduler(*beg);
+    delete ufsToStartWith;
+
+    //run the scheduler
+    ufs.runScheduler();
+
+    return 0;
+}
+
+void UFScheduler::ufCreateThread(pthread_t* tid, list<UF*>* ufsToStartWith)
+{
+    //create the threads
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+    if(pthread_create(tid, &attr, setupThread, (void*)ufsToStartWith) != 0)
+    {
+        cerr<<"couldnt create thread "<<strerror(errno)<<endl;
+        exit(1);
+    }
+}

Propchange: trafficserver/traffic/branches/UserFiber/src/UF.C
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: trafficserver/traffic/branches/UserFiber/src/UFIO.C (from r943471, trafficserver/traffic/branches/UserFiber/UFIO.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFIO.C?p2=trafficserver/traffic/branches/UserFiber/src/UFIO.C&p1=trafficserver/traffic/branches/UserFiber/UFIO.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFIO.C Wed May 12 13:00:49 2010
@@ -422,36 +422,21 @@ int UFIO::connect(const struct sockaddr 
     //find the scheduler for this request
     UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
 
-    int n = 0;
-    int err = 0;
     while(::connect(_fd, addr, addrlen) < 0)
     {
         _errno = errno;
-        if(errno != EINTR)
+        if(errno == EINTR)
+            continue;
+        else if(errno == EINPROGRESS || errno == EAGAIN)
         {
-            if((errno != EINPROGRESS || errno != EAGAIN) && 
-               (errno != EADDRINUSE || err == 0))
-                return -1;
-
-            //wait to finish the connect
             if(!tmpUfios->setupForConnect(this, timeout))
             {
                 cerr<<"couldnt setup for connect - "<<strerror(errno)<<endl;
                 return -1;
             }
-
-            n = sizeof(int);
-            if (getsockopt(_fd, SOL_SOCKET, SO_ERROR, (char *)&err, (socklen_t *)&n) < 0)
-                return -1;
-            if(err)
-            {
-                _errno = err;
-                return -1;
-            }
-
-            //successful
-            break;
         }
+        else
+            return -1;
     }
 
     return 0;
@@ -892,7 +877,7 @@ void EpollUFIOScheduler::waitForEvents(T
 #endif
     myScheduler->_notifyFunc = notifyEpollFunc;
     //add the UF to handle the efds calls
-    UF* eventFdFiber = UFFactory::getInstance()->selectUF(ReadNotificationUF::_myLoc)->createUF();
+    UF* eventFdFiber = new ReadNotificationUF();
     eventFdFiber->_startingArgs = ens;
     myScheduler->addFiberToScheduler(eventFdFiber, 0);
 
@@ -1013,4 +998,23 @@ void EpollUFIOScheduler::waitForEvents(T
     myScheduler->_notifyFunc = 0;
 }
 
+int IORunner::_myLoc = -1;
+IORunner* IORunner::_self = new IORunner(true);
+void IORunner::run()
+{
+    UF* uf = UFScheduler::getUF();
+    //add the scheduler for this 
+    EpollUFIOScheduler* ioRunner = new EpollUFIOScheduler(uf, 10000); //TODO: support other event scheduler mechanisms later
+    if(!ioRunner || !ioRunner->isSetup())
+    {
+        cerr<<"couldnt setup epoll io scheduler object"<<endl;
+        return;
+    }
+    ioRunner->waitForEvents(1000000); //TODO: allow to change the epoll interval later
+}
 
+void UFIO::ufCreateThreadWithIO(pthread_t* tid, list<UF*>* ufsToStartWith)
+{
+    ufsToStartWith->push_front(new IORunner());
+    UFScheduler::ufCreateThread(tid, ufsToStartWith);
+}

Propchange: trafficserver/traffic/branches/UserFiber/src/UFIO.C
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: trafficserver/traffic/branches/UserFiber/src/UFServer.C (from r943471, trafficserver/traffic/branches/UserFiber/UFServer.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFServer.C?p2=trafficserver/traffic/branches/UserFiber/src/UFServer.C&p1=trafficserver/traffic/branches/UserFiber/UFServer.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFServer.C Wed May 12 13:00:49 2010
@@ -108,9 +108,9 @@ static void* acceptThreadStart(void* arg
 
     UFScheduler ufs;
     //add the io scheduler
-    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+    ufs.addFiberToScheduler(new IORunner());
     //add the accept port fiber
-    UF* uf = UFFactory::getInstance()->selectUF(AcceptRunner::_myLoc)->createUF();
+    UF* uf = new AcceptRunner();
     if(!uf)
         return 0;
     uf->_startingArgs = args;
@@ -128,7 +128,7 @@ static void* ioThreadStart(void* args)
 
     UFScheduler ufs;
     //add the io scheduler
-    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+    ufs.addFiberToScheduler(new IORunner());
 
     ((UFServer*) args)->addThread("NETIO", &ufs);
     ufs.runScheduler();
@@ -287,17 +287,3 @@ vector<pthread_t>* UFServer::getThreadTy
 }
 
 
-int IORunner::_myLoc = -1;
-IORunner* IORunner::_self = new IORunner(true);
-void IORunner::run()
-{
-    UF* uf = UFScheduler::getUF();
-    //add the scheduler for this 
-    EpollUFIOScheduler* ioRunner = new EpollUFIOScheduler(uf, 10000); //TODO: support other event scheduler mechanisms later
-    if(!ioRunner || !ioRunner->isSetup())
-    {
-        cerr<<"couldnt setup epoll io scheduler object"<<endl;
-        return;
-    }
-    ioRunner->waitForEvents(1000000); //TODO: allow to change the epoll interval later
-}

Propchange: trafficserver/traffic/branches/UserFiber/src/UFServer.C
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C (from r943471, trafficserver/traffic/branches/UserFiber/UFStatSystem.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C?p2=trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C&p1=trafficserver/traffic/branches/UserFiber/UFStatSystem.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFStatSystem.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C Wed May 12 13:00:49 2010
@@ -1,9 +1,9 @@
+#include <string.h>
+#include <stdio.h>
 #include "UFStatSystem.H"
 #include "UF.H"
 #include "UFIO.H"
 #include "UFServer.H"
-#include <string.h>
-#include <stdio.h>
 #include <iostream>
 #include <errno.h>
 #include <sys/types.h>
@@ -466,16 +466,16 @@ void* UFStatCollector::scheduler(void *a
     UFScheduler ufs;
 
     // stat collector
-    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(CollectorRunner::_myLoc)->createUF());
+    ufs.addFiberToScheduler(new CollectorRunner());
 
     // set thread for stat command listener to run on
     StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
     
     // io scheduler
-    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+    ufs.addFiberToScheduler(new IORunner());
     
     // stat command listener
-    ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(StatCommandListenerRun::_myLoc)->createUF());
+    ufs.addFiberToScheduler(new StatCommandListenerRun());
     ((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
     ufs.runScheduler();
     return 0;

Propchange: trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C
------------------------------------------------------------------------------
    svn:mergeinfo =