You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/12 15:00:50 UTC
svn commit: r943474 - in /trafficserver/traffic/branches/UserFiber: ./
include/ lib/ samples/ src/
Author: akundu
Date: Wed May 12 13:00:49 2010
New Revision: 943474
URL: http://svn.apache.org/viewvc?rev=943474&view=rev
Log:
1. added the ConnectionPool code
2. rearranged code location (into include/src/lib format)
3. updated connect code
Added:
trafficserver/traffic/branches/UserFiber/include/
trafficserver/traffic/branches/UserFiber/include/UF.H (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UF.H
trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
trafficserver/traffic/branches/UserFiber/include/UFIO.H (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFIO.H
trafficserver/traffic/branches/UserFiber/include/UFServer.H (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFServer.H
trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H (props changed)
- copied unchanged from r943471, trafficserver/traffic/branches/UserFiber/UFStatSystem.H
trafficserver/traffic/branches/UserFiber/lib/
trafficserver/traffic/branches/UserFiber/samples/Makefile
trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
trafficserver/traffic/branches/UserFiber/src/
trafficserver/traffic/branches/UserFiber/src/UF.C (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UF.C
trafficserver/traffic/branches/UserFiber/src/UFIO.C (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFIO.C
trafficserver/traffic/branches/UserFiber/src/UFServer.C (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFServer.C
trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C (contents, props changed)
- copied, changed from r943471, trafficserver/traffic/branches/UserFiber/UFStatSystem.C
Removed:
trafficserver/traffic/branches/UserFiber/UF.C
trafficserver/traffic/branches/UserFiber/UF.H
trafficserver/traffic/branches/UserFiber/UFIO.C
trafficserver/traffic/branches/UserFiber/UFIO.H
trafficserver/traffic/branches/UserFiber/UFServer.C
trafficserver/traffic/branches/UserFiber/UFServer.H
trafficserver/traffic/branches/UserFiber/UFStatSystem.C
trafficserver/traffic/branches/UserFiber/UFStatSystem.H
Modified:
trafficserver/traffic/branches/UserFiber/Makefile
trafficserver/traffic/branches/UserFiber/samples/echoServer.C
Modified: trafficserver/traffic/branches/UserFiber/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/Makefile?rev=943474&r1=943473&r2=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/Makefile (original)
+++ trafficserver/traffic/branches/UserFiber/Makefile Wed May 12 13:00:49 2010
@@ -4,23 +4,11 @@ ARCH=x86-64
all: ufHTTPServer
-UF.o: UF.C UF.H
- $(CPP) $(BUILD_FLAGS) -c -o UF.o UF.C -march=$(ARCH)
-
-UFIO.o: UFIO.C UFIO.H
- $(CPP) $(BUILD_FLAGS) -c -o UFIO.o UFIO.C -march=$(ARCH)
-
-UFStatSystem.o: UFStatSystem.C UFStatSystem.H
- $(CPP) $(BUILD_FLAGS) -c -o UFStatSystem.o UFStatSystem.C -march=$(ARCH)
-
-UFServer.o: UFServer.C UFServer.H
- $(CPP) $(BUILD_FLAGS) -c -o UFServer.o UFServer.C -march=$(ARCH)
-
-ufHTTPServer.o: ufHTTPServer.C UF.o UFIO.o UFStatSystem.o UFServer.o
- $(CPP) $(BUILD_FLAGS) -c -o ufHTTPServer.o ufHTTPServer.C -march=$(ARCH)
+ufHTTPServer.o: ufHTTPServer.C
+ $(CPP) $(BUILD_FLAGS) -c -I./include -o ufHTTPServer.o ufHTTPServer.C -march=$(ARCH)
ufHTTPServer: ufHTTPServer.o
- $(CPP) $(BUILD_FLAGS) -o ufHTTPServer UF.o UFIO.o UFStatSystem.o UFServer.o ufHTTPServer.o -lpthread -march=$(ARCH)
+ $(CPP) $(BUILD_FLAGS) -o ufHTTPServer ufHTTPServer.o -L./lib -lUF -lpthread -march=$(ARCH)
clean:
rm *.o ufHTTPServer
Copied: trafficserver/traffic/branches/UserFiber/include/UF.H (from r943471, trafficserver/traffic/branches/UserFiber/UF.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UF.H?p2=trafficserver/traffic/branches/UserFiber/include/UF.H&p1=trafficserver/traffic/branches/UserFiber/UF.H&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UF.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UF.H Wed May 12 13:00:49 2010
@@ -50,6 +50,7 @@ struct UF
void yield();
///must be called after the fiber is added to a scheduler
void usleep(unsigned long long int sleepAmtInUs);
+ static void gusleep(unsigned long long int sleepAmtInUs);
///simply block the fiber
void block();
@@ -168,18 +169,21 @@ struct UFScheduler
const ucontext_t& getMainContext() const;
void setSpecific(void* args);
void* getSpecific() const;
- void setExit(bool exit = true);
+ static void setExit(bool exit = true);
+ void setExitJustMe(bool exit = true);
///the variable that says whether the scheduler should be handling the sleep or
//if its handled w/in the UserFabrics
void* (*_notifyFunc)(void*);
void* _notifyArgs;
-
//to allow to identify the thread running now
static pthread_key_t _specific_key;
+ static void ufCreateThread(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+
static bool _exit;
+ bool _exitJustMe;
protected:
UF* _currentFiber;
ucontext_t _mainContext;
@@ -214,11 +218,17 @@ inline const ucontext_t& UFScheduler::ge
inline void UFScheduler::setSpecific(void* args) { _specific = args; }
inline void* UFScheduler::getSpecific() const { return _specific; }
inline void UFScheduler::setExit(bool exit) { _exit = exit; }
+inline void UFScheduler::setExitJustMe(bool exit) { _exitJustMe = exit; }
inline UFScheduler* UF::getParentScheduler() const { return _parentScheduler; }
inline void UF::waitOnLock() { block(); }
+inline void UF::gusleep(unsigned long long int sleepAmtInUs)
+{
+ UFScheduler::getUFScheduler()->getRunningFiberOnThisThread()->usleep(sleepAmtInUs);
+}
+
inline void UF::usleep(unsigned long long int sleepAmtInUs)
{
if(!sleepAmtInUs)
Propchange: trafficserver/traffic/branches/UserFiber/include/UF.H
------------------------------------------------------------------------------
svn:mergeinfo =
Added: trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H?rev=943474&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H (added)
+++ trafficserver/traffic/branches/UserFiber/include/UFConnectionPool.H Wed May 12 13:00:49 2010
@@ -0,0 +1,91 @@
+#ifndef UFCONNECTIONPOOL_H
+#define UFCONNECTIONPOOL_H
+
+#include <time.h>
+
+#include <string>
+#include <vector>
+#include <map>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+
+class StIpInfo;
+// These two are defined with the `struct` class-key (UFConnectionGroupInfo
+// further down in this header, UFIO in UFIO.H), so they are forward-declared
+// the same way to avoid mismatched-tag warnings.
+struct UFConnectionGroupInfo;
+struct UFIO;
+class UFConnectionPoolImpl;
+
+/// Facade for a pool of outbound connections organised into named groups.
+/// All work is forwarded to a UFConnectionPoolImpl whose definition is not
+/// part of this header.
+class UFConnectionPool
+{
+public:
+    UFConnectionPool();
+
+    // Call init before thread creation
+    static void init();
+
+    /// Load pool configuration from fileName.
+    /// NOTE(review): the meaning of the returned count is not visible in this
+    /// header - confirm against the implementation.
+    unsigned int loadConfigFile(const std::string& fileName);
+    unsigned int loadConfigFile(const std::string& fileName, int maxSimultaneousConns);
+
+    /// Register / unregister a connection group. removeGroup hands the group
+    /// object back to the caller.
+    bool addGroup(UFConnectionGroupInfo* stGroupInfo);
+    UFConnectionGroupInfo* removeGroup(const std::string& name);
+
+    /// Obtain a connection belonging to groupName.
+    /// NOTE(review): presumably returns 0 when none can be provided - confirm.
+    UFIO* getConnection(const std::string& groupName);
+    UFIO* getConnection(const std::string& groupName, bool waitForConnection);
+
+    /// Give a connection back to the pool; connOk tells the pool whether the
+    /// connection was still healthy when the caller finished with it.
+    void releaseConnection(UFIO* ufIO, bool connOk = true);
+    void setTimeoutIP(int timeoutIn); ///how long to timeout an ip that we cant connect to (is not responding)
+    std::string fillInfo(std::string& data, bool detailed = false) const;
+    double getGroupAvailability(const std::string& name) const;
+
+    //TODO - get stats & detailed stats info
+protected:
+
+    // NOTE(review): no destructor or copy control is declared here, so who
+    // owns/releases impl cannot be determined from this header.
+    UFConnectionPoolImpl* impl;
+
+};
+
+// Small string helpers declared alongside the connection pool.
+namespace StringUtil
+{
+ typedef std::vector<std::string> StringVector;
+ typedef std::vector<std::string>::iterator StringVectorItr;
+ // Splits `input` on `splitOn`, writing the pieces into `output`.
+ // NOTE(review): presumably returns the number of pieces produced - confirm
+ // against the implementation, which is not part of this header.
+ unsigned int split(const std::string& input, const std::string& splitOn, StringVector& output);
+ // NOTE(review): presumably returns a copy of `input` with surrounding
+ // whitespace removed - confirm against the implementation.
+ std::string trim_ws(const std::string& input);
+}
+
+struct UFIO;
+// Map keyed by connection object.
+// NOTE(review): the meaning of the int value is not visible in this header.
+typedef std::map<UFIO*, int> UFIOIntMap;
+// Book-keeping for a single remote endpoint that belongs to a connection group.
+struct UFConnectionIpInfo
+{
+ // Defaults: persistent connections, maxSimultaneousConns of -1, and a
+ // per-transaction timeout of 90.
+ // NOTE(review): -1 presumably means "no limit" - confirm.
+ UFConnectionIpInfo(const std::string& ip, bool persistent = true, int maxSimultaneousConns = -1, int _timeOutPerTransaction = 90);
+ ~UFConnectionIpInfo() {} //TODO: remove all the conns w/in currently available and currently used
+ std::string _ip;
+ struct sockaddr_in _sin;
+ int _maxSimultaneousConns;
+ bool _persistent;
+
+ int _timeOutPerTransaction; ///how many ms to try before considering the connect a failure
+ // NOTE(review): looks related to setTimeoutIP() on the pool; exact
+ // semantics (flag vs. timestamp) are not visible here.
+ int _timedOut;
+
+ // Connections tracked per state; the destructor does not release them yet
+ // (see the TODO on the destructor).
+ UFIOIntMap _currentlyAvailableConnections;
+ UFIOIntMap _currentlyUsedConnections;
+ unsigned int _inProcessCount;
+};
+
+
+typedef std::vector<UFConnectionIpInfo*> UFConnectionIpInfoList;
+
+/// A named group of endpoints (UFConnectionIpInfo objects).
+struct UFConnectionGroupInfo
+{
+    UFConnectionGroupInfo(const std::string& name);
+    ~UFConnectionGroupInfo();
+
+    /// Add an endpoint to the group. The parameter used to be spelled exactly
+    /// like its type, which hid the type name inside the declaration.
+    bool addIP(UFConnectionIpInfo* ipInfo);
+    /// Detach the endpoint whose address is `ip` and hand it back to the caller.
+    /// NOTE(review): presumably returns 0 when no such endpoint exists - confirm.
+    UFConnectionIpInfo* removeIP(const std::string& ip);
+    /// NOTE(review): the scale of the returned value (fraction vs. percent) is
+    /// not visible in this header.
+    double getAvailability() const ;
+
+    std::string _name;
+
+    UFConnectionIpInfoList _ipInfoList;
+};
+
+
+#endif
Copied: trafficserver/traffic/branches/UserFiber/include/UFIO.H (from r943471, trafficserver/traffic/branches/UserFiber/UFIO.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFIO.H?p2=trafficserver/traffic/branches/UserFiber/include/UFIO.H&p1=trafficserver/traffic/branches/UserFiber/UFIO.H&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFIO.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFIO.H Wed May 12 13:00:49 2010
@@ -112,6 +112,9 @@ struct UFIO
void setUFIOScheduler(UFIOScheduler* ufios);
UFSleepInfo* _sleepInfo;
+
+ static void ufCreateThreadWithIO(pthread_t* tid, std::list<UF*>* ufsToStartWith);
+
protected:
int _fd;
unsigned int _errno;
@@ -221,4 +224,17 @@ inline void EpollUFIOScheduler::releaseS
_availableSleepInfo.push_back(&ufsi);
}
+// Fiber type whose run() is defined outside this header (declared here after
+// being moved out of UFServer.H in this commit).
+struct IORunner : public UF
+{
+ void run();
+ // When registerMe is true the instance registers itself with the UFFactory
+ // singleton and stores the value returned by registerFunc() in _myLoc.
+ IORunner(bool registerMe = false)
+ {
+ if(registerMe)
+ _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+ }
+ // Factory hook: returns a new, unregistered IORunner.
+ UF* createUF() { return new IORunner(); }
+ static IORunner* _self;
+ static int _myLoc;
+};
+
#endif
Propchange: trafficserver/traffic/branches/UserFiber/include/UFIO.H
------------------------------------------------------------------------------
svn:mergeinfo =
Copied: trafficserver/traffic/branches/UserFiber/include/UFServer.H (from r943471, trafficserver/traffic/branches/UserFiber/UFServer.H)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/include/UFServer.H?p2=trafficserver/traffic/branches/UserFiber/include/UFServer.H&p1=trafficserver/traffic/branches/UserFiber/UFServer.H&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFServer.H (original)
+++ trafficserver/traffic/branches/UserFiber/include/UFServer.H Wed May 12 13:00:49 2010
@@ -102,17 +102,4 @@ inline std::pair<UFScheduler*, pthread_t
return _threadList[lastLocUsed++%(_threadList.size())];
}
-struct IORunner : public UF
-{
- void run();
- IORunner(bool registerMe = false)
- {
- if(registerMe)
- _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
- }
- UF* createUF() { return new IORunner(); }
- static IORunner* _self;
- static int _myLoc;
-};
-
#endif
Propchange: trafficserver/traffic/branches/UserFiber/include/UFServer.H
------------------------------------------------------------------------------
svn:mergeinfo =
Propchange: trafficserver/traffic/branches/UserFiber/include/UFStatSystem.H
------------------------------------------------------------------------------
svn:mergeinfo =
Added: trafficserver/traffic/branches/UserFiber/samples/Makefile
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/Makefile?rev=943474&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/Makefile (added)
+++ trafficserver/traffic/branches/UserFiber/samples/Makefile Wed May 12 13:00:49 2010
@@ -0,0 +1,22 @@
+# Builds the sample programs against ../include and ../lib/libUF.
+CPP=c++
+BUILD_FLAGS=-g -O3 -Wall -DPIPE_NOT_EFD -Wno-deprecated
+ARCH=x86-64
+INCLUDE=-I../include
+INCLUDE_DIR=../include
+
+# all/clean are commands, not files
+.PHONY: all clean
+
+all: UFHTTPLoader echoServer
+
+UFHTTPLoader.o: UFHTTPLoader.C
+	$(CPP) $(BUILD_FLAGS) -c -o UFHTTPLoader.o UFHTTPLoader.C $(INCLUDE) -march=$(ARCH)
+
+UFHTTPLoader: UFHTTPLoader.o
+	$(CPP) $(BUILD_FLAGS) -o UFHTTPLoader UFHTTPLoader.o -L../lib/ -lUF -lpthread -march=$(ARCH)
+
+echoServer.o: echoServer.C
+	$(CPP) $(BUILD_FLAGS) -c -o echoServer.o echoServer.C $(INCLUDE) -march=$(ARCH)
+
+echoServer: echoServer.o
+	$(CPP) $(BUILD_FLAGS) -o echoServer echoServer.o -L../lib/ -lUF -lpthread -march=$(ARCH)
+
+# -f keeps clean from failing on a pristine tree; echoServer is a product of
+# `all` and therefore has to be removed as well.
+clean:
+	rm -f *.o UFHTTPLoader echoServer
Added: trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C?rev=943474&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C (added)
+++ trafficserver/traffic/branches/UserFiber/samples/UFHTTPLoader.C Wed May 12 13:00:49 2010
@@ -0,0 +1,711 @@
+#include <algorithm>
+#include <errno.h>
+#include <vector>
+#include <time.h>
+#include <sstream>
+#include <iostream>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <sys/stat.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <fcntl.h>
+#include <string>
+
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+
+using namespace std;
+
+// Load-generator settings; each one is overwritten in main() by the command
+// line option named in its comment.
+unsigned short int NUM_THREADS = 1; // -f: threads created in main()
+unsigned int NUM_USER_FIBERS_ALLOWED_TO_RUN = 1; // -t: ClientUF fibers started per thread
+unsigned int NUM_CONNECTIONS_PER_FIBER = 1; // -C: run_handler() calls per ClientUF
+unsigned int NUM_REQUESTS_PER_FIBER = 1; // -R: requests attempted per run_handler() call
+unsigned int RATE_TO_CREATE_FIBERS = 0; // -a: when non-zero, SetupClientUF keeps adding fibers at this rate
+int RATE_TO_CREATE_FIBERS_TIME = -1; // -A: seconds to keep the rate loop going; -1 disables the time check
+unsigned short int THREAD_COMPLETION_PERCENT_TO_BAIL_ON = 100; // -b: values below 100 enable the early-bail check in SetupClientUF::run()
+
+// Counters and timings collected while the load test runs. One instance is
+// kept per thread and a global instance accumulates the totals.
+struct ResponseInfoObject
+{
+    // Every counter starts at zero; the results vector starts empty.
+    ResponseInfoObject()
+        : total_time(0),
+          connect_time(0),
+          num_attempt(0),
+          num_success(0),
+          total_success_time(0),
+          thread_create_error(0),
+          socket_creation_error(0),
+          error_response(0),
+          invalid_response(0),
+          connect_success(0),
+          connect_error(0),
+          write_error(0),
+          read_error(0),
+          num_user_fibers_running(0)
+    {
+    }
+
+    // elapsed times, accumulated in microseconds
+    unsigned long long int total_time;
+    unsigned long long int connect_time;
+    // request counters
+    unsigned int num_attempt;
+    unsigned int num_success;
+    unsigned long long int total_success_time;
+
+    // failure / bookkeeping counters
+    unsigned int thread_create_error;
+    unsigned int socket_creation_error;
+    unsigned int error_response;
+    unsigned int invalid_response;
+    unsigned int connect_success;
+    unsigned int connect_error;
+    unsigned int write_error;
+    unsigned int read_error;
+    unsigned int num_user_fibers_running;
+    // one entry (request duration in microseconds) per successful request
+    vector <unsigned long long int> results;
+};
+// Totals across all threads; SetupClientUF::run() adds each thread's numbers
+// while holding overallInfoTrackMutex, and printResults() reports them.
+ResponseInfoObject overallInfo;
+pthread_mutex_t overallInfoTrackMutex = PTHREAD_MUTEX_INITIALIZER;
+// Thread-specific key (created in main) under which each thread stores a
+// pointer to its own ResponseInfoObject.
+pthread_key_t threadUpdateOverallInfo;
+// Address of the server under test, filled in by read_address() from main().
+struct sockaddr_in rmt_addr;
+
+int GET_RESPONSE_TIMEOUT = 1*1000*1000;
+string DOUBLE_NEWLINE = "\r\n\r\n";
+unsigned int DOUBLE_NEWLINE_LENGTH = DOUBLE_NEWLINE.length();
+
+/// Reads one HTTP response from ufio.
+///
+/// The response is complete when the bytes following the blank line that ends
+/// the headers add up to the Content-Length value. When no usable
+/// Content-Length header is present the response is taken to end when the peer
+/// closes the connection, in which case connClosed is set to true.
+///
+/// @param ufio       connection to read from
+/// @param timeout    NOTE(review): accepted but never forwarded - the read
+///                   below passes 0 exactly as the original code did, because
+///                   the meaning of 0 for UFIO::read is not visible here.
+/// @param connClosed set to true only when the response was terminated by the
+///                   peer closing the connection
+/// @return true when a complete response was read; false on a read error, on
+///         a close before the response was complete, or when more bytes than
+///         announced arrived (pipelining is not supported)
+bool readData(UFIO* ufio, unsigned int timeout, bool& connClosed)
+{
+    string result;
+    char buf[4096];
+    size_t endOfHeaders = string::npos;
+    unsigned int contentLength = 0;
+    bool okToExitToEnd = false;
+    while(1)
+    {
+        int num_bytes_read = ufio->read(buf, 4095, 0);
+        if(num_bytes_read <= 0)
+        {
+            if(okToExitToEnd && (num_bytes_read == 0))
+            {
+                connClosed = true;
+                return true;
+            }
+            return false;
+        }
+        result.append(buf, num_bytes_read);
+
+        if(endOfHeaders == string::npos)
+        {
+            //only rescan the new bytes plus enough of the old ones to catch a
+            //terminator that straddles two reads
+            const size_t overlap = num_bytes_read + DOUBLE_NEWLINE.length();
+            const size_t searchStartPos = (result.length() > overlap) ? (result.length() - overlap) : 0;
+            endOfHeaders = result.find(DOUBLE_NEWLINE, searchStartPos);
+            if(endOfHeaders == string::npos)
+                continue; //headers still incomplete; the length checks below would wrap around on npos
+
+            //search for the content length; %u skips leading blanks so this
+            //handles both "Content-Length: N" and "Content-Length:N"
+            bool foundContentLength = false;
+            const char* indexOfCL = strstr(result.c_str(), "Content-Length:");
+            if(indexOfCL)
+            {
+                if(sscanf(indexOfCL, "Content-Length:%u", &contentLength) == 1)
+                    foundContentLength = true; //a value of 0 is a valid length
+                else
+                {
+                    contentLength = 0;
+                    cerr<<"found content length but not bytes = "<<indexOfCL<<endl;
+                }
+            }
+            if(!foundContentLength)
+                okToExitToEnd = true;
+        }
+
+        const size_t expectedLength = endOfHeaders + DOUBLE_NEWLINE_LENGTH + contentLength;
+        if(!okToExitToEnd && (result.length() > expectedLength)) //dont support pipelining yet
+        {
+            cerr<<"read more than supposed to"<<endl;
+            return false;
+        }
+        else if(result.length() == expectedLength)
+            return true;
+    }
+
+    return false;
+}
+
+
+unsigned int SLEEP_BETWEEN_CONN_SETUP = 0;
+int CONNECT_AND_REQUEST_TIMEOUT = 1*1000*1000;
+
+/// Sends data over ufio using CONNECT_AND_REQUEST_TIMEOUT as the timeout.
+/// @return true only when the write reports exactly data.length() bytes
+bool writeData(UFIO* ufio, const string& data)
+{
+    const int bytesExpected = (int)data.length();
+    const int bytesWritten = ufio->write(data.data(), data.length(), CONNECT_AND_REQUEST_TIMEOUT);
+    if(bytesWritten == bytesExpected)
+        return true;
+    return false;
+}
+
+int sockBuf = 0;
+
+/// Opens a connection to rmt_addr on behalf of the fiber currently running on
+/// this thread and records the outcome in rIO.
+///
+/// On success connect_success and connect_time (microseconds) are updated and,
+/// when sockBuf is non-zero, it is applied as both SO_RCVBUF and SO_SNDBUF.
+///
+/// @param rIO per-thread statistics to update
+/// @return the connected UFIO, which the caller must delete, or 0 when the
+///         connect failed (connect_error is incremented)
+UFIO* getConn(ResponseInfoObject* rIO)
+{
+    //create the socket to build the connection on
+    //(plain new throws instead of returning 0, so no null check is needed)
+    UFIO* ufio = new UFIO(UFScheduler::getUFScheduler()->getRunningFiberOnThisThread());
+
+    struct timeval start,finish;
+    gettimeofday(&start, 0);
+    if(ufio->connect((struct sockaddr*)&rmt_addr, sizeof(rmt_addr), CONNECT_AND_REQUEST_TIMEOUT) != 0)
+    {
+        //only report a sample of the failures to keep the output readable
+        if(random()%100 < 10)
+            cerr<<"connect error: "<<strerror(errno)<<endl;
+        rIO->connect_error++;
+        delete ufio; //previously leaked on every failed connect
+        return 0;
+    }
+    gettimeofday(&finish, 0);
+    rIO->connect_success++;
+    rIO->connect_time += ((finish.tv_sec - start.tv_sec) * 1000000) + (finish.tv_usec - start.tv_usec);
+
+    if(sockBuf)
+    {
+        setsockopt(ufio->getFd(), SOL_SOCKET, SO_RCVBUF, (char*) &sockBuf, sizeof( sockBuf ));
+        setsockopt(ufio->getFd(), SOL_SOCKET, SO_SNDBUF, (char*) &sockBuf, sizeof( sockBuf ));
+    }
+
+    return ufio;
+}
+
+string host_header = "";
+bool GENERATE_RANDOM_STRING = false;
+string HTTP_BASE_REQ_STRING = "/index.html";
+string MSG_STRING = "";
+unsigned int INTER_SEND_SLEEP = 0;
+unsigned int global_counter = 0;
+
+
+
+unsigned long long int GENERATE_RANDOM_STRING_CONSTRAINT = 2000000000;
+bool sleepShouldBeRandom = true;
+
+/// Opens one connection and issues up to NUM_REQUESTS_PER_FIBER GET requests
+/// on it, recording attempts, successes, errors and per-request latency in
+/// the calling thread's ResponseInfoObject.
+///
+/// The first write or read failure ends the run. When the server closes the
+/// connection after a response, a new connection is opened before continuing.
+void run_handler()
+{
+    ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
+    if(!rIO)
+        return;
+
+    //create the socket to build the connection on
+    struct timeval start,finish;
+    UFIO* ufio = getConn(rIO);
+    if(!ufio)
+        return;
+
+    //do the requests; the counter has the same width as the limit so that it
+    //cannot wrap around before reaching it
+    unsigned int num_requests_run = 0;
+    while(num_requests_run++ < NUM_REQUESTS_PER_FIBER)
+    {
+        if(INTER_SEND_SLEEP)
+        {
+            //the configured value is scaled by 1000 before being handed to
+            //UF::gusleep, which takes microseconds
+            if(sleepShouldBeRandom)
+                UF::gusleep((random()%INTER_SEND_SLEEP)*1000);
+            else
+                UF::gusleep(INTER_SEND_SLEEP*1000);
+        }
+
+        rIO->num_attempt++;
+
+        bool written = false;
+        if(GENERATE_RANDOM_STRING)
+        {
+            stringstream ss;
+            ss<<"GET "<<"/test"<<random()%10<<"/test"<<random()%10<<"/index.html/"<<(random()%GENERATE_RANDOM_STRING_CONSTRAINT)<<" HTTP/1.0\r\nHost: "<<host_header<<"\r\nConnection: Keep-Alive\r\n\r\n";
+            gettimeofday(&start, 0);
+            written = writeData(ufio, ss.str());
+        }
+        else
+        {
+            //the fixed request is built once on first use
+            //NOTE(review): MSG_STRING is shared by all threads and built without locking
+            if(!MSG_STRING.length())
+                MSG_STRING = "GET " + HTTP_BASE_REQ_STRING + " HTTP/1.0\r\nHost: " + host_header + "\r\nConnection: Keep-Alive\r\n\r\n";
+            gettimeofday(&start, 0);
+            written = writeData(ufio, MSG_STRING);
+        }
+        if(!written)
+        {
+            //both request flavours now count the failure; the random-url path
+            //used to bail without touching write_error
+            if(random()%100 < 10)
+                cerr<<"error on write = "<<strerror(errno)<<endl;
+            rIO->write_error++;
+            break;
+        }
+
+        bool connClosed = false;
+        if(!readData(ufio, CONNECT_AND_REQUEST_TIMEOUT, connClosed))
+        {
+            if(random()%100 < 10)
+                cerr<<"bailing since read data failed "<<strerror(errno)<<endl;
+            rIO->read_error++;
+            break;
+        }
+
+        gettimeofday(&finish, 0);
+        //save the diff_time
+        unsigned long long int diff_time = ((finish.tv_sec - start.tv_sec) * 1000000) + (finish.tv_usec - start.tv_usec);
+        rIO->results.push_back(diff_time);
+        rIO->total_success_time += diff_time;
+        rIO->num_success++;
+
+        if(connClosed)
+        {
+            delete ufio;
+            ufio = getConn(rIO);
+            if(!ufio)
+                return;
+        }
+    }
+
+    delete ufio;
+}
+
+
+
+/// Parses "host:port" into an IPv4 socket address.
+///
+/// host may be a dotted-decimal address, a name resolved with gethostbyname(),
+/// or empty (which yields INADDR_ANY). The port must be in 1..65535. On any
+/// error a message is printed to stderr and the process exits with status 1.
+///
+/// @param str "host:port" string
+/// @param sin receives the address; fully overwritten
+void read_address(const char *str, struct sockaddr_in *sin)
+{
+    char host[128], *p;
+    struct hostent *hp;
+
+    //refuse input that does not fit instead of overrunning host[]
+    if (strlen(str) >= sizeof(host))
+    {
+        cerr<<"invalid host: "<<str<<endl;
+        exit(1);
+    }
+    strcpy(host, str);
+    if ((p = strchr(host, ':')) == NULL)
+    {
+        cerr<<"invalid host: "<<host<<endl;
+        exit(1);
+    }
+    *p++ = '\0';
+    //kept as int: a short turned every port above 32767 negative and
+    //therefore "invalid"
+    const int port = atoi(p);
+    if (port < 1 || port > 65535)
+    {
+        cerr<<"invalid port: "<<port<<endl;
+        exit(1);
+    }
+
+    memset(sin, 0, sizeof(struct sockaddr_in));
+    sin->sin_family = AF_INET;
+    sin->sin_port = htons((unsigned short)port);
+    if (host[0] == '\0')
+    {
+        sin->sin_addr.s_addr = INADDR_ANY;
+        return;
+    }
+    sin->sin_addr.s_addr = inet_addr(host);
+    if (sin->sin_addr.s_addr == INADDR_NONE)
+    {
+        /* not dotted-decimal */
+        hp = gethostbyname(host);
+        //only an IPv4 answer of the right size may be copied into sin_addr
+        if (hp == NULL || hp->h_addrtype != AF_INET || hp->h_length != (int)sizeof(sin->sin_addr))
+        {
+            cerr<<"cant resolve address "<<host<<endl;
+            exit(1);
+        }
+        memcpy(&sin->sin_addr, hp->h_addr, hp->h_length);
+    }
+}
+
+/// Fiber that plays one simulated client: it calls run_handler()
+/// NUM_CONNECTIONS_PER_FIBER times and then reports itself as finished.
+struct ClientUF : public UF
+{
+    void run();
+    UF* createUF() { return new ClientUF(); }
+};
+
+/// Runs the client. Does nothing when the thread has no ResponseInfoObject
+/// registered under threadUpdateOverallInfo; otherwise decrements
+/// num_user_fibers_running on that object once all connections have run.
+void ClientUF::run()
+{
+    ResponseInfoObject* rIO = (ResponseInfoObject*)pthread_getspecific(threadUpdateOverallInfo);
+    if(!rIO)
+        return;
+
+    //same width as NUM_CONNECTIONS_PER_FIBER so that the counter cannot wrap
+    //around before reaching the limit
+    for(unsigned int num_connections_run = 0; num_connections_run < NUM_CONNECTIONS_PER_FIBER; ++num_connections_run)
+    {
+        //wait if told to do so; the configured value is scaled by 1000
+        //before being handed to UF::gusleep, which takes microseconds
+        if(SLEEP_BETWEEN_CONN_SETUP)
+        {
+            if(sleepShouldBeRandom)
+                UF::gusleep((random()%SLEEP_BETWEEN_CONN_SETUP)*1000);
+            else
+                UF::gusleep(SLEEP_BETWEEN_CONN_SETUP*1000);
+        }
+
+        //now start sending the data
+        run_handler();
+    }
+
+    rIO->num_user_fibers_running--;
+}
+
+/// Per-thread driver fiber: starts the ClientUF fibers, waits for them (or
+/// keeps creating more at a fixed rate), merges the thread's statistics into
+/// overallInfo and finally asks its scheduler to stop.
+struct SetupClientUF : public UF
+{
+    void run();
+    UF* createUF() { return new SetupClientUF(); }
+};
+
+//only this many contiguous fiber creation failures are tolerated
+static const unsigned short int MAX_CONTIGUOUS_FIBER_CREATION_FAILURES = 5;
+
+/// Adds one thread's statistics to overallInfo while holding
+/// overallInfoTrackMutex. total_time keeps the largest per-thread value, every
+/// other counter is summed and the latency samples are appended.
+static void mergeIntoOverallInfo(const ResponseInfoObject& rIO)
+{
+    pthread_mutex_lock(&overallInfoTrackMutex);
+    if(rIO.total_time > overallInfo.total_time)
+        overallInfo.total_time = rIO.total_time;
+    overallInfo.connect_time += rIO.connect_time;
+    overallInfo.num_attempt += rIO.num_attempt;
+    overallInfo.num_success += rIO.num_success;
+    overallInfo.total_success_time += rIO.total_success_time;
+    overallInfo.thread_create_error += rIO.thread_create_error;
+    overallInfo.socket_creation_error += rIO.socket_creation_error;
+    overallInfo.error_response += rIO.error_response;
+    overallInfo.invalid_response += rIO.invalid_response;
+    overallInfo.connect_error += rIO.connect_error;
+    overallInfo.connect_success += rIO.connect_success;
+    overallInfo.write_error += rIO.write_error;
+    overallInfo.read_error += rIO.read_error;
+    overallInfo.num_user_fibers_running += rIO.num_user_fibers_running;
+    overallInfo.results.insert(overallInfo.results.end(), rIO.results.begin(), rIO.results.end());
+    pthread_mutex_unlock(&overallInfoTrackMutex);
+}
+
+void SetupClientUF::run()
+{
+    UFScheduler* ufs = UFScheduler::getUFScheduler();
+    if(!ufs)
+        return;
+
+    //registered only once we know the function will not return early, so the
+    //thread-specific slot never points at a destroyed object because of the
+    //check above
+    ResponseInfoObject rIO;
+    pthread_setspecific(threadUpdateOverallInfo, &rIO);
+
+    unsigned short int num_contiguous_thread_creation_failures = 0;
+    struct timeval start,finish;
+    gettimeofday(&start, 0);
+
+    //start the initial batch of client fibers
+    while(rIO.num_user_fibers_running < NUM_USER_FIBERS_ALLOWED_TO_RUN)
+    {
+        if(ufs->addFiberToScheduler(new ClientUF()))
+        {
+            rIO.num_user_fibers_running++;
+            num_contiguous_thread_creation_failures = 0;
+            continue;
+        }
+
+        //NOTE(review): ownership of the ClientUF after a failed add is not visible here
+        rIO.thread_create_error++;
+        cerr<<"thread_create error with errno = "<<strerror(errno)<<endl;
+        if(++num_contiguous_thread_creation_failures >= MAX_CONTIGUOUS_FIBER_CREATION_FAILURES)
+        {
+            cerr<<"had "<<num_contiguous_thread_creation_failures<<" failures in creating threads - so bailing"<<endl;
+            break;
+        }
+    }
+
+    if(RATE_TO_CREATE_FIBERS)
+    {
+        //create the RATE_TO_CREATE_FIBERS based threads
+        time_t start_time = time(0);
+        num_contiguous_thread_creation_failures = 0;
+        bool breakFromRateCreationThreads = false;
+        unsigned int totalCountRate = 0;
+        while(!breakFromRateCreationThreads)
+        {
+            cerr<<time(0)<<": "<<getpid()<<" opening "<<RATE_TO_CREATE_FIBERS<<" connections with current count = "<<totalCountRate<<" and num active threads = "<<rIO.num_user_fibers_running<<endl;
+            for(unsigned int i = 0; i < RATE_TO_CREATE_FIBERS; ++i)
+            {
+                if(ufs->addFiberToScheduler(new ClientUF()))
+                {
+                    rIO.num_user_fibers_running++;
+                    //a success ends the run of failures, as in the initial batch
+                    num_contiguous_thread_creation_failures = 0;
+                }
+                else
+                {
+                    rIO.thread_create_error++;
+                    cerr<<"thread_create error with errno = "<<strerror(errno)<<endl;
+                    if(++num_contiguous_thread_creation_failures >= MAX_CONTIGUOUS_FIBER_CREATION_FAILURES)
+                    {
+                        cerr<<"had "<<num_contiguous_thread_creation_failures<<" failures in creating threads - so bailing"<<endl;
+                        breakFromRateCreationThreads = true;
+                        break;
+                    }
+                }
+
+                //spread the creations evenly over one second
+                UF::gusleep((int)(1000000/RATE_TO_CREATE_FIBERS));
+
+                totalCountRate++;
+            }
+
+            if((RATE_TO_CREATE_FIBERS_TIME != -1) && (RATE_TO_CREATE_FIBERS_TIME + start_time) < time(0))
+                break;
+        }
+    }
+    else
+    {
+        //widened so that the product cannot overflow; may legitimately be 0
+        const unsigned long long int total_expected = (unsigned long long int)NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN;
+
+        //poll every 5 seconds until the client fibers are done (or few enough are left)
+        while(1)
+        {
+            if(!rIO.num_user_fibers_running)
+                break;
+            else if ((THREAD_COMPLETION_PERCENT_TO_BAIL_ON < 100) &&
+                     (rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN <= THREAD_COMPLETION_PERCENT_TO_BAIL_ON)
+                    )
+            {
+                cerr<<"bailing due to "<<"num_user_fibers_running = "<<rIO.num_user_fibers_running<<" and div = "<<(rIO.num_user_fibers_running*100/NUM_USER_FIBERS_ALLOWED_TO_RUN)<<" and amt to bail on = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
+                break;
+            }
+
+            UF::gusleep(5000000);
+            //guarded: with a zero request or connection count the divisor is 0
+            const unsigned long long int percent_done = total_expected ? (rIO.num_attempt*100ULL/total_expected) : 0;
+            cerr <<pthread_self()<<": completed "<<rIO.num_attempt<<"/"<<total_expected<<" ("<<percent_done<<"%)"<<endl;
+        }
+    }
+
+    gettimeofday(&finish, 0);
+    rIO.total_time += (finish.tv_sec-start.tv_sec)*1000000 + (finish.tv_usec-start.tv_usec);
+
+    mergeIntoOverallInfo(rIO);
+
+    ufs->setExitJustMe(true);
+}
+
+/// Prints the summary line for the whole run followed by a percentile
+/// breakdown of the recorded request latencies (overallInfo.results, which is
+/// sorted in place). Nothing beyond the summary is printed when no latency
+/// was recorded.
+void printResults()
+{
+    const unsigned int num_fail = overallInfo.read_error+overallInfo.write_error+overallInfo.connect_error+overallInfo.error_response+overallInfo.invalid_response;
+    const unsigned int total_should_run = NUM_REQUESTS_PER_FIBER*NUM_CONNECTIONS_PER_FIBER*NUM_USER_FIBERS_ALLOWED_TO_RUN*NUM_THREADS;
+    const unsigned int num_finished = overallInfo.num_success+num_fail;
+    const double avg_req_sec = (overallInfo.total_time) ? overallInfo.num_success/((double)overallInfo.total_time/1000000) : 0;
+    const double avg_tm_suc_req = (overallInfo.num_success) ? (double) (overallInfo.total_success_time) / (double) overallInfo.num_success : 0;
+    const double avg_suc_req_sec = (overallInfo.total_success_time) ? overallInfo.num_success/((double)overallInfo.total_success_time/1000000) : 0;
+
+    //computed in floating point; integer division used to drop the fraction
+    const float estimatedSuccess = (total_should_run) ? (overallInfo.num_success*100.0f/total_should_run) : 0;
+    const float successPercent = (num_finished) ? (overallInfo.num_success*100.0f/num_finished) : 0;
+    //more requests than "should run" can succeed (e.g. in rate mode); do not let the difference wrap
+    const unsigned int estimated_fail = (total_should_run > overallInfo.num_success) ? (total_should_run-overallInfo.num_success) : 0;
+    cout<<"estimated success % = "<<estimatedSuccess
+        <<" , real success % = "<<successPercent
+        <<" , avg tm/suc req = "<<avg_tm_suc_req<<" us"
+        <<" , avg succ req/sec = "<< avg_suc_req_sec
+        <<" , avg req/sec = "<< avg_req_sec
+        <<" , tot time = " <<overallInfo.total_time<<" us"
+        <<" , tot shd run = "<<total_should_run
+        <<" , num att = "<<overallInfo.num_attempt
+        <<" , suc cnt = "<<overallInfo.num_success
+        <<" , estimated fail cnt = "<<estimated_fail
+        <<" , real fail cnt = "<<num_fail
+        <<" , thrd_creat_err = "<<overallInfo.thread_create_error
+        <<" , sokt_creat_err = "<<overallInfo.socket_creation_error
+        <<" , connect_error = "<<overallInfo.connect_error
+        <<" , err_resp = "<<overallInfo.error_response
+        <<" , invalid_resp = "<<overallInfo.invalid_response
+        <<" , read_err = "<<overallInfo.read_error
+        <<" , write_err = "<<overallInfo.write_error<<endl;
+
+    if(overallInfo.results.empty())
+        return;
+
+    sort (overallInfo.results.begin(), overallInfo.results.end());
+    const size_t num_results = overallInfo.results.size();
+    cout<<"percentile breakdown w/ size = "<<num_results<<endl;
+
+    //one line per 10% step and one per percent from 95% upwards
+    unsigned int lastDump = 0;
+    for(size_t i = 0; i < num_results; ++i)
+    {
+        const unsigned int currLocation = (unsigned int)(100ULL*i/num_results);
+        if(lastDump == currLocation)
+            continue;
+        if((currLocation % 10 ) == 0 || (currLocation) >= 95)
+        {
+            cout<<" "<<currLocation<<"%"<<" <= "<<overallInfo.results[i]<<"us"<<endl;
+            lastDump = currLocation;
+        }
+    }
+
+    //the vector is sorted, so front/back are the true extremes; the old code
+    //reported a stale value truncated to unsigned int as 100% and max
+    const unsigned long long int slowest = overallInfo.results.back();
+    cout<<"100%"<<" <= "<< slowest <<"us"<<endl;
+
+    cout<<"min = "<<overallInfo.results.front()<<"us"<<endl;
+    cout<<"max = "<<slowest<<"us"<<endl;
+}
+
+string remote_addr = "";
+
+/// Dumps the effective settings of this run to stderr, one per line,
+/// followed by an empty-bodied end (no trailing summary).
+void print_info()
+{
+    cerr<<"connecting to = "<<remote_addr<<endl
+        <<"NUM_USER_FIBERS_ALLOWED_TO_RUN = "<<NUM_USER_FIBERS_ALLOWED_TO_RUN<<endl
+        <<"NUM_CONNECTIONS_PER_FIBER = "<<NUM_CONNECTIONS_PER_FIBER<<endl
+        <<"NUM_REQUESTS_PER_FIBER = "<<NUM_REQUESTS_PER_FIBER<<endl
+        <<"CONNECT_AND_REQUEST_TIMEOUT = "<<CONNECT_AND_REQUEST_TIMEOUT<<endl
+        <<"GET_RESPONSE_TIMEOUT = "<<GET_RESPONSE_TIMEOUT<<endl
+        <<"INTER_SEND_SLEEP = "<<INTER_SEND_SLEEP<<endl
+        <<"SLEEP_BETWEEN_CONN_SETUP = "<<SLEEP_BETWEEN_CONN_SETUP<<endl
+        <<"THREAD_COMPLETION_PERCENT_TO_BAIL_ON = "<<THREAD_COMPLETION_PERCENT_TO_BAIL_ON<<endl;
+}
+
+/// Prints the command line help to stderr and terminates the process with
+/// exit status 0. The list mirrors the options handled by getopt() in main():
+/// every option except -? and -h takes an argument.
+void print_usage()
+{
+    cerr<<"UFHTTPLoader: "<<endl
+        <<"\t[-f <num_threads>]"<<endl
+        <<"\t[-H <host>]"<<endl
+        <<"\t[-o <host header>]"<<endl
+        <<"\t[-P <port to connect to> (80)]"<<endl
+        <<"\t[-a <rate_to_create_conns/sec>]"<<endl
+        <<"\t[-A <how long to run rate request> (-1)]"<<endl
+        <<"\t[-t <num_user_threads_to_start>]"<<endl
+        <<"\t[-C <NUM_CONNECTIONS_PER_FIBER>]"<<endl
+        <<"\t[-R <NUM_REQUESTS_PER_FIBER>]"<<endl
+        <<"\t[-c <CONNECT_AND_REQUEST_TIMEOUT in ms> (1000)]"<<endl
+        <<"\t[-d <GET_RESPONSE_TIMEOUT in ms> (1000)]"<<endl
+        <<"\t[-s <INTER_SEND_SLEEP in msec>]"<<endl
+        <<"\t[-r <request path> (/index.html)]"<<endl
+        <<"\t[-S <SLEEP_BETWEEN_CONN_SETUP in msec>]"<<endl
+        <<"\t[-Z <0|1> set the sleep to be random]"<<endl
+        <<"\t[-b <thread completion percent at which to bail>]"<<endl
+        <<"\t[-m <0|1> generate random url]"<<endl
+        <<"\t[-M <constraint on the number of urls to generate>]"<<endl
+        <<"\t[-U <sock buf sizes for snd + recv> (default = 0/none)]"<<endl
+        <<"\t[-? this help screen]"<<endl
+        <<"\t[-h this help screen]"<<endl;
+    exit(0);
+}
+
+/// Entry point of the HTTP load generator.
+///
+/// Parses the command line into the global settings, resolves the target
+/// address, starts NUM_THREADS threads that each begin with one SetupClientUF
+/// fiber, waits for all of them and prints the collected statistics.
+///
+/// @return 0 on success, 1 when the thread-specific key cannot be created
+///         (invalid addresses terminate the process via exit(1))
+int main(int argc, char** argv)
+{
+    if(pthread_key_create(&threadUpdateOverallInfo, 0) != 0)
+    {
+        cerr<<"couldnt create key for threadUpdateOverallInfo "<<strerror(errno)<<endl;
+        return 1;
+    }
+
+    string rem_port = "80";
+    string rem_addr = "127.0.0.1";
+    //getopt returns int; a plain char never compares equal to -1 on
+    //platforms where char is unsigned, which would make this loop endless
+    int ch;
+    while ((ch = getopt(argc, argv, "M:Z:U:x:X:m:o:A:a:b:r:S:t:H:P:R:C:f:c:d:s:?h")) != -1)
+    {
+        switch (ch)
+        {
+            case 'Z':
+                sleepShouldBeRandom = atoi(optarg);
+                break;
+            case 'm':
+                GENERATE_RANDOM_STRING = atoi(optarg);
+                break;
+            case 'M':
+            {
+                //used as a modulus when building random urls, so it must not be 0
+                const int constraint = atoi(optarg);
+                GENERATE_RANDOM_STRING_CONSTRAINT = (constraint > 0) ? constraint : 1;
+                break;
+            }
+            case 'o':
+                host_header = optarg;
+                break;
+            case 'P':
+                rem_port = optarg;
+                break;
+            case 'H':
+                rem_addr = optarg;
+                break;
+            case 't':
+                NUM_USER_FIBERS_ALLOWED_TO_RUN = atoi(optarg);
+                break;
+            case 'a':
+                RATE_TO_CREATE_FIBERS = atoi(optarg);
+                break;
+            case 'A':
+                RATE_TO_CREATE_FIBERS_TIME = atoi(optarg);
+                break;
+            case 'C':
+                NUM_CONNECTIONS_PER_FIBER = atoi(optarg);
+                break;
+            case 'R':
+                NUM_REQUESTS_PER_FIBER = atoi(optarg);
+                break;
+            case 'f':
+                NUM_THREADS = atoi(optarg);
+                break;
+            case 'S':
+            {
+                //kept in msec: the code that sleeps multiplies by 1000 itself
+                //before calling UF::gusleep, so scaling here as well made the
+                //pause 1000 times too long. The sign is checked before the
+                //value is stored in the unsigned setting.
+                const int sleep_ms = atoi(optarg);
+                SLEEP_BETWEEN_CONN_SETUP = (sleep_ms > 0) ? sleep_ms : 0;
+                break;
+            }
+            case 'r':
+                HTTP_BASE_REQ_STRING = optarg;
+                break;
+            case 'c':
+                //msec on the command line, usec internally; <= 0 becomes -1
+                CONNECT_AND_REQUEST_TIMEOUT = atoi(optarg)*1000;
+                if(CONNECT_AND_REQUEST_TIMEOUT <= 0)
+                    CONNECT_AND_REQUEST_TIMEOUT = -1;
+                break;
+            case 'd':
+                GET_RESPONSE_TIMEOUT = atoi(optarg)*1000;
+                if(GET_RESPONSE_TIMEOUT <= 0)
+                    GET_RESPONSE_TIMEOUT = -1;
+                break;
+            case 's':
+            {
+                //kept in msec for the same reason as -S
+                const int sleep_ms = atoi(optarg);
+                INTER_SEND_SLEEP = (sleep_ms > 0) ? sleep_ms : 0;
+                break;
+            }
+            case 'b':
+                THREAD_COMPLETION_PERCENT_TO_BAIL_ON = atoi(optarg);
+                break;
+            case 'U':
+                sockBuf = atoi(optarg);
+                break;
+            case 'h':
+            case '?':
+            default:
+                print_usage();
+                break;
+        }
+    }
+
+    if(!host_header.length())
+        host_header = rem_addr;
+
+    remote_addr = rem_addr + ":" + rem_port;
+    print_info();
+
+    read_address(remote_addr.c_str(), &rmt_addr);
+    if (rmt_addr.sin_addr.s_addr == INADDR_ANY)
+    {
+        cerr<<"invalid remote address: "<<remote_addr<<endl;
+        exit(1);
+    }
+
+    //create the threads; each list is handed over to the thread being created
+    pthread_t* thread = new pthread_t[NUM_THREADS];
+    unsigned int i = 0;
+    for(; i<NUM_THREADS; i++)
+    {
+        list<UF*>* ufList = new list<UF*>();
+        ufList->push_back(new SetupClientUF());
+        UFIO::ufCreateThreadWithIO(&thread[i], ufList);
+    }
+
+    //wait for kids
+    void* status;
+    for(i=0; i<NUM_THREADS; i++)
+        pthread_join(thread[i], &status);
+    delete [] thread;
+
+    printResults();
+    return 0;
+}
Modified: trafficserver/traffic/branches/UserFiber/samples/echoServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/samples/echoServer.C?rev=943474&r1=943473&r2=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/samples/echoServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/samples/echoServer.C Wed May 12 13:00:49 2010
@@ -26,6 +26,7 @@ void EchoServer::handleNewConnection(UFI
}
char buf[256];
+ int n = 0;
while ( ((n = ufio->read(buf, 255, readTimeout)) > 0) && ((n = ufio->write(buf, 255)) > 0) ) {}
}
Copied: trafficserver/traffic/branches/UserFiber/src/UF.C (from r943471, trafficserver/traffic/branches/UserFiber/UF.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UF.C?p2=trafficserver/traffic/branches/UserFiber/src/UF.C&p1=trafficserver/traffic/branches/UserFiber/UF.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UF.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UF.C Wed May 12 13:00:49 2010
@@ -107,9 +107,10 @@ static pthread_key_t getThreadKey()
pthread_key_t UFScheduler::_specific_key = getThreadKey();
UFScheduler::UFScheduler()
{
+ _exitJustMe = false;
_specific = 0;
_currentFiber = 0;
- //_exit = false;
+
if(_inThreadedMode)
{
pthread_mutex_init(&_mutexToNominateToActiveList, NULL);
@@ -255,7 +256,7 @@ void UFScheduler::runScheduler()
struct timeval now;
struct timeval start,finish;
gettimeofday(&start, 0);
- while(!_exit)
+ while(!_exitJustMe && !_exit)
{
UFList::iterator beg = _activeRunningList.begin();
for(; beg != _activeRunningList.end(); )
@@ -423,3 +424,35 @@ int UFFactory::registerFunc(UF* uf)
return _size++;
}
+// pthread entry point: seeds a thread-local UFScheduler with the fibers listed in
+// `args` and then runs that scheduler.
+//   args - a heap-allocated list<UF*>*; the list object itself is deleted here once
+//          every entry has been passed to addFiberToScheduler(). A null pointer
+//          makes the thread return immediately.
+// Always returns 0.
+void* setupThread(void* args)
+{
+    list<UF*>* const initialFibers = static_cast<list<UF*>*>(args);
+    if(initialFibers == 0)
+        return 0;
+
+    UFScheduler scheduler;
+    list<UF*>::iterator it = initialFibers->begin();
+    while(it != initialFibers->end())
+    {
+        scheduler.addFiberToScheduler(*it);
+        ++it;
+    }
+    //only the list container is released here, not the UF objects it pointed to
+    delete initialFibers;
+
+    //run the scheduler
+    scheduler.runScheduler();
+    return 0;
+}
+
+// Starts a joinable pthread whose entry point is setupThread().
+//   tid            - receives the id of the newly created thread.
+//   ufsToStartWith - handed to setupThread() as the thread argument.
+// If the thread cannot be created the reason is written to cerr and the
+// process exits with status 1.
+void UFScheduler::ufCreateThread(pthread_t* tid, list<UF*>* ufsToStartWith)
+{
+    //create the threads
+    pthread_attr_t attr;
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+    //pthread_create reports failure through its return value and does not set
+    //errno, so keep the code it hands back for the error message
+    const int rc = pthread_create(tid, &attr, setupThread, (void*)ufsToStartWith);
+
+    //the attribute object is no longer needed once pthread_create has returned
+    pthread_attr_destroy(&attr);
+
+    if(rc != 0)
+    {
+        cerr<<"couldnt create thread "<<strerror(rc)<<endl;
+        exit(1);
+    }
+}
Propchange: trafficserver/traffic/branches/UserFiber/src/UF.C
------------------------------------------------------------------------------
svn:mergeinfo =
Copied: trafficserver/traffic/branches/UserFiber/src/UFIO.C (from r943471, trafficserver/traffic/branches/UserFiber/UFIO.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFIO.C?p2=trafficserver/traffic/branches/UserFiber/src/UFIO.C&p1=trafficserver/traffic/branches/UserFiber/UFIO.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFIO.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFIO.C Wed May 12 13:00:49 2010
@@ -422,36 +422,21 @@ int UFIO::connect(const struct sockaddr
//find the scheduler for this request
UFIOScheduler* tmpUfios = _ufios ? _ufios : getUFIOS();
- int n = 0;
- int err = 0;
while(::connect(_fd, addr, addrlen) < 0)
{
_errno = errno;
- if(errno != EINTR)
+ if(errno == EINTR)
+ continue;
+ else if(errno == EINPROGRESS || errno == EAGAIN)
{
- if((errno != EINPROGRESS || errno != EAGAIN) &&
- (errno != EADDRINUSE || err == 0))
- return -1;
-
- //wait to finish the connect
if(!tmpUfios->setupForConnect(this, timeout))
{
cerr<<"couldnt setup for connect - "<<strerror(errno)<<endl;
return -1;
}
-
- n = sizeof(int);
- if (getsockopt(_fd, SOL_SOCKET, SO_ERROR, (char *)&err, (socklen_t *)&n) < 0)
- return -1;
- if(err)
- {
- _errno = err;
- return -1;
- }
-
- //successful
- break;
}
+ else
+ return -1;
}
return 0;
@@ -892,7 +877,7 @@ void EpollUFIOScheduler::waitForEvents(T
#endif
myScheduler->_notifyFunc = notifyEpollFunc;
//add the UF to handle the efds calls
- UF* eventFdFiber = UFFactory::getInstance()->selectUF(ReadNotificationUF::_myLoc)->createUF();
+ UF* eventFdFiber = new ReadNotificationUF();
eventFdFiber->_startingArgs = ens;
myScheduler->addFiberToScheduler(eventFdFiber, 0);
@@ -1013,4 +998,23 @@ void EpollUFIOScheduler::waitForEvents(T
myScheduler->_notifyFunc = 0;
}
+//starts at -1; older call sites passed this value to UFFactory::selectUF()
+int IORunner::_myLoc = -1;
+//static instance constructed with `true` - presumably the prototype registered with UFFactory (TODO confirm)
+IORunner* IORunner::_self = new IORunner(true);
+
+// Fiber body: creates an EpollUFIOScheduler for the UF returned by
+// UFScheduler::getUF() and then calls its waitForEvents().
+// Logs to cerr and returns without waiting if the scheduler reports it is not set up.
+// NOTE(review): the scheduler object allocated here is never deleted in this function.
+void IORunner::run()
+{
+    UF* const currentFiber = UFScheduler::getUF();
+
+    //add the scheduler for this
+    //TODO: support other event scheduler mechanisms later
+    EpollUFIOScheduler* const epollScheduler = new EpollUFIOScheduler(currentFiber, 10000);
+    const bool ready = epollScheduler && epollScheduler->isSetup();
+    if(ready)
+    {
+        //TODO: allow to change the epoll interval later
+        epollScheduler->waitForEvents(1000000);
+        return;
+    }
+
+    cerr<<"couldnt setup epoll io scheduler object"<<endl;
+}
+// Same as UFScheduler::ufCreateThread(), but first places a new IORunner fiber at
+// the head of `ufsToStartWith`, ahead of the caller's own fibers.
+//   tid            - forwarded to UFScheduler::ufCreateThread().
+//   ufsToStartWith - modified in place (the IORunner is inserted at the front) and
+//                    then forwarded; it is dereferenced here, so it must not be null.
+void UFIO::ufCreateThreadWithIO(pthread_t* tid, list<UF*>* ufsToStartWith)
+{
+    UF* const ioFiber = new IORunner();
+    ufsToStartWith->insert(ufsToStartWith->begin(), ioFiber);
+    UFScheduler::ufCreateThread(tid, ufsToStartWith);
+}
Propchange: trafficserver/traffic/branches/UserFiber/src/UFIO.C
------------------------------------------------------------------------------
svn:mergeinfo =
Copied: trafficserver/traffic/branches/UserFiber/src/UFServer.C (from r943471, trafficserver/traffic/branches/UserFiber/UFServer.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFServer.C?p2=trafficserver/traffic/branches/UserFiber/src/UFServer.C&p1=trafficserver/traffic/branches/UserFiber/UFServer.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFServer.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFServer.C Wed May 12 13:00:49 2010
@@ -108,9 +108,9 @@ static void* acceptThreadStart(void* arg
UFScheduler ufs;
//add the io scheduler
- ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+ ufs.addFiberToScheduler(new IORunner());
//add the accept port fiber
- UF* uf = UFFactory::getInstance()->selectUF(AcceptRunner::_myLoc)->createUF();
+ UF* uf = new AcceptRunner();
if(!uf)
return 0;
uf->_startingArgs = args;
@@ -128,7 +128,7 @@ static void* ioThreadStart(void* args)
UFScheduler ufs;
//add the io scheduler
- ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+ ufs.addFiberToScheduler(new IORunner());
((UFServer*) args)->addThread("NETIO", &ufs);
ufs.runScheduler();
@@ -287,17 +287,3 @@ vector<pthread_t>* UFServer::getThreadTy
}
-int IORunner::_myLoc = -1;
-IORunner* IORunner::_self = new IORunner(true);
-void IORunner::run()
-{
- UF* uf = UFScheduler::getUF();
- //add the scheduler for this
- EpollUFIOScheduler* ioRunner = new EpollUFIOScheduler(uf, 10000); //TODO: support other event scheduler mechanisms later
- if(!ioRunner || !ioRunner->isSetup())
- {
- cerr<<"couldnt setup epoll io scheduler object"<<endl;
- return;
- }
- ioRunner->waitForEvents(1000000); //TODO: allow to change the epoll interval later
-}
Propchange: trafficserver/traffic/branches/UserFiber/src/UFServer.C
------------------------------------------------------------------------------
svn:mergeinfo =
Copied: trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C (from r943471, trafficserver/traffic/branches/UserFiber/UFStatSystem.C)
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C?p2=trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C&p1=trafficserver/traffic/branches/UserFiber/UFStatSystem.C&r1=943471&r2=943474&rev=943474&view=diff
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFStatSystem.C (original)
+++ trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C Wed May 12 13:00:49 2010
@@ -1,9 +1,9 @@
+#include <string.h>
+#include <stdio.h>
#include "UFStatSystem.H"
#include "UF.H"
#include "UFIO.H"
#include "UFServer.H"
-#include <string.h>
-#include <stdio.h>
#include <iostream>
#include <errno.h>
#include <sys/types.h>
@@ -466,16 +466,16 @@ void* UFStatCollector::scheduler(void *a
UFScheduler ufs;
// stat collector
- ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(CollectorRunner::_myLoc)->createUF());
+ ufs.addFiberToScheduler(new CollectorRunner());
// set thread for stat command listener to run on
StatThreadChooser::_accept_thread = make_pair(&ufs, pthread_self());
// io scheduler
- ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+ ufs.addFiberToScheduler(new IORunner());
// stat command listener
- ufs.addFiberToScheduler(UFFactory::getInstance()->selectUF(StatCommandListenerRun::_myLoc)->createUF());
+ ufs.addFiberToScheduler(new StatCommandListenerRun());
((UFServer*) args)->addThread("STAT_COLLECTOR", &ufs);
ufs.runScheduler();
return 0;
Propchange: trafficserver/traffic/branches/UserFiber/src/UFStatSystem.C
------------------------------------------------------------------------------
svn:mergeinfo =