You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ak...@apache.org on 2010/05/07 23:10:36 UTC

svn commit: r942228 [2/2] - in /trafficserver/traffic/branches/UserFiber: Makefile UF.C UF.H UFIO.C UFIO.H UFServer.C UFServer.H UFStatSystem.C UFStatSystem.H ufHTTPServer.C

Added: trafficserver/traffic/branches/UserFiber/UFStatSystem.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFStatSystem.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFStatSystem.C (added)
+++ trafficserver/traffic/branches/UserFiber/UFStatSystem.C Fri May  7 21:10:35 2010
@@ -0,0 +1,545 @@
+#include "UFStatSystem.H"
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+#include <iostream>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+
+UFServer *UFStatSystem::server; // assigned in init(); used by collect()/get_current() to enumerate threads
+
+std::map<std::string, uint32_t> UFStatSystem::stat_name_to_num; // stat name -> stat number (index into global_stats)
+std::vector< std::pair<std::string, long long> > UFStatSystem::global_stats; // (name, aggregated value), indexed by stat number
+uint32_t UFStatSystem::MAX_STATS_ALLOWED = 500000; // limit checked by increment() and registerStat()
+uint32_t UFStatSystem::NUM_STATS_ESTIMATE = 5000; // minimum size a per-thread stats vector is grown to in increment()
+static UFMutex statsMutex; // locked around accesses to stat_name_to_num and global_stats in this file
+
+// Adds stat_val to the aggregated value stored for stat number stat_num.
+// Stat numbers that have no entry in global_stats are ignored.
+void UFStatSystem::incrementGlobal(uint32_t stat_num, long long stat_val)
+{
+    if(stat_num < global_stats.size()) {
+        std::pair<std::string, long long>& entry = global_stats[stat_num];
+        entry.second += stat_val;
+    }
+}
+
+// Adds stat_val to stat number stat_num in the counters kept by the scheduler
+// of the calling thread. Returns false, changing nothing, when stat_num is not
+// below MAX_STATS_ALLOWED.
+bool UFStatSystem::increment(uint32_t stat_num, long long stat_val)
+{
+    if(stat_num >= MAX_STATS_ALLOWED) {
+        return false;
+    }
+
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    const uint32_t slots_needed = stat_num + 1;
+    if(my_scheduler->_stats.size() < slots_needed) {
+        // The vector is only grown while holding this scheduler's stats lock
+        my_scheduler->_stats_lock.lock(my_fiber);
+        uint32_t new_size = slots_needed;
+        if(NUM_STATS_ESTIMATE >= slots_needed) {
+            new_size = NUM_STATS_ESTIMATE;
+        }
+        my_scheduler->_stats.resize(new_size, 0);
+        my_scheduler->_stats_lock.unlock(my_fiber);
+    }
+
+    my_scheduler->_stats[stat_num] += stat_val;
+    return true;
+}
+
+// Name-based variant: resolves stat_name to its stat number and forwards to
+// the numeric increment(). Returns false when the name is not registered.
+bool UFStatSystem::increment(const char *stat_name, long long stat_val)
+{
+    uint32_t resolved_num;
+    const bool known = getStatNum(stat_name, resolved_num);
+    return known && increment(resolved_num, stat_val);
+}
+
+// Reads the aggregated value of stat number stat_num from global_stats (the
+// values produced by the last collect()); per-thread counters are not walked.
+//   stat_num - stat number to look up
+//   stat_val - out parameter receiving the value; must not be NULL
+// Returns true when the value was stored in *stat_val, false when stat_val is
+// NULL or stat_num has no entry in global_stats.
+bool UFStatSystem::get(uint32_t stat_num, long long *stat_val)
+{
+    // Reject a missing out pointer up front, as registerStat() does
+    if(!stat_val) {
+        return false;
+    }
+
+    UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+
+    // global_stats is only read while holding the global stats lock
+    statsMutex.lock(running_user_fiber);
+    const bool found = (stat_num < global_stats.size());
+    if(found) {
+        *stat_val = global_stats[stat_num].second;
+    }
+    statsMutex.unlock(running_user_fiber);
+    return found;
+}
+
+// Name-based variant of get(): looks up the stat number for stat_name and
+// returns the aggregated value from the global stats. Threads are not walked.
+// Returns false when the name is not registered.
+bool UFStatSystem::get(const char *stat_name, long long *stat_val)
+{
+    uint32_t resolved_num;
+    const bool known = getStatNum(stat_name, resolved_num);
+    return known && get(resolved_num, stat_val);
+}
+
+// Computes the current value of stat number stat_num by summing the counters
+// of every thread returned by server->getThreadList().
+//   stat_num - stat number to sum
+//   stat_val - out parameter receiving the sum; must not be NULL
+// Returns false when stat_val is NULL or when no server/thread list is
+// available (e.g. init() has not been called); true otherwise. Threads whose
+// counter vector is too short for stat_num contribute nothing.
+bool UFStatSystem::get_current(uint32_t stat_num, long long *stat_val)
+{
+    if(!stat_val || !server) {
+        return false;
+    }
+
+    UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+
+    *stat_val = 0;
+
+    StringThreadMapping * all_threads = server->getThreadList();
+    if(!all_threads) {
+        return false;
+    }
+
+    typedef std::map<std::string, std::vector<pthread_t>* >::const_iterator GroupIter;
+    typedef std::vector<pthread_t>::const_iterator ThreadIter;
+
+    for(GroupIter map_it = all_threads->begin(); map_it != all_threads->end(); ++map_it) {
+        if(!map_it->second) {
+            continue;
+        }
+        for(ThreadIter thread_it = map_it->second->begin(); thread_it != map_it->second->end(); ++thread_it) {
+            UFScheduler* this_thread_scheduler = UFScheduler::getUFScheduler(*thread_it);
+            if(!this_thread_scheduler) {
+                continue; // no scheduler found for this thread id
+            }
+            // Hold the thread's stats lock so increment() cannot resize the
+            // vector while it is being read
+            this_thread_scheduler->_stats_lock.lock(running_user_fiber);
+            if(this_thread_scheduler->_stats.size() > stat_num) {
+                *stat_val += this_thread_scheduler->_stats[stat_num];
+            }
+            this_thread_scheduler->_stats_lock.unlock(running_user_fiber);
+        }
+    }
+    return true;
+}
+
+// Name-based variant of get_current(): resolves stat_name and sums the stat
+// over all threads. Returns false when the name is not registered.
+bool UFStatSystem::get_current(const char *stat_name, long long *stat_val)
+{
+    uint32_t resolved_num;
+    const bool known = getStatNum(stat_name, resolved_num);
+    return known && get_current(resolved_num, stat_val);
+}
+
+// Registers a stat name and reports the stat number assigned to it.
+//   stat_name   - name to register; must not be NULL
+//   stat_num    - out parameter; must not be NULL. Receives the stat number of
+//                 the (new or already registered) stat, or MAX_STATS_ALLOWED
+//                 when the limit has been reached
+//   lock_needed - when true the global stats lock is taken for the duration of
+//                 the call; when false no locking is done by this function
+// Returns true when the stat is registered (now or previously), false when an
+// argument is NULL or the stat limit has been reached.
+bool UFStatSystem::registerStat(const char *stat_name, uint32_t *stat_num, bool lock_needed)
+{
+    if(!stat_name || !stat_num) {
+        return false;
+    }
+
+    // Get stat lock
+    UF* running_user_fiber = NULL;
+    if(lock_needed) {
+        UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+        running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+        statsMutex.lock(running_user_fiber);
+    }
+
+    bool registered = true;
+    std::map<std::string, uint32_t>::const_iterator stat_name_it = stat_name_to_num.find(stat_name);
+    if(stat_name_it != stat_name_to_num.end()) {
+        // Already registered: hand back the existing number
+        *stat_num = stat_name_it->second;
+    }
+    else if(global_stats.size() >= MAX_STATS_ALLOWED) {
+        // Limit reached. ">=" rather than "==" so the limit still holds when
+        // setMaxStatsAllowed() lowered it below the current number of stats.
+        *stat_num = MAX_STATS_ALLOWED;
+        registered = false;
+    }
+    else {
+        // Register new stat. Store mapping from stat name to stat number
+        global_stats.push_back(std::pair<std::string, long long>(stat_name, 0));
+        *stat_num = static_cast<uint32_t>(global_stats.size() - 1);
+        stat_name_to_num[stat_name] = *stat_num;
+    }
+
+    // Release stat lock (single exit path, so it cannot be forgotten)
+    if(lock_needed) {
+        statsMutex.unlock(running_user_fiber);
+    }
+    return registered;
+}
+
+// Replaces the limit that increment() and registerStat() check stat numbers
+// and the number of registered stats against.
+void UFStatSystem::setMaxStatsAllowed(uint32_t max_stats_allowed)
+{
+    UFStatSystem::MAX_STATS_ALLOWED = max_stats_allowed;
+}
+
+// Sets the estimated number of stats, capped at MAX_STATS_ALLOWED.
+void UFStatSystem::setNumStatsEstimate(uint32_t num_stats_estimate)
+{
+    NUM_STATS_ESTIMATE = (num_stats_estimate < MAX_STATS_ALLOWED) ? num_stats_estimate
+                                                                  : MAX_STATS_ALLOWED;
+}
+
+// Convenience forwarder to UFStatCollector::setStatCommandPort().
+void UFStatSystem::setStatCommandPort(int port)
+{
+    UFStatCollector::setStatCommandPort(port);
+}
+
+// Convenience forwarder to UFStatCollector::setReadTimeout().
+void UFStatSystem::setReadTimeout(int secs, long usecs)
+{
+    UFStatCollector::setReadTimeout(secs, usecs);
+}
+
+// Remembers the server (needed later to enumerate its threads) and then
+// starts the stat collector through UFStatCollector::init().
+void UFStatSystem::init(UFServer* ufs)
+{
+    UFStatSystem::server = ufs;
+    UFStatCollector::init(ufs);
+}
+
+// Resets every aggregated value in global_stats to zero; names are kept.
+void UFStatSystem::clear()
+{
+    const size_t total = UFStatSystem::global_stats.size();
+    for(size_t idx = 0; idx < total; ++idx) {
+        UFStatSystem::global_stats[idx].second = 0;
+    }
+}
+
+// Rebuilds the aggregated stats: under the global stats lock, zeroes
+// global_stats and then adds in the non-zero counters of every thread listed
+// by the server.
+void UFStatSystem::collect()
+{
+    UFScheduler* collector_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* collector_fiber = collector_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(collector_fiber);
+    UFStatSystem::clear();
+
+    StringThreadMapping * thread_groups = server->getThreadList();
+
+    typedef std::map<std::string, std::vector<pthread_t>* >::const_iterator GroupIter;
+    typedef std::vector<pthread_t>::const_iterator ThreadIter;
+
+    for(GroupIter group = thread_groups->begin(); group != thread_groups->end(); group++) {
+        for(ThreadIter tid = group->second->begin(); tid != group->second->end(); tid++) {
+            UFScheduler* thread_scheduler = UFScheduler::getUFScheduler(*tid);
+
+            // The thread's stats lock keeps increment() from resizing the
+            // vector while it is being walked
+            thread_scheduler->_stats_lock.lock(collector_fiber);
+            for(uint32_t stat_idx = 0; stat_idx < thread_scheduler->_stats.size(); ++stat_idx) {
+                const long long thread_value = thread_scheduler->_stats[stat_idx];
+                if(thread_value != 0) {
+                    incrementGlobal(stat_idx, thread_value);
+                }
+            }
+            thread_scheduler->_stats_lock.unlock(collector_fiber);
+        }
+    }
+
+    statsMutex.unlock(collector_fiber);
+}
+
+// Looks up the stat number registered for stat_name while holding the global
+// stats lock. On success stores it in stat_num and returns true; otherwise
+// leaves stat_num untouched and returns false.
+bool UFStatSystem::getStatNum(const char *stat_name, uint32_t &stat_num)
+{
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(my_fiber);
+    std::map<std::string, uint32_t>::const_iterator entry = stat_name_to_num.find(stat_name);
+    const bool found = (entry != stat_name_to_num.end());
+    if(found) {
+        stat_num = entry->second;
+    }
+    statsMutex.unlock(my_fiber);
+
+    return found;
+}
+
+// Accept-thread chooser that always answers with the single
+// (scheduler, thread) pair stored in _accept_thread.
+struct StatThreadChooser : public UFIOAcceptThreadChooser
+{
+    static std::pair<UFScheduler*, pthread_t> _accept_thread;
+
+    std::pair<UFScheduler*, pthread_t> pickThread(int listeningFd)
+    {
+        return StatThreadChooser::_accept_thread;
+    }
+};
+std::pair<UFScheduler*, pthread_t> StatThreadChooser::_accept_thread;
+
+static int MAX_STAT_NAME_LENGTH = 512; // maximum stat name length handled by the stat command parser
+int UFStatCollector::_statCommandPort = 8091; // port the stat command listener sets up its accept socket on
+time_t UFStatCollector::_startTime = time(NULL); // captured during static initialization; printed as "TIME" by printStats()
+int UFStatCollector::_readTimeout = 600; // seconds part stored by setReadTimeout(); not read in the code visible here
+long UFStatCollector::_readTimeoutUSecs = 0; // microseconds part stored by setReadTimeout(); not read in the code visible here
+
+// Stores the port number used by the stat command listener.
+void UFStatCollector::setStatCommandPort(int port)
+{
+    UFStatCollector::_statCommandPort = port;
+}
+
+// Stores the read timeout as a seconds part and a microseconds part.
+void UFStatCollector::setReadTimeout(int secs, long usecs)
+{
+    UFStatCollector::_readTimeoutUSecs = usecs;
+    UFStatCollector::_readTimeout = secs;
+}
+
+// Starts the stat collector thread, which runs UFStatCollector::scheduler()
+// with ufs as its argument. A failure to create the thread is reported on
+// stderr; previously it went unnoticed.
+void UFStatCollector::init(UFServer* ufs)
+{
+    pthread_t stat_thread;
+    // pthread_create() reports failure through its return value, not errno
+    const int rc = pthread_create(&stat_thread, NULL, scheduler, ufs);
+    if(rc != 0) {
+        std::cerr<<"couldnt create stat collector thread "<<strerror(rc)<<std::endl;
+    }
+}
+
+int CollectorRunner::_myLoc = -1;
+CollectorRunner* CollectorRunner::_self = new CollectorRunner(true);
+// Fiber body of the periodic stat collector: sleeps for the collection
+// interval and then aggregates the per-thread stats, forever. The loop never
+// terminates, so the former setExit() call after it could never run and has
+// been dropped.
+void CollectorRunner::run()
+{
+    static const int COLLECT_INTERVAL_USECS = 60 * 1000 * 1000; // 60 secs
+
+    UF* uf = UFScheduler::getUF();
+    if(!uf)
+        return; // no fiber to sleep on
+
+    for(;;)
+    {
+        uf->usleep(COLLECT_INTERVAL_USECS);
+        UFStatSystem::collect();
+    }
+}
+
+
+//---------------------------------------------------------------------
+// Fiber type that serves one client connection of the stat command port
+struct StatCommandProcessing : public UF
+{
+    static StatCommandProcessing* _self;
+    static int _myLoc;
+
+    // With registerMe set, the instance registers itself with the UFFactory
+    // and records the returned location in _myLoc.
+    StatCommandProcessing(bool registerMe = false)
+    {
+        if(!registerMe)
+            return;
+        _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+
+    void run();
+    UF* createUF() { return new StatCommandProcessing(); }
+};
+int StatCommandProcessing::_myLoc = -1;
+StatCommandProcessing* StatCommandProcessing::_self = new StatCommandProcessing(true);
+// Splits one command line (without its "\r\n") on whitespace: the first token
+// goes to cmd, every following token to args. Both outputs are reset first.
+static void splitStatCommandLine(const std::string &line, std::string &cmd, std::vector<std::string> &args)
+{
+    cmd.clear();
+    args.clear();
+
+    std::istringstream tokens(line);
+    if(!(tokens >> cmd))
+        return; // blank line: no command
+
+    std::string arg;
+    while(tokens >> arg)
+        args.push_back(arg);
+}
+
+// Serves one stat command port connection.
+//
+// Input is read from the connection's UFIO and split into "\r\n"-terminated
+// lines; every complete line is handled as one command whose first
+// whitespace-separated token is the command name:
+//   stats          - print the aggregated stats
+//   stats_current  - collect from all threads, then print the aggregated stats
+//   stat <names>   - print the named stats from the aggregated values
+//   stat_current <names> - print the named stats summed over all threads
+//   help / quit    - print the help text / close the connection
+// A name argument containing '*' is expanded to every registered stat that
+// starts with the text before the '*'. Anything else gets the "unrecognized"
+// reply followed by the help text.
+//
+// The connection is closed (and ufio and the starting args deleted) on quit,
+// on a read that returns <= 0, on a failed write, or when too much input
+// accumulates without a line terminator.
+//
+// Compared with the previous implementation this no longer copies
+// client-supplied tokens into a fixed-size stack buffer with an unbounded
+// "%s", matches commands exactly instead of by substring, applies '*' per
+// token, and handles several commands arriving in one read.
+void StatCommandProcessing::run()
+{
+    if (!_startingArgs)
+        return;
+
+    //1. create the new UFIO from the new fd
+    UFIOAcceptArgs* fiberStartingArgs = (UFIOAcceptArgs*) _startingArgs;
+    UFIO* ufio = fiberStartingArgs->ufio;
+    if (!ufio) {
+        delete fiberStartingArgs;
+        return;
+    }
+
+    static const char cmdUnrec[] = "Unrecognized command.\r\n";
+    static const char cmdHelp[] = "Valid commands are: \r\n"
+        "  stats - Print stats which have been collected.\r\n"
+        "  stats_current - Print stats after forcing a collect\r\n"
+        "  stat (<stat_name> )* - Print values for stats that are specified. Does not collect\r\n"
+        "  stat_current (<stat_name> )* - Print values for stats that are specified after collecting from all threads\r\n"
+        "  help - Prints this message.\r\n"
+        "  quit - Close this connection.\r\n"
+        ;
+    // Upper bound on bytes buffered while waiting for "\r\n"; a client that
+    // never terminates its line must not be able to grow memory without limit
+    static const std::string::size_type MAX_PENDING_INPUT = 64 * 1024;
+
+    char readbuf[1024];
+    std::string readData;
+    bool closeConnection = false;
+
+    while(!closeConnection) {
+        const int readbytes = ufio->read(readbuf, 1023, 0);
+        if(readbytes <= 0)
+            break;
+
+        readData.append(readbuf, readbytes);
+
+        // Handle every complete line currently buffered
+        std::string::size_type eol;
+        while(!closeConnection && (eol = readData.find("\r\n")) != std::string::npos) {
+            const std::string line = readData.substr(0, eol);
+            readData.erase(0, eol + 2);
+
+            std::string cmd;
+            std::vector<std::string> args;
+            splitStatCommandLine(line, cmd, args);
+
+            std::stringstream printbuf;
+            if(cmd == "stats_current" || cmd == "stats") {
+                if(cmd == "stats_current") {
+                    // Force a collect before printing out the stats
+                    UFStatSystem::collect();
+                }
+                UFStatCollector::printStats(printbuf);
+            }
+            else if(cmd == "stat" || cmd == "stat_current") {
+                std::vector<std::string> stats;
+                for(std::vector<std::string>::const_iterator arg = args.begin(); arg != args.end(); ++arg) {
+                    // Over-long names cannot match a sane stat; skip them
+                    if(arg->size() >= static_cast<std::string::size_type>(MAX_STAT_NAME_LENGTH))
+                        continue;
+
+                    // Prefix support: "abc*" selects all stats starting with "abc"
+                    const std::string::size_type star = arg->find('*');
+                    if(star != std::string::npos)
+                        UFStatCollector::getStatsWithPrefix(arg->substr(0, star), stats);
+                    else
+                        stats.push_back(*arg);
+                }
+                UFStatCollector::printStats(stats, printbuf, cmd == "stat_current");
+            }
+            else if(cmd == "help") {
+                printbuf << cmdHelp;
+            }
+            else if(cmd == "quit") {
+                closeConnection = true;
+                continue;
+            }
+            else {
+                printbuf << cmdUnrec << cmdHelp;
+            }
+
+            const std::string reply = printbuf.str();
+            if(ufio->write(reply.data(), reply.length()) == -1) {
+                //failed write, close connection
+                closeConnection = true;
+            }
+        }
+
+        if(readData.size() > MAX_PENDING_INPUT)
+            break;
+    } // END while loop
+
+    delete ufio;
+    delete fiberStartingArgs;
+}
+
+//----------------------------------------------------------------------
+// Fiber that sets up the stat command port and accepts connections on it
+int StatCommandListenerRun::_myLoc = -1;
+StatCommandListenerRun* StatCommandListenerRun::_self = new StatCommandListenerRun(true);
+void StatCommandListenerRun::run()
+{
+    const int listen_fd = UFIO::setupConnectionToAccept(0, UFStatCollector::_statCommandPort, 16000);
+    if(listen_fd < 0)
+    {
+        cerr<<"couldnt setup accept thread for stat port "<<strerror(errno)<<endl;
+        return;
+    }
+
+    UFIO* listener = new UFIO(UFScheduler::getUF());
+    if(listener)
+    {
+        listener->setFd(listen_fd, false);
+
+        // Accepted connections are run as StatCommandProcessing fibers on
+        // the thread picked by the chooser
+        StatThreadChooser chooser;
+        listener->accept(&chooser, StatCommandProcessing::_myLoc, 0, 0, 0);
+        return;
+    }
+
+    cerr<<"couldnt setup accept thread"<<endl;
+    ::close(listen_fd);
+}
+
+// Thread entry point of the stat collector thread. args is the UFServer that
+// the thread registers itself with; a NULL args makes the thread return
+// immediately. Queues the collector, IO and command listener fibers on a
+// scheduler local to this thread and then runs that scheduler.
+void* UFStatCollector::scheduler(void *args)
+{
+    if(!args)
+        return 0;
+
+    UFServer* owner = static_cast<UFServer*>(args);
+    UFScheduler stat_scheduler;
+
+    // stat collector
+    UF* collector_fiber = UFFactory::getInstance()->selectUF(CollectorRunner::_myLoc)->createUF();
+    stat_scheduler.addFiberToScheduler(collector_fiber);
+
+    // set thread for stat command listener to run on
+    StatThreadChooser::_accept_thread = make_pair(&stat_scheduler, pthread_self());
+
+    // io scheduler
+    UF* io_fiber = UFFactory::getInstance()->selectUF(IORunner::_myLoc)->createUF();
+    stat_scheduler.addFiberToScheduler(io_fiber);
+
+    // stat command listener
+    UF* listener_fiber = UFFactory::getInstance()->selectUF(StatCommandListenerRun::_myLoc)->createUF();
+    stat_scheduler.addFiberToScheduler(listener_fiber);
+
+    owner->addThread("STAT_COLLECTOR", &stat_scheduler);
+    stat_scheduler.runScheduler();
+    return 0;
+}
+
+//----------------------------------------------------------
+// Writes a report of all non-zero aggregated stats to printbuf: a header, a
+// "TIME" line with _startTime, one "STAT <name> <value>" line per stat and a
+// closing "END" line. global_stats is read under the global stats lock.
+void UFStatCollector::printStats(std::stringstream &printbuf) {
+    printbuf << "Cache stats: \n";
+    printbuf << "-----------------------------------------------------------------------------\n";
+    printbuf << "TIME " << _startTime << "\n";
+
+    UFScheduler* my_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* my_fiber = my_scheduler->getRunningFiberOnThisThread();
+
+    statsMutex.lock(my_fiber);
+    const size_t total = UFStatSystem::global_stats.size();
+    for(size_t idx = 0; idx < total; ++idx) {
+        const std::pair<std::string, long long>& entry = UFStatSystem::global_stats[idx];
+        if(entry.second == 0) {
+            continue;
+        }
+        printbuf << "STAT " << entry.first << " " << entry.second << "\n";
+    }
+    statsMutex.unlock(my_fiber);
+
+    printbuf << "END\n";
+}
+
+// Appends "STAT <name> <value>" for one stat to printbuf. The value comes from
+// get_current() when current is set, otherwise from get(). Nothing is written
+// when the lookup fails or the value is zero.
+void UFStatCollector::printStat(const char *stat_name, std::stringstream &printbuf, bool current) {
+    long long value = 0;
+    const bool have_value = current ? UFStatSystem::get_current(stat_name, &value)
+                                    : UFStatSystem::get(stat_name, &value);
+
+    if(!have_value || value == 0) {
+        return;
+    }
+    printbuf << "STAT " << stat_name << " " << value << "\n";
+}
+
+// Writes a "TIME" line, then one printStat() entry per name in stat_names (in
+// order), then "END" to printbuf. current is passed through to printStat().
+void UFStatCollector::printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current) {
+    printbuf << "TIME " << _startTime << "\n";
+
+    const size_t count = stat_names.size();
+    for(size_t idx = 0; idx < count; ++idx) {
+        printStat(stat_names[idx].c_str(), printbuf, current);
+    }
+
+    printbuf << "END\n";
+}
+
+// Appends to stat_names the name of every registered stat that starts with
+// stat_prefix, in registration order. An empty prefix selects every stat.
+// global_stats is read under the global stats lock.
+void
+UFStatCollector::getStatsWithPrefix(const std::string &stat_prefix, std::vector<std::string> &stat_names)
+{
+    UFScheduler* running_thread_scheduler = UFScheduler::getUFScheduler(pthread_self());
+    UF* running_user_fiber = running_thread_scheduler->getRunningFiberOnThisThread();
+    statsMutex.lock(running_user_fiber);
+
+    for(std::vector< std::pair<std::string, long long> >::const_iterator it = UFStatSystem::global_stats.begin();
+        it != UFStatSystem::global_stats.end(); ++it) {
+        // Anchored comparison of the leading characters only; find() == 0
+        // gave the same answer but searched the whole name on a mismatch
+        if(it->first.compare(0, stat_prefix.size(), stat_prefix) == 0) {
+            stat_names.push_back(it->first);
+        }
+    }
+    statsMutex.unlock(running_user_fiber);
+}

Added: trafficserver/traffic/branches/UserFiber/UFStatSystem.H
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/UFStatSystem.H?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/UFStatSystem.H (added)
+++ trafficserver/traffic/branches/UserFiber/UFStatSystem.H Fri May  7 21:10:35 2010
@@ -0,0 +1,108 @@
+/******************* _*_ Mode: C++; Indent: Inktomi4 _*_ *******************
+ Stats
+
+ ***************************************************************************/
+
+#ifndef UF_STAT_SYSTEM_H
+#define UF_STAT_SYSTEM_H
+
+#include <map>
+#include <string>
+#include <vector>
+#include <utility>
+#include <sstream>
+#include "UF.H"
+
+class UFIO;
+class UFServer;
+class UFStatSystem
+{
+public:
+    friend class CollectorRunner;
+    friend class StatCommandProcessing;
+
+    static bool increment(uint32_t stat_num, long long stat_val = 1); // adds to the calling thread's scheduler counters
+    static bool increment(const char *stat_name, long long stat_val = 1); // same, after resolving the name to a stat number
+    static bool get(uint32_t stat_num, long long *stat_val); // reads the aggregated value from global_stats
+    static bool get(const char *stat_name, long long *stat_val); // same, by name
+    static bool get_current(uint32_t stat_num, long long *stat_val); // sums the stat over the counters of all threads
+    static bool get_current(const char *stat_name, long long *stat_val); // same, by name
+    
+    static bool registerStat(const char *stat_name, uint32_t *stat_num, bool lock_needed = true); // lock_needed=false skips taking the global stats lock
+    static void setMaxStatsAllowed(uint32_t max_stats_allowed);
+    static void setNumStatsEstimate(uint32_t num_stats_estimate); // value is capped at MAX_STATS_ALLOWED
+    static void setStatCommandPort(int port); // forwards to UFStatCollector
+    static void setReadTimeout(int secs = 1, long usecs = 0); // forwards to UFStatCollector
+    static void init(UFServer* ufs); // stores the server and starts the stat collector
+    
+private:
+    // incrementGlobal() and clear() must be called after acquiring the global stats lock;
+    // collect() acquires that lock itself. Being private, only the friend classes of this class can call them
+    static void incrementGlobal(uint32_t stat_num, long long stat_val = 1);
+    static void clear();
+    static void collect();
+
+    static bool getStatNum(const char *stat_name, uint32_t &stat_num); // name -> stat number lookup; false if unknown
+    static UFServer *server;
+    static std::map<std::string, uint32_t> stat_name_to_num;
+    static std::vector< std::pair<std::string, long long> > global_stats;
+    static uint32_t MAX_STATS_ALLOWED;
+    static uint32_t NUM_STATS_ESTIMATE;
+
+    friend class UFStatCollector;
+};
+
+// Fiber type whose run() periodically aggregates the per-thread stats.
+struct CollectorRunner : public UF
+{
+    static CollectorRunner* _self;
+    static int _myLoc;
+
+    // With registerMe set, the instance registers itself with the UFFactory
+    // and records the returned location in _myLoc.
+    CollectorRunner(bool registerMe = false)
+    {
+        if(!registerMe)
+            return;
+        _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+
+    void run();
+    UF* createUF() { return new CollectorRunner(); }
+};
+
+struct StatCommandListenerRun;
+class UFStatCollector
+{
+public:
+    friend class StatCommandListenerRun;
+    friend class StatCommandProcessing;
+
+    static void setStatCommandPort(int port); // port used by the stat command listener
+    static void setReadTimeout(int secs = 1, long usecs = 0); // stored in _readTimeout / _readTimeoutUSecs
+    static void init(UFServer* ufs); // starts the stat collector thread
+    
+private:
+    static void *scheduler(void *args); // entry point of the stat collector thread; args is the UFServer
+    static void *commandLoop(void *data); // NOTE(review): no definition visible in UFStatSystem.C - confirm whether still needed
+    static void printStats(std::stringstream &printbuf); // all non-zero aggregated stats
+    static void printStat(const char *stat_name, std::stringstream &printbuf, bool current = false); // one stat; current=true sums over all threads
+    static void printStats(const std::vector<std::string> &stat_names, std::stringstream &printbuf, bool current = false); // the named stats only
+    static void getStatsWithPrefix(const std::string &stat_prefix, std::vector<std::string> &stat_names); // appends matching names to stat_names
+
+    // member variables
+    static int _statCommandPort;
+    static time_t _startTime;
+    static int _readTimeout;
+    static long _readTimeoutUSecs;
+};
+
+// Fiber type whose run() sets up the stat command port and accepts
+// connections on it.
+struct StatCommandListenerRun : public UF
+{
+    static StatCommandListenerRun* _self;
+    static int _myLoc;
+
+    // With registerMe set, the instance registers itself with the UFFactory
+    // and records the returned location in _myLoc.
+    StatCommandListenerRun(bool registerMe = false)
+    {
+        if(!registerMe)
+            return;
+        _myLoc = UFFactory::getInstance()->registerFunc((UF*)this);
+    }
+
+    void run();
+    UF* createUF() { return new StatCommandListenerRun(); }
+};
+
+#endif // UF_STAT_SYSTEM
+

Added: trafficserver/traffic/branches/UserFiber/ufHTTPServer.C
URL: http://svn.apache.org/viewvc/trafficserver/traffic/branches/UserFiber/ufHTTPServer.C?rev=942228&view=auto
==============================================================================
--- trafficserver/traffic/branches/UserFiber/ufHTTPServer.C (added)
+++ trafficserver/traffic/branches/UserFiber/ufHTTPServer.C Fri May  7 21:10:35 2010
@@ -0,0 +1,112 @@
+#include <iostream>
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <vector>
+#include <string.h>
+
+#include "UF.H"
+#include "UFIO.H"
+#include "UFServer.H"
+#include "UFStatSystem.H"
+
+using namespace std;
+
+// Stat number of the "http_request" stat, filled in by HTTPServer::preAccept()
+static uint32_t http_request;
+
+// Minimal HTTP server built on UFServer: every complete request is answered
+// with a fixed response.
+struct HTTPServer : public UFServer
+{
+    // interfaceIP may be NULL, in which case the bind address is left empty.
+    HTTPServer(char* interfaceIP, unsigned int port)
+    {
+        if(interfaceIP)
+            _addressToBindTo = interfaceIP;
+        else
+            _addressToBindTo = "";
+        _port = port;
+    }
+
+    void handleNewConnection(UFIO* ufio);
+
+    // Registers the request counter without taking the stats lock
+    void preAccept()
+    {
+        UFStatSystem::registerStat("http_request", &http_request, false);
+    }
+};
+
+const char* HTTP_ANSWER = "HTTP/1.0 200 OK\r\nConnection: keep-alive\r\nCache-Control: private, max-age=0\r\nContent-Type: text/html; charset=ISO-8859-1\r\nContent-Length: 5\r\n\r\nhello"; // fixed response written for every complete request
+const unsigned int HTTP_ANSWER_LENGTH = strlen(HTTP_ANSWER); // length of HTTP_ANSWER, computed once at startup
+unsigned int counter = 0; // not referenced in the code visible here
+unsigned long long int readTimeout = 0; // passed to UFIO::read() in handleNewConnection(); set from argv[3] in main()
+// Serves one client connection: counts it in the "http_request" stat, then
+// repeatedly reads until a full request header ("\r\n\r\n") has arrived and
+// answers each one with HTTP_ANSWER.
+//   ufio - connection to serve; a NULL ufio is counted and then ignored
+// Returns when a read returns <= 0 or when writing the response fails.
+void HTTPServer::handleNewConnection(UFIO* ufio)
+{
+    UFStatSystem::increment(http_request);
+    if(!ufio)
+    {
+        return;
+    }
+
+    char buf[256];
+    bool bail = false;
+    string readData;
+    while(!bail)
+    {
+        //3. setup for read
+        const int amtRead = ufio->read(buf, 255, readTimeout);
+        if(amtRead <= 0)
+            break;
+
+        readData.append(buf, amtRead);
+        if(readData.find("\r\n\r\n") == string::npos)
+            continue; // request header not complete yet
+
+        //4. write the canned answer, retrying on partial writes
+        const int amtToWrite = HTTP_ANSWER_LENGTH;
+        int amtWritten = 0;
+        while(amtWritten < amtToWrite)
+        {
+            // The result must be kept in a signed type: stored in a size_t,
+            // an error return of -1 was never seen as <= 0 and was added to
+            // amtWritten instead
+            const int n = ufio->write(HTTP_ANSWER+amtWritten, amtToWrite-amtWritten);
+            if(n < 0 && errno == EINTR)
+                continue; // interrupted, try again
+            if(n <= 0)
+            {
+                bail = true;
+                break;
+            }
+            amtWritten += n;
+        }
+        if(amtWritten > amtToWrite)
+            bail = true; // wrote more than requested: treat as fatal
+
+        readData.clear();
+    }
+}
+
+
+// Usage: <prog> [numThreads [port [readTimeout [numProcesses]]]]
+// Missing arguments fall back to 8 threads, port 8080, read timeout 0 and
+// 1 process.
+int main(int argc, char** argv)
+{
+    unsigned int numThreads = (argc > 1) ? atoi(argv[1]) : 8;
+    unsigned short int port = (argc > 2) ? atoi(argv[2]) : 8080;
+    if(argc > 3)
+        readTimeout = atoi(argv[3]);
+    unsigned int numProcesses = (argc > 4) ? atoi(argv[4]) : 1;
+
+    cerr<<"setting readtimeout = "<<readTimeout<<endl;
+
+    HTTPServer ufhttp(0, port);
+    ufhttp.MAX_ACCEPT_THREADS_ALLOWED   = 1;
+    ufhttp.MAX_THREADS_ALLOWED          = numThreads;
+    ufhttp.MAX_PROCESSES_ALLOWED        = numProcesses;
+    ufhttp.UF_STACK_SIZE                = 8192;
+
+    ufhttp.run();
+
+    return 0;
+}