You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/07 23:10:36 UTC
svn commit: r942228 [2/2] - in /trafficserver/traffic/branches/UserFiber:
Makefile UF.C UF.H UFIO.C UFIO.H UFServer.C UFServer.H UFStatSystem.C
UFStatSystem.H ufHTTPServer.C
Added: trafficserver/traffic/branches/UserFiber/UFStatSystem.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFStatSystem.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFStatSystem.C (added)
+++ trafficserver/traffic/branches/UserFiber/UFStatSystem.C Fri May 7 21:10:35 2010
@@ -0,0 +1,545 @@
+#include "UFStatSystem.H"
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+#include <iostream>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+UFServer *UFStatSystem::server; // server whose thread list collect()/get_current() walk; assigned in init()
+
+std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num; // stat name -> index into global_stats
+std::vector< std::pair<std::string, long long> > UFStatSystem::global_stats; // (name, value) per registered stat; values are rebuilt by collect()
+uint32_t UFStatSystem::MAX_STATS_ALLOWED = 500000; // upper bound on the number of registered stats / valid stat numbers
+uint32_t UFStatSystem::NUM_STATS_ESTIMATE = 5000; // minimum size a per-scheduler stats vector is grown to by increment()
+static UFMutex statsMutex; // taken around accesses to stat_name_to_num and global_stats
+
+// Adds stat_val to the aggregated (global) value of stat number stat_num.
+// A stat number that has no entry in global_stats is silently ignored.
+void UFStatSystem::incrementGlobal(uint32_t stat_num, long long stat_val)
+{
+    if(stat_num < global_stats.size()) {
+        global_stats[stat_num].second += stat_val;
+    }
+}
+
+// Adds stat_val to stat number stat_num in the stats vector owned by the
+// scheduler of the calling thread.
+// Returns false (and changes nothing) when stat_num is not below
+// MAX_STATS_ALLOWED; returns true otherwise.
+bool UFStatSystem::increment(uint32_t stat_num, long long stat_val)
+{
+    if(stat_num >= MAX_STATS_ALLOWED) {
+        return false;
+    }
+
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    // Make room for stat_num if the vector is too short. Only the resize is
+    // performed under the scheduler's stats lock.
+    const unsigned int slots_needed = (unsigned int)stat_num + 1;
+    if(my_scheduler->_stats.size() < slots_needed) {
+        my_scheduler->_stats_lock.lock(my_fiber);
+        uint32_t new_size = stat_num + 1;
+        if(NUM_STATS_ESTIMATE >= new_size) {
+            new_size = NUM_STATS_ESTIMATE;
+        }
+        my_scheduler->_stats.resize(new_size, 0);
+        my_scheduler->_stats_lock.unlock(my_fiber);
+    }
+
+    my_scheduler->_stats[stat_num] += stat_val;
+    return true;
+}
+
+// Name-based variant of increment(): looks up the number registered for
+// stat_name and increments that stat. Returns false when the name is not
+// registered or the numeric increment() rejects the number.
+bool UFStatSystem::increment(const char *stat_name, long long stat_val)
+{
+    uint32_t resolved_num;
+    return getStatNum(stat_name, resolved_num) && increment(resolved_num, stat_val);
+}
+
+// Copies the global (last collected) value of stat number stat_num into
+// *stat_val while holding the global stats lock.
+// Returns false, leaving *stat_val untouched, when stat_num has no entry.
+bool UFStatSystem::get(uint32_t stat_num, long long *stat_val)
+{
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(my_fiber);
+    const bool known = (stat_num < global_stats.size());
+    if(known) {
+        *stat_val = global_stats[stat_num].second;
+    }
+    statsMutex.unlock(my_fiber);
+    return known;
+}
+
+// Name-based variant of get(): reads the value from the global stats vector
+// (it does not walk the threads). Returns false when stat_name is not
+// registered.
+bool UFStatSystem::get(const char *stat_name, long long *stat_val)
+{
+    uint32_t resolved_num;
+    return getStatNum(stat_name, resolved_num) && get(resolved_num, stat_val);
+}
+
+// Computes the live value of stat number stat_num by summing it over the
+// scheduler of every thread in the server's thread list. Each scheduler's
+// stats lock is held while its vector is read. Always returns true; a
+// scheduler whose vector has no slot for stat_num contributes nothing.
+bool UFStatSystem::get_current(uint32_t stat_num, long long *stat_val)
+{
+    typedef std::map<std::string, std::vector<pthread_t>* >::const_iterator GroupIter;
+    typedef std::vector<pthread_t>::const_iterator ThreadIter;
+
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    *stat_val = 0;
+
+    StringThreadMapping * thread_groups = server->getThreadList();
+    for(GroupIter group = thread_groups->begin(); group != thread_groups->end(); group++) {
+        for(ThreadIter tid = group->second->begin(); tid != group->second->end(); tid++) {
+            UFScheduler* sched = UFScheduler::getUFScheduler(*tid);
+            sched->_stats_lock.lock(my_fiber);
+            if(stat_num < sched->_stats.size()) {
+                *stat_val += sched->_stats[stat_num];
+            }
+            sched->_stats_lock.unlock(my_fiber);
+        }
+    }
+    return true;
+}
+
+// Name-based variant of get_current(). Returns false when stat_name is not
+// registered.
+bool UFStatSystem::get_current(const char *stat_name, long long *stat_val)
+{
+    uint32_t resolved_num;
+    return getStatNum(stat_name, resolved_num) && get_current(resolved_num, stat_val);
+}
+
+// Registers stat_name and hands back its stat number through *stat_num.
+//
+//  stat_name    name to register; must not be NULL
+//  stat_num     out: number assigned to the stat; must not be NULL
+//  lock_needed  when true the global stats lock is taken for the duration of
+//               the call; pass false only when the caller knows no other
+//               thread can touch the stat tables
+//
+// Returns true when the stat is registered (either already or by this call).
+// Returns false when an argument is NULL, or when the stat limit has been
+// reached; in the latter case *stat_num is set to MAX_STATS_ALLOWED, a value
+// that increment() rejects.
+bool UFStatSystem::registerStat(const char *stat_name, uint32_t *stat_num, bool lock_needed)
+{
+    // A NULL name would be turned into a std::string below, which is
+    // undefined behaviour, so reject it up front together with a NULL out-param.
+    if(!stat_name || !stat_num) {
+        return false;
+    }
+
+    // Get stat lock
+    UF* running_user_fiber = NULL;
+    if(lock_needed) {
+        UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+        running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+        statsMutex.lock(running_user_fiber);
+    }
+
+    bool registered = true;
+    std::map<std::string, uint32_t>::const_iterator stat_name_it = stat_name_to_num.find(stat_name);
+    if(stat_name_it != stat_name_to_num.end()) {
+        // Already registered: report the existing number
+        *stat_num = stat_name_it->second;
+    }
+    else if(global_stats.size() >= MAX_STATS_ALLOWED) {
+        // Limit reached. ">=" rather than "==" because setMaxStatsAllowed()
+        // may have lowered the limit below the number of stats already present.
+        *stat_num = MAX_STATS_ALLOWED;
+        registered = false;
+    }
+    else {
+        // Register new stat. Store mapping from name to stat_num
+        global_stats.push_back(std::make_pair(stat_name, 0));
+        *stat_num = global_stats.size() - 1;
+        stat_name_to_num[stat_name] = *stat_num;
+    }
+
+    // Release stat lock
+    if(lock_needed) {
+        statsMutex.unlock(running_user_fiber);
+    }
+    return registered;
+}
+
+// Sets the upper bound on the number of stats that may be registered.
+// setNumStatsEstimate() never lets NUM_STATS_ESTIMATE exceed the maximum; the
+// same clamp is applied here so that lowering the maximum afterwards cannot
+// leave the estimate above it.
+void UFStatSystem::setMaxStatsAllowed(uint32_t max_stats_allowed)
+{
+    MAX_STATS_ALLOWED = max_stats_allowed;
+    if(NUM_STATS_ESTIMATE > MAX_STATS_ALLOWED) {
+        NUM_STATS_ESTIMATE = MAX_STATS_ALLOWED;
+    }
+}
+
+// Sets the estimated number of stats (the minimum size per-scheduler stats
+// vectors are grown to), capped at MAX_STATS_ALLOWED.
+void UFStatSystem::setNumStatsEstimate(uint32_t num_stats_estimate)
+{
+    NUM_STATS_ESTIMATE = (num_stats_estimate < MAX_STATS_ALLOWED) ? num_stats_estimate
+                                                                  : MAX_STATS_ALLOWED;
+}
+
+// Forwards the command port setting to UFStatCollector.
+void UFStatSystem::setStatCommandPort(int port)
+{
+    UFStatCollector::setStatCommandPort(port);
+}
+
+// Forwards the read timeout (seconds plus microseconds) to UFStatCollector.
+void UFStatSystem::setReadTimeout(int secs, long usecs)
+{
+    UFStatCollector::setReadTimeout(secs, usecs);
+}
+
+// Remembers the server (its thread list is what collect() and get_current()
+// walk) and then starts the stat collector for it.
+void UFStatSystem::init(UFServer* ufs)
+{
+    server = ufs;
+    UFStatCollector::init(ufs);
+}
+
+// Resets the value of every entry in global_stats to zero; names are kept.
+void UFStatSystem::clear()
+{
+    const size_t num_stats = UFStatSystem::global_stats.size();
+    for(size_t idx = 0; idx < num_stats; ++idx) {
+        UFStatSystem::global_stats[idx].second = 0;
+    }
+}
+
+// Rebuilds the global stats from scratch: under the global stats lock the
+// global values are zeroed and then every non-zero per-scheduler value of
+// every thread in the server's thread list is added in. Each scheduler's
+// stats lock is held while its vector is read so increment() cannot resize it.
+void UFStatSystem::collect()
+{
+    typedef std::map<std::string, std::vector<pthread_t>* >::const_iterator GroupIter;
+    typedef std::vector<pthread_t>::const_iterator ThreadIter;
+
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(my_fiber);
+    UFStatSystem::clear();
+
+    StringThreadMapping * thread_groups = server->getThreadList();
+    for(GroupIter group = thread_groups->begin(); group != thread_groups->end(); group++) {
+        for(ThreadIter tid = group->second->begin(); tid != group->second->end(); tid++) {
+            UFScheduler* sched = UFScheduler::getUFScheduler(*tid);
+
+            sched->_stats_lock.lock(my_fiber);
+            int stat_index = 0;
+            for(std::vector<long long>::iterator value = sched->_stats.begin();
+                value != sched->_stats.end(); value++, stat_index++) {
+                if(*value != 0) {
+                    incrementGlobal(stat_index, *value);
+                }
+            }
+            sched->_stats_lock.unlock(my_fiber);
+        }
+    }
+    statsMutex.unlock(my_fiber);
+}
+
+// Looks up the number registered for stat_name while holding the global
+// stats lock. On success stores it in stat_num and returns true; otherwise
+// returns false and leaves stat_num untouched.
+bool UFStatSystem::getStatNum(const char *stat_name, uint32_t &stat_num)
+{
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(my_fiber);
+    std::map<std::string, uint32_t>::const_iterator entry = stat_name_to_num.find(stat_name);
+    const bool found = (entry != stat_name_to_num.end());
+    if(found) {
+        stat_num = entry->second;
+    }
+    statsMutex.unlock(my_fiber);
+    return found;
+}
+
+// Accept-thread chooser that ignores the listening fd and always answers
+// with the single (scheduler, thread) pair stored in _accept_thread.
+struct StatThreadChooser : public UFIOAcceptThreadChooser
+{
+    // Pair handed out by pickThread(); assigned by UFStatCollector::scheduler().
+    static pair<UFScheduler*, pthread_t> _accept_thread;
+
+    pair<UFScheduler*, pthread_t> pickThread(int listeningFd)
+    {
+        return _accept_thread;
+    }
+};
+pair<UFScheduler*, pthread_t> StatThreadChooser::_accept_thread;
+
+static int MAX_STAT_NAME_LENGTH = 512; // upper bound (terminator included) on a stat name handled by the command handler
+int UFStatCollector::_statCommandPort = 8091; // default port passed to UFIO::setupConnectionToAccept() by the command listener
+time_t UFStatCollector::_startTime = time(NULL); // evaluated once during static initialization; printed on the "TIME" line
+int UFStatCollector::_readTimeout = 600; // NOTE(review): presumably seconds (see setReadTimeout); not read in the code shown here
+long UFStatCollector::_readTimeoutUSecs = 0; // microsecond part set by setReadTimeout(); not read in the code shown here
+
+// Stores the port the stat command listener passes to
+// UFIO::setupConnectionToAccept() when it starts.
+void UFStatCollector::setStatCommandPort(int port)
+{
+    _statCommandPort = port;
+}
+
+// Stores the read timeout as a seconds part and a microseconds part.
+void UFStatCollector::setReadTimeout(int secs, long usecs)
+{
+    _readTimeoutUSecs = usecs;
+    _readTimeout = secs;
+}
+
+// Starts the stat collector thread, which runs UFStatCollector::scheduler()
+// with the server as its argument. A failure to create the thread is
+// reported on stderr instead of being silently dropped.
+void UFStatCollector::init(UFServer* ufs)
+{
+    pthread_t stat_thread;
+    // pthread_create() reports failure through its return value, not errno
+    const int rc = pthread_create(&stat_thread, NULL, scheduler, ufs);
+    if(rc != 0)
+    {
+        cerr<<"couldnt create stat collector thread "<<strerror(rc)<<endl;
+    }
+}
+
+int CollectorRunner::_myLoc = -1;
+// Constructing with registerMe=true registers this fiber type with UFFactory
+// during static initialization and records the returned slot in _myLoc.
+CollectorRunner* CollectorRunner::_self = new CollectorRunner(true);
+
+// Fiber body of the periodic collector: sleeps 60 seconds, aggregates the
+// per-thread stats into the global stats, and repeats.
+void CollectorRunner::run()
+{
+    UF* self_fiber = UFScheduler::getUF();
+    UFScheduler* owner = self_fiber->getParentScheduler();
+
+    for(;;)
+    {
+        self_fiber->usleep(60000000); //60 secs
+        UFStatSystem::collect();
+    }
+    // Not reached: the loop above has no exit.
+    owner->setExit();
+}
+
+
+//---------------------------------------------------------------------
+// Handles a command port client connection
+// Fiber type that serves one client connection on the stat command port.
+struct StatCommandProcessing : public UF
+{
+    // When registerMe is true the instance registers itself with UFFactory
+    // and stores the returned slot in _myLoc.
+    StatCommandProcessing(bool registerMe = false)
+    {
+        if(registerMe)
+        {
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+        }
+    }
+
+    void run();
+    UF* createUF() { return new StatCommandProcessing(); }
+
+    static StatCommandProcessing* _self;
+    static int _myLoc;
+};
+int StatCommandProcessing::_myLoc = -1;
+StatCommandProcessing* StatCommandProcessing::_self = new StatCommandProcessing(true);
+// Serves one command-port client until it quits, disconnects or a write fails.
+//
+// Input is accumulated until it contains "\r\n"; the first
+// whitespace-delimited word is then taken as the command:
+//   stats          print the global stats as last collected
+//   stats_current  collect from all threads, then print the global stats
+//   stat <names>   print the named stats from the global stats
+//   stat_current <names>  print the named stats summed over all threads
+//   help           print the help text
+//   quit           close the connection
+// Anything else gets the "unrecognized" message followed by the help text.
+// For stat/stat_current a name containing '*' stands for every registered
+// stat that starts with the text before the '*'.
+//
+// The UFIO and the accept args passed in through _startingArgs are deleted
+// before returning.
+void StatCommandProcessing::run()
+{
+    if (!_startingArgs)
+        return;
+
+    //1. create the new UFIO from the new fd
+    UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
+    UFIO* ufio = fiberStartingArgs->ufio;
+
+    static const char cmdUnrec[] = "Unrecognized command.\r\n";
+    static const char cmdHelp[] = "Valid commands are: \r\n"
+    " stats - Print stats which have been collected.\r\n"
+    " stats_current - Print stats after forcing a collect\r\n"
+    " stat (<stat_name> )* - Print values for stats that are specified. Does not collect\r\n"
+    " stat_current (<stat_name> )* - Print values for stats that are specified after collecting from all threads\r\n"
+    " help - Prints this message.\r\n"
+    " quit - Close this connection.\r\n"
+    ;
+    int readbytes;
+    char readbuf[1024];
+    std::string readData;
+
+    while(1) {
+        if((readbytes = ufio->read(readbuf, 1023, 0)) <= 0)
+            break;
+
+        readData.append(readbuf, readbytes);
+
+        // Wait for a complete line before interpreting anything
+        if(readData.find("\r\n") == std::string::npos)
+            continue;
+
+        // The command is the first word. Matching it exactly (rather than
+        // searching the whole request for a substring) keeps a request such
+        // as "stat foo_stats" from being mistaken for the "stats" command.
+        std::istringstream request(readData);
+        std::string command;
+        request >> command;
+
+        bool writeOk = true;
+        if(command == "stats_current" || command == "stats") {
+            if(command == "stats_current") {
+                // Force a collect before printing out the stats
+                UFStatSystem::collect();
+            }
+            std::stringstream printbuf;
+            UFStatCollector::printStats(printbuf);
+            const std::string reply = printbuf.str();
+            writeOk = (ufio->write(reply.data(), reply.length()) != -1);
+        }
+        else if(command == "stat" || command == "stat_current") {
+            // Every remaining word is a stat name or a "<prefix>*" pattern.
+            // Words are read into std::string, so client input of any length
+            // cannot overrun a fixed-size buffer.
+            std::vector<std::string> stats;
+            std::string name;
+            while(request >> name) {
+                // Names that would not fit the stat name limit are skipped
+                if(name.length() >= (std::string::size_type)MAX_STAT_NAME_LENGTH)
+                    continue;
+
+                // Prefix support: only this word is searched for the '*'
+                const std::string::size_type star = name.find('*');
+                if(star != std::string::npos) {
+                    // Get all stats with the prefix
+                    UFStatCollector::getStatsWithPrefix(name.substr(0, star), stats);
+                }
+                else {
+                    stats.push_back(name);
+                }
+            }
+
+            // stat_current forces the values to be gathered from all threads
+            const bool get_current = (command == "stat_current");
+            std::stringstream printbuf;
+            UFStatCollector::printStats(stats, printbuf, get_current);
+            const std::string reply = printbuf.str();
+            writeOk = (ufio->write(reply.data(), reply.length()) != -1);
+        }
+        else if(command == "help") {
+            writeOk = (ufio->write(cmdHelp, sizeof(cmdHelp)-1) != -1);
+        }
+        else if(command == "quit") {
+            break;
+        }
+        else {
+            writeOk = (ufio->write(cmdUnrec, sizeof(cmdUnrec)-1) != -1) &&
+                      (ufio->write(cmdHelp, sizeof(cmdHelp)-1) != -1);
+        }
+
+        if(!writeOk) {
+            //failed write, break to close connection
+            break;
+        }
+        readData.clear();
+    } // END while loop
+
+    delete ufio;
+    delete fiberStartingArgs;
+}
+
+//----------------------------------------------------------------------
+// Creates a socket for command port and listens for connection requests
+int StatCommandListenerRun::_myLoc = -1;
+// Constructing with registerMe=true registers this fiber type with UFFactory
+// during static initialization and records the returned slot in _myLoc.
+StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true);
+
+// Fiber body of the command-port listener: sets up the listening socket on
+// UFStatCollector::_statCommandPort and accepts connections on it, starting
+// a StatCommandProcessing fiber (selected via its _myLoc) for each one on the
+// thread StatThreadChooser returns.
+void StatCommandListenerRun::run()
+{
+    int listen_fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
+    if(listen_fd < 0)
+    {
+        cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
+        return;
+    }
+
+    UFIO* listener = new UFIO(UFScheduler::getUF());
+    if(listener == 0)
+    {
+        cerr<<"couldnt setup accept thread"<<endl;
+        ::close(listen_fd);
+        return;
+    }
+    listener->setFd(listen_fd, false);
+
+    StatThreadChooser chooser;
+    listener->accept(&chooser, StatCommandProcessing::_myLoc, 0, 0, 0);
+}
+
+// Entry point of the stat collector thread. args is the UFServer* given to
+// pthread_create(); a NULL args makes the thread return at once.
+// Builds a scheduler local to this thread, queues the collector fiber, the
+// IO runner fiber and the command listener fiber on it, registers the thread
+// with the server under the name "STAT_COLLECTOR" and runs the scheduler.
+void* UFStatCollector::scheduler(void *args)
+{
+    if(args == 0)
+        return 0;
+
+    UFServer* owning_server = (UFServer*) args;
+    UFScheduler stat_scheduler;
+
+    // periodic stat collector
+    stat_scheduler.addFiberToScheduler(UFFactory::getInstance()->selectUF(CollectorRunner::_myLoc)->createUF());
+
+    // connections accepted on the command port are handled on this thread
+    StatThreadChooser::_accept_thread = make_pair(&stat_scheduler, pthread_self());
+
+    // io scheduler
+    stat_scheduler.addFiberToScheduler(UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF());
+
+    // stat command listener
+    stat_scheduler.addFiberToScheduler(UFFactory::getInstance()->selectUF(StatCommandListenerRun::_myLoc)->createUF());
+
+    owning_server->addThread("STAT_COLLECTOR", &stat_scheduler);
+    stat_scheduler.runScheduler();
+    return 0;
+}
+
+//----------------------------------------------------------
+// Writes a report of the global stats to printbuf: a header, a "TIME" line
+// carrying _startTime, one "STAT <name> <value>" line per stat whose value is
+// non-zero, and a closing "END" line. The stat lines are produced while
+// holding the global stats lock.
+void UFStatCollector::printStats(std::stringstream &printbuf) {
+    printbuf << "Cache stats: \n"
+    "-----------------------------------------------------------------------------\n";
+
+    printbuf << "TIME " << _startTime <<"\n";
+
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(my_fiber);
+    const size_t num_stats = UFStatSystem::global_stats.size();
+    for(size_t idx = 0; idx < num_stats; ++idx) {
+        const std::pair<std::string, long long> &entry = UFStatSystem::global_stats[idx];
+        if(entry.second == 0) {
+            continue;
+        }
+        printbuf << "STAT " << entry.first << " " << entry.second << "\n";
+    }
+    statsMutex.unlock(my_fiber);
+
+    printbuf << "END\n";
+}
+
+// Appends "STAT <name> <value>" for stat_name to printbuf, but only when the
+// stat can be read and its value is non-zero. With current=true the value is
+// summed over all threads (get_current); otherwise the global value is used.
+void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) {
+    long long value = 0;
+    const bool have_value = current ? UFStatSystem::get_current(stat_name, &value)
+                                    : UFStatSystem::get(stat_name, &value);
+    if(!have_value || value == 0) {
+        return;
+    }
+    printbuf << "STAT " << stat_name << " " << value << "\n";
+}
+
+// Writes a "TIME" line, then one line per listed stat (see printStat(), which
+// omits unknown and zero-valued stats), then "END".
+void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) {
+    printbuf << "TIME " << _startTime <<"\n";
+    const size_t count = stat_names.size();
+    for(size_t idx = 0; idx < count; ++idx) {
+        printStat(stat_names[idx].c_str(), printbuf, current);
+    }
+    printbuf << "END\n";
+}
+
+// Appends to stat_names the name of every registered stat that starts with
+// stat_prefix (an empty prefix matches every stat). The global stats lock is
+// held while the stats are scanned.
+void
+UFStatCollector::getStatsWithPrefix(const std::string &stat_prefix, std::vector<std::string> &stat_names)
+{
+    UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+    statsMutex.lock(running_user_fiber);
+    // Get all stats which start with stat_prefix
+    for(std::vector< std::pair<std::string, long long> >::const_iterator it = UFStatSystem::global_stats.begin();
+        it != UFStatSystem::global_stats.end(); ++it) {
+        // Compare only the leading characters; find() would keep scanning the
+        // rest of the name whenever the start does not match.
+        if(it->first.compare(0, stat_prefix.size(), stat_prefix) == 0) {
+            stat_names.push_back(it->first);
+        }
+    }
+    statsMutex.unlock(running_user_fiber);
+}
Added: trafficserver/traffic/branches/UserFiber/UFStatSystem.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFStatSystem.H?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFStatSystem.H (added)
+++ trafficserver/traffic/branches/UserFiber/UFStatSystem.H Fri May 7 21:10:35 2010
@@ -0,0 +1,108 @@
+/******************* _*_ Mode: C++; Indent: Inktomi4 _*_ *******************
+ Stats
+
+ ***************************************************************************/
+
+#ifndef UF_STAT_SYSTEM_H
+#define UF_STAT_SYSTEM_H
+
+#include <map>
+#include <string>
+#include <vector>
+#include <utility>
+#include <sstream>
+#include "UF.H"
+
+class UFIO;
+class UFServer;
+class UFStatSystem // static-only interface to the stat counters: register, increment, read
+{
+public:
+ friend class CollectorRunner;
+ friend class StatCommandProcessing;
+
+ static bool increment(uint32_t stat_num, long long stat_val = 1); // adds to the calling thread's scheduler-local value; false if stat_num >= MAX_STATS_ALLOWED
+ static bool increment(const char *stat_name, long long stat_val = 1); // same, by name; also false if the name is not registered
+ static bool get(uint32_t stat_num, long long *stat_val); // reads the global value (as of the last collect); false if stat_num has no entry
+ static bool get(const char *stat_name, long long *stat_val); // same, by name; false if the name is not registered
+ static bool get_current(uint32_t stat_num, long long *stat_val); // sums the stat over every thread in the server's thread list
+ static bool get_current(const char *stat_name, long long *stat_val); // same, by name; false if the name is not registered
+
+ static bool registerStat(const char *stat_name, uint32_t *stat_num, bool lock_needed = true); // returns the stat's number through stat_num; lock_needed=false skips the global stats lock
+ static void setMaxStatsAllowed(uint32_t max_stats_allowed);
+ static void setNumStatsEstimate(uint32_t num_stats_estimate); // value is capped at MAX_STATS_ALLOWED
+ static void setStatCommandPort(int port); // forwards to UFStatCollector
+ static void setReadTimeout(int secs = 1, long usecs = 0); // forwards to UFStatCollector
+ static void init(UFServer* ufs); // stores the server pointer and starts the stat collector
+
+private:
+ // incrementGlobal() and clear() must be called with the global stats lock held; collect() takes that lock itself
+ // Since these are private, only the friend classes declared in this class can call them
+ static void incrementGlobal(uint32_t stat_num, long long stat_val = 1);
+ static void clear();
+ static void collect();
+
+ static bool getStatNum(const char *stat_name, uint32_t &stat_num); // name -> number lookup under the global stats lock
+ static UFServer *server;
+ static std::map<std::string, uint32_t> stat_name_to_num;
+ static std::vector< std::pair<std::string, long long> > global_stats;
+ static uint32_t MAX_STATS_ALLOWED;
+ static uint32_t NUM_STATS_ESTIMATE;
+
+ friend class UFStatCollector;
+};
+
+// Fiber type whose run() periodically aggregates the per-thread stats.
+struct CollectorRunner : public UF
+{
+    // When registerMe is true the instance registers itself with UFFactory
+    // and stores the returned slot in _myLoc.
+    CollectorRunner(bool registerMe = false)
+    {
+        if(registerMe)
+        {
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+        }
+    }
+
+    void run();
+    UF* createUF() { return new CollectorRunner(); }
+
+    static CollectorRunner* _self;
+    static int _myLoc;
+};
+
+struct StatCommandListenerRun;
+class UFStatCollector // static-only helper that runs the collector thread and formats stat reports
+{
+public:
+ friend class StatCommandListenerRun;
+ friend class StatCommandProcessing;
+
+ static void setStatCommandPort(int port); // port the command listener will listen on
+ static void setReadTimeout(int secs = 1, long usecs = 0);
+ static void init(UFServer* ufs); // creates the collector thread, which runs scheduler(ufs)
+
+private:
+ static void *scheduler(void *args); // pthread entry point; args is the UFServer*
+ static void *commandLoop(void *data); // NOTE(review): declared here but no definition appears in the code shown
+ static void printStats(std::stringstream &printbuf); // header, TIME line, every non-zero global stat, END
+ static void printStat(const char *stat_name, std::stringstream &printbuf, bool current = false); // one STAT line, omitted when unknown or zero
+ static void printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current = false); // TIME line, the listed stats, END
+ static void getStatsWithPrefix(const std::string &stat_prefix, std::vector<std::string> &stat_names); // appends names starting with stat_prefix
+
+ // member variables
+ static int _statCommandPort;
+ static time_t _startTime;
+ static int _readTimeout;
+ static long _readTimeoutUSecs;
+};
+
+// Fiber type whose run() listens on the stat command port.
+struct StatCommandListenerRun : public UF
+{
+    // When registerMe is true the instance registers itself with UFFactory
+    // and stores the returned slot in _myLoc.
+    StatCommandListenerRun(bool registerMe = false)
+    {
+        if(registerMe)
+        {
+            _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+        }
+    }
+
+    void run();
+    UF* createUF() { return new StatCommandListenerRun(); }
+
+    static StatCommandListenerRun* _self;
+    static int _myLoc;
+};
+
+#endif // UF_STAT_SYSTEM
+
Added: trafficserver/traffic/branches/UserFiber/ufHTTPServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/ufHTTPServer.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/ufHTTPServer.C (added)
+++ trafficserver/traffic/branches/UserFiber/ufHTTPServer.C Fri May 7 21:10:35 2010
@@ -0,0 +1,112 @@
+#include <iostream>
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <vector>
+#include <string.h>
+
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+#include "UFStatSystem.H"
+
+using namespace std;
+
+// Stat number of the "http_request" counter, filled in by preAccept().
+static uint32_t http_request;
+
+// Minimal HTTP server built on UFServer: every complete request gets the
+// fixed HTTP_ANSWER response.
+struct HTTPServer : public UFServer
+{
+    // interfaceIP may be NULL, in which case the bind address is left empty.
+    HTTPServer(char* interfaceIP, unsigned int port)
+    {
+        if(interfaceIP)
+            _addressToBindTo = interfaceIP;
+        else
+            _addressToBindTo = "";
+        _port = port;
+    }
+
+    void handleNewConnection(UFIO* ufio);
+
+    // Registers the request counter without taking the global stats lock.
+    void preAccept()
+    {
+        UFStatSystem::registerStat("http_request", &http_request, false);
+    }
+};
+
+const char* HTTP_ANSWER = "HTTP/1.0 200 OK\r\nConnection: keep-alive\r\nCache-Control: private, max-age=0\r\nContent-Type: text/html; charset=ISO-8859-1\r\nContent-Length: 5\r\n\r\nhello"; // fixed response sent for every request
+const unsigned int HTTP_ANSWER_LENGTH = strlen(HTTP_ANSWER); // computed once at static initialization
+unsigned int counter = 0; // not used in the code shown here
+unsigned long long int readTimeout = 0; // passed to UFIO::read() as its timeout argument; set from argv[3] in main()
+// Handles one client connection: counts it in the "http_request" stat, then
+// repeatedly reads until a full request header ("\r\n\r\n") has arrived and
+// answers it with HTTP_ANSWER. Returns when a read yields no data or a write
+// fails. A NULL ufio is counted but otherwise ignored.
+void HTTPServer::handleNewConnection(UFIO* ufio)
+{
+    UFStatSystem::increment(http_request);
+    if(!ufio)
+    {
+        return;
+    }
+
+    char buf[256];
+    string readData;
+    bool bail = false;
+    while(!bail)
+    {
+        //3. setup for read
+        const int amtRead = ufio->read(buf, 255, readTimeout);
+        if(amtRead <= 0)
+            break;
+
+        readData.append(buf, amtRead);
+        if(readData.find("\r\n\r\n") == string::npos)
+            continue;
+
+        //4. write the canned answer, resuming after partial writes
+        const int amtToWrite = HTTP_ANSWER_LENGTH;
+        int amtWritten = 0;
+        while(amtWritten < amtToWrite)
+        {
+            // The result must be kept in a signed type: stored in a size_t, an
+            // error return of -1 would never compare <= 0 and would be added
+            // to amtWritten as if it were a byte count.
+            const int n = ufio->write(HTTP_ANSWER+amtWritten, amtToWrite-amtWritten);
+            if(n <= 0)
+            {
+                if(errno == EINTR)
+                    continue;
+                bail = true;
+                break;
+            }
+            amtWritten += n;
+        }
+        // More bytes reported written than were requested: give up on the connection
+        if(amtWritten > amtToWrite)
+            bail = true;
+
+        readData.clear();
+    }
+}
+
+
+// Usage: <prog> [numThreads [port [readTimeout [numProcesses]]]]
+// Defaults: 8 threads, port 8080, read timeout 0, 1 process.
+int main(int argc, char** argv)
+{
+    unsigned int numThreads = (argc > 1) ? atoi(argv[1]) : 8;
+    unsigned short int port = (argc > 2) ? atoi(argv[2]) : 8080;
+    if(argc > 3)
+    {
+        readTimeout = atoi(argv[3]);
+    }
+    unsigned int numProcesses = (argc > 4) ? atoi(argv[4]) : 1;
+
+    cerr<<"setting readtimeout = "<<readTimeout<<endl;
+
+    HTTPServer ufhttp(0, port);
+    ufhttp.MAX_ACCEPT_THREADS_ALLOWED = 1;
+    ufhttp.MAX_THREADS_ALLOWED = numThreads;
+    ufhttp.MAX_PROCESSES_ALLOWED = numProcesses;
+    ufhttp.UF_STACK_SIZE = 8192;
+
+    ufhttp.run();
+
+    return 0;
+}