You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ji...@apache.org on 2014/05/01 13:43:45 UTC
svn commit: r1591622 [14/33] - in /httpd/mod_spdy/trunk: ./ base/
base/base.xcodeproj/ base/metrics/ build/ build/all.xcodeproj/
build/build_util.xcodeproj/ build/install.xcodeproj/ build/internal/
build/linux/ build/mac/ build/util/ build/win/ install...
Added: httpd/mod_spdy/trunk/mod_spdy/common/testing/spdy_frame_matchers.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/testing/spdy_frame_matchers.h?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/testing/spdy_frame_matchers.h (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/testing/spdy_frame_matchers.h Thu May 1 11:43:36 2014
@@ -0,0 +1,285 @@
+// Copyright 2012 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
+#define MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
+
+#include <iostream>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/string_piece.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+
+namespace mod_spdy {
+
+namespace testing {
+
+// gMock matcher implementation behind IsControlFrameOfType(): remembers the
+// control frame type a net::SpdyFrame is expected to have.  The matching and
+// description methods are only declared here; they are defined out of line.
+class IsControlFrameOfTypeMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  explicit IsControlFrameOfTypeMatcher(net::SpdyControlType type)
+      : type_(type) {}
+  virtual ~IsControlFrameOfTypeMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The control frame type supplied at construction.
+  const net::SpdyControlType type_;
+
+  DISALLOW_COPY_AND_ASSIGN(IsControlFrameOfTypeMatcher);
+};
+
+// Returns a matcher that is satisfied only by a control frame of the given
+// type.  The returned Matcher takes ownership of the implementation object
+// allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> IsControlFrameOfType(
+    net::SpdyControlType type) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new IsControlFrameOfTypeMatcher(type);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind IsDataFrame().  It carries no state;
+// the matching and description methods are defined out of line.
+class IsDataFrameMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  IsDataFrameMatcher() {}
+  virtual ~IsDataFrameMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(IsDataFrameMatcher);
+};
+
+// Returns a matcher that is satisfied only by a DATA frame.  The returned
+// Matcher takes ownership of the implementation object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> IsDataFrame() {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new IsDataFrameMatcher;
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind IsDataFrameWith(): remembers the data
+// payload a DATA frame is expected to carry.  The matching and description
+// methods are defined out of line.
+class IsDataFrameWithMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // Stores its own copy of the payload, so the bytes that `payload` refers to
+  // need not outlive this constructor call.
+  explicit IsDataFrameWithMatcher(base::StringPiece payload)
+      : payload_(payload.as_string()) {}
+  virtual ~IsDataFrameWithMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // Copy of the payload supplied at construction.
+  const std::string payload_;
+
+  DISALLOW_COPY_AND_ASSIGN(IsDataFrameWithMatcher);
+};
+
+// Returns a matcher that is satisfied only by a DATA frame carrying the given
+// data payload.  The returned Matcher takes ownership of the implementation
+// object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> IsDataFrameWith(
+    base::StringPiece payload) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new IsDataFrameWithMatcher(payload);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind IsGoAway(): remembers the status code a
+// GOAWAY frame is expected to carry.  The matching and description methods
+// are defined out of line.
+class IsGoAwayMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  explicit IsGoAwayMatcher(net::SpdyGoAwayStatus status)
+      : status_(status) {}
+  virtual ~IsGoAwayMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The GOAWAY status code supplied at construction.
+  const net::SpdyGoAwayStatus status_;
+
+  DISALLOW_COPY_AND_ASSIGN(IsGoAwayMatcher);
+};
+
+// Returns a matcher that is satisfied only by a GOAWAY frame with the given
+// status code.  The returned Matcher takes ownership of the implementation
+// object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> IsGoAway(
+    net::SpdyGoAwayStatus status) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new IsGoAwayMatcher(status);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind IsRstStream(): remembers the status
+// code a RST_STREAM frame is expected to carry.  The matching and description
+// methods are defined out of line.
+class IsRstStreamMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  explicit IsRstStreamMatcher(net::SpdyStatusCodes status)
+      : status_(status) {}
+  virtual ~IsRstStreamMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The RST_STREAM status code supplied at construction.
+  const net::SpdyStatusCodes status_;
+
+  DISALLOW_COPY_AND_ASSIGN(IsRstStreamMatcher);
+};
+
+// Returns a matcher that is satisfied only by a RST_STREAM frame with the
+// given status code.  The returned Matcher takes ownership of the
+// implementation object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> IsRstStream(
+    net::SpdyStatusCodes status) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new IsRstStreamMatcher(status);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind IsWindowUpdate(): remembers the
+// window-size-delta a WINDOW_UPDATE frame is expected to carry.  The matching
+// and description methods are defined out of line.
+class IsWindowUpdateMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  explicit IsWindowUpdateMatcher(uint32 delta)
+      : delta_(delta) {}
+  virtual ~IsWindowUpdateMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The window-size-delta supplied at construction.
+  const uint32 delta_;
+
+  DISALLOW_COPY_AND_ASSIGN(IsWindowUpdateMatcher);
+};
+
+// Returns a matcher that is satisfied only by a WINDOW_UPDATE frame with the
+// given window-size-delta.  The returned Matcher takes ownership of the
+// implementation object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> IsWindowUpdate(uint32 delta) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new IsWindowUpdateMatcher(delta);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind FlagFinIs(): remembers the FLAG_FIN
+// value a frame is expected to have.  The matching and description methods
+// are defined out of line.
+class FlagFinIsMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // explicit, like the other single-argument matcher constructors in this
+  // file, so that a bare bool never converts silently into a matcher object.
+  explicit FlagFinIsMatcher(bool fin) : fin_(fin) {}
+  virtual ~FlagFinIsMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The FLAG_FIN value supplied at construction.
+  const bool fin_;
+
+  DISALLOW_COPY_AND_ASSIGN(FlagFinIsMatcher);
+};
+
+// Returns a matcher that is satisfied only by a frame whose FLAG_FIN has the
+// given value.  The returned Matcher takes ownership of the implementation
+// object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> FlagFinIs(bool fin) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new FlagFinIsMatcher(fin);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind FlagUnidirectionalIs(): remembers the
+// FLAG_UNIDIRECTIONAL value a frame is expected to have.  The matching and
+// description methods are defined out of line.
+class FlagUnidirectionalIsMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // explicit, like the other single-argument matcher constructors in this
+  // file, so that a bare bool never converts silently into a matcher object.
+  explicit FlagUnidirectionalIsMatcher(bool unidirectional)
+      : unidirectional_(unidirectional) {}
+  virtual ~FlagUnidirectionalIsMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The FLAG_UNIDIRECTIONAL value supplied at construction.
+  const bool unidirectional_;
+
+  DISALLOW_COPY_AND_ASSIGN(FlagUnidirectionalIsMatcher);
+};
+
+// Returns a matcher that is satisfied only by a frame whose
+// FLAG_UNIDIRECTIONAL has the given value.  The returned Matcher takes
+// ownership of the implementation object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> FlagUnidirectionalIs(
+    bool unidirectional) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new FlagUnidirectionalIsMatcher(unidirectional);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind StreamIdIs(): remembers the stream ID a
+// frame is expected to have.  The matching and description methods are
+// defined out of line.
+class StreamIdIsMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // explicit, like the other single-argument matcher constructors in this
+  // file, so that a bare stream ID never converts silently into a matcher
+  // object.
+  explicit StreamIdIsMatcher(net::SpdyStreamId stream_id)
+      : stream_id_(stream_id) {}
+  virtual ~StreamIdIsMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The stream ID supplied at construction.
+  const net::SpdyStreamId stream_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(StreamIdIsMatcher);
+};
+
+// Returns a matcher that is satisfied only by a frame with the given stream
+// ID.  The returned Matcher takes ownership of the implementation object
+// allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> StreamIdIs(
+    net::SpdyStreamId stream_id) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new StreamIdIsMatcher(stream_id);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind AssociatedStreamIdIs(): remembers the
+// associated stream ID a frame is expected to have.  The matching and
+// description methods are defined out of line.
+class AssociatedStreamIdIsMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // explicit, like the other single-argument matcher constructors in this
+  // file, so that a bare stream ID never converts silently into a matcher
+  // object.
+  explicit AssociatedStreamIdIsMatcher(net::SpdyStreamId stream_id)
+      : associated_stream_id_(stream_id) {}
+  virtual ~AssociatedStreamIdIsMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The associated stream ID supplied at construction.
+  const net::SpdyStreamId associated_stream_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(AssociatedStreamIdIsMatcher);
+};
+
+// Returns a matcher that is satisfied only by a frame with the given
+// associated stream ID.  The returned Matcher takes ownership of the
+// implementation object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> AssociatedStreamIdIs(
+    net::SpdyStreamId stream_id) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new AssociatedStreamIdIsMatcher(stream_id);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind PriorityIs(): remembers the priority a
+// frame is expected to have.  The matching and description methods are
+// defined out of line.
+class PriorityIsMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // explicit, like the other single-argument matcher constructors in this
+  // file, so that a bare priority value never converts silently into a
+  // matcher object.
+  explicit PriorityIsMatcher(net::SpdyPriority priority)
+      : priority_(priority) {}
+  virtual ~PriorityIsMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // The priority supplied at construction.
+  const net::SpdyPriority priority_;
+
+  DISALLOW_COPY_AND_ASSIGN(PriorityIsMatcher);
+};
+
+// Returns a matcher that is satisfied only by a frame with the given
+// priority.  The returned Matcher takes ownership of the implementation
+// object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> PriorityIs(
+    net::SpdyPriority priority) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new PriorityIsMatcher(priority);
+  return ::testing::MakeMatcher(impl);
+}
+
+// gMock matcher implementation behind UncompressedHeadersAre(): remembers the
+// header block an uncompressed frame is expected to carry.  The matching and
+// description methods are defined out of line.
+class UncompressedHeadersAreMatcher
+    : public ::testing::MatcherInterface<const net::SpdyFrame&> {
+ public:
+  // Stores its own copy of `headers`.  explicit, like the other
+  // single-argument matcher constructors in this file, so that a header block
+  // never converts silently into a matcher object.
+  explicit UncompressedHeadersAreMatcher(const net::SpdyHeaderBlock& headers)
+      : headers_(headers) {}
+  virtual ~UncompressedHeadersAreMatcher() {}
+
+  // ::testing::MatcherInterface methods:
+  virtual bool MatchAndExplain(
+      const net::SpdyFrame& frame,
+      ::testing::MatchResultListener* listener) const;
+  virtual void DescribeTo(std::ostream* out) const;
+  virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+  // Copy of the header block supplied at construction.
+  const net::SpdyHeaderBlock headers_;
+
+  DISALLOW_COPY_AND_ASSIGN(UncompressedHeadersAreMatcher);
+};
+
+// Returns a matcher that is satisfied only by an uncompressed frame having
+// exactly the given headers.  The returned Matcher takes ownership of the
+// implementation object allocated here.
+inline ::testing::Matcher<const net::SpdyFrame&> UncompressedHeadersAre(
+    const net::SpdyHeaderBlock& headers) {
+  const ::testing::MatcherInterface<const net::SpdyFrame&>* const impl =
+      new UncompressedHeadersAreMatcher(headers);
+  return ::testing::MakeMatcher(impl);
+}
+
+} // namespace testing
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
Propchange: httpd/mod_spdy/trunk/mod_spdy/common/testing/spdy_frame_matchers.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.cc?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.cc (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.cc Thu May 1 11:43:36 2014
@@ -0,0 +1,490 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/thread_pool.h"
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h"
+#include "base/time.h"
+#include "mod_spdy/common/executor.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// Default idle timeout: shut down a worker thread once idle this many seconds.
+const int64 kDefaultMaxWorkerIdleSeconds = 60;  // Used by ThreadPool(min, max).
+
+} // namespace
+
+namespace mod_spdy {
+
+// The Executor implementation handed out by ThreadPool::NewExecutor.  Its
+// tasks are placed on the owning ThreadPool's queue and run by that pool's
+// worker threads.
+class ThreadPool::ThreadPoolExecutor : public Executor {
+ public:
+  explicit ThreadPoolExecutor(ThreadPool* master)
+      : master_(master),
+        stopping_condvar_(&master_->lock_),
+        stopped_(false) {
+  }
+
+  // Destroying the executor stops it first (see Stop()).
+  virtual ~ThreadPoolExecutor() {
+    Stop();
+  }
+
+  // Executor methods:
+  virtual void AddTask(net_instaweb::Function* task,
+                       net::SpdyPriority priority);
+  virtual void Stop();
+
+ private:
+  friend class ThreadPool;
+
+  // The pool whose lock, task queue and workers this executor uses.
+  ThreadPool* const master_;
+  // Waited on by Stop() and broadcast by ThreadPool::OnTaskComplete; it is
+  // bound to master_->lock_.
+  base::ConditionVariable stopping_condvar_;
+  // Set once Stop() has been called.  Protected by master_->lock_.
+  bool stopped_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPoolExecutor);
+};
+
+// Queue `task` at `priority` on the pool (waking/spawning a worker as needed);
+// if this executor has already been stopped, CallCancel() the task instead.
+void ThreadPool::ThreadPoolExecutor::AddTask(net_instaweb::Function* task,
+ net::SpdyPriority priority) {
+ {
+ base::AutoLock autolock(master_->lock_);
+
+ // Reap any zombie WorkerThreads (idle workers that shut themselves down and
+ // still await joining). If the OS process we're in accumulates too many
+ // unjoined threads over time, the OS might not be able to spawn a new
+ // thread below. So right now is a good time to clean them up.
+ if (!master_->zombies_.empty()) {
+ std::set<WorkerThread*> zombies;
+ zombies.swap(master_->zombies_);  // Take the whole set while still locked.
+ // Joining should be basically instant, since these threads have finished
+ // (or are about to). Still, release the lock while JoinThreads runs.
+ base::AutoUnlock autounlock(master_->lock_);
+ ThreadPool::JoinThreads(zombies);  // Joins and deletes each zombie.
+ }  // autounlock goes out of scope here, so the lock is held again below.
+
+ // The thread pool shouldn't start shutting down until every executor has
+ // been destroyed. Since this executor clearly still exists, the pool
+ // must still be open.
+ DCHECK(!master_->shutting_down_);
+
+ // If the executor hasn't been stopped, queue the task (recording this
+ // executor as its owner) and notify a worker that a task is ready.
+ if (!stopped_) {
+ master_->task_queue_.insert(std::make_pair(priority, Task(task, this)));
+ master_->worker_condvar_.Signal();
+ master_->StartNewWorkerIfNeeded();  // May add a worker, up to max_threads_.
+ return;
+ }
+ }
+
+ // Reached only if the executor was already stopped: cancel the task, now
+ // that the lock has been released.
+ task->CallCancel();
+}
+
+// Stop this executor: remove every still-queued task it owns from the pool's
+// queue and cancel it, then block until the tasks of ours that workers are
+// currently running have completed.  Only the first call has any effect.
+void ThreadPool::ThreadPoolExecutor::Stop() {
+  typedef std::vector<net_instaweb::Function*> FunctionList;
+  FunctionList cancelled;
+  {
+    base::AutoLock autolock(master_->lock_);
+    if (stopped_) {
+      return;
+    }
+    stopped_ = true;
+
+    // Pull our own tasks out of the shared queue, remembering their function
+    // objects so that they can be cancelled once the lock is released.
+    TaskQueue::iterator iter = master_->task_queue_.begin();
+    while (iter != master_->task_queue_.end()) {
+      // Step past the current entry before possibly erasing it, so that the
+      // iterator we continue with stays valid.
+      const TaskQueue::iterator current = iter++;
+      if (current->second.owner != this) {
+        continue;
+      }
+      cancelled.push_back(current->second.function);
+      master_->task_queue_.erase(current);
+    }
+  }
+
+  // Cancel with the lock released, so that we don't hog the lock and so that
+  // a cancel method which touches the thread pool cannot deadlock against us.
+  for (FunctionList::size_type i = 0; i < cancelled.size(); ++i) {
+    cancelled[i]->CallCancel();
+  }
+  // CallCancel deletes the Function objects, so these pointers are now stale;
+  // drop them (which also frees a little memory while we block below).
+  cancelled.clear();
+
+  // Wait for our active tasks to drain; ThreadPool::OnTaskComplete broadcasts
+  // stopping_condvar_ when our last active task finishes.
+  {
+    base::AutoLock autolock(master_->lock_);
+    while (master_->active_task_counts_.count(this) != 0) {
+      stopping_condvar_.Wait();
+    }
+  }
+}
+
+// One worker of a ThreadPool: holds the platform-specific thread handle and
+// supplies the routine (ThreadMain) that the thread executes.
+class ThreadPool::WorkerThread : public base::PlatformThread::Delegate {
+ public:
+  explicit WorkerThread(ThreadPool* master);
+  virtual ~WorkerThread();
+
+  // Launch the thread; returns false if that fails.  After a successful
+  // Start(), Join() must be called before this object is deleted.
+  bool Start();
+
+  // Block until the thread has finished.  The thread only finishes once
+  // master_->shutting_down_ is true or the pool has zombified it, so make sure
+  // one of those holds first or this never returns.  Don't call this while
+  // holding master_->lock_.
+  void Join();
+
+  // base::PlatformThread::Delegate method:
+  virtual void ThreadMain();
+
+ private:
+  enum ThreadState { NOT_STARTED, STARTED, JOINED };
+
+  ThreadPool* const master_;
+
+  // Start() and Join() may be called from different threads (e.g. when two
+  // master threads share one ThreadPool), so the two fields after this lock
+  // are guarded by it.
+  base::Lock thread_lock_;
+  ThreadState state_;
+  base::PlatformThreadHandle thread_id_;
+
+  DISALLOW_COPY_AND_ASSIGN(WorkerThread);
+};
+
+// Creates a worker for `master` in the NOT_STARTED state; no thread is
+// launched until Start() is called.
+ThreadPool::WorkerThread::WorkerThread(ThreadPool* master)
+    : master_(master),
+      state_(NOT_STARTED),
+      thread_id_() {
+}
+
+// Debug-checks that a thread which was started has also been joined; a
+// started but unjoined thread would never get cleaned up by the OS.
+ThreadPool::WorkerThread::~WorkerThread() {
+  base::AutoLock autolock(thread_lock_);
+  DCHECK(state_ == JOINED || state_ == NOT_STARTED);
+}
+
+// Launch the platform thread, with this object as its delegate.  Returns
+// whether thread creation succeeded; state_ only advances to STARTED when it
+// did.
+bool ThreadPool::WorkerThread::Start() {
+  base::AutoLock autolock(thread_lock_);
+  DCHECK_EQ(NOT_STARTED, state_);
+  const bool created = base::PlatformThread::Create(0, this, &thread_id_);
+  if (created) {
+    state_ = STARTED;
+  }
+  return created;
+}
+
+void ThreadPool::WorkerThread::Join() {  // Blocks until the thread has exited.
+ base::AutoLock autolock(thread_lock_);  // Guards state_ and thread_id_.
+ DCHECK_EQ(STARTED, state_);  // Must be started and not yet joined.
+ base::PlatformThread::Join(thread_id_);
+ state_ = JOINED;  // Satisfies the check in ~WorkerThread.
+}
+
+// This is the code executed by the thread; when this method returns, the
+// thread will terminate.  The worker loops: wait (for at most
+// master_->max_thread_idle_time_) for a task, run it with the pool lock
+// released, report completion, repeat.  It returns when the pool is shutting
+// down, or when it has been idle too long and the pool agrees to zombify it.
+void ThreadPool::WorkerThread::ThreadMain() {
+  // We start by grabbing the master lock, but we release it below whenever we
+  // are 1) waiting for a new task or 2) executing a task.  So in fact most of
+  // the time we are not holding the lock.
+  base::AutoLock autolock(master_->lock_);
+  while (true) {
+    // Wait until there's a task available (or we're shutting down), but don't
+    // stay idle for more than master_->max_thread_idle_time_.
+    base::TimeDelta time_remaining = master_->max_thread_idle_time_;
+    while (!master_->shutting_down_ && master_->task_queue_.empty() &&
+           time_remaining.InSecondsF() > 0.0) {
+      // Note that TimedWait can wake up spuriously before the time runs out,
+      // so we need to measure how long we actually waited for.
+      const base::Time start = base::Time::Now();
+      master_->worker_condvar_.TimedWait(time_remaining);
+      const base::Time end = base::Time::Now();
+      // Note that the system clock can go backwards if it is reset, so make
+      // sure we never _increase_ time_remaining.
+      if (end > start) {
+        time_remaining -= end - start;
+      }
+    }
+
+    // If the thread pool is shutting down, terminate this thread; the master
+    // is about to join/delete us (in its destructor).
+    if (master_->shutting_down_) {
+      return;
+    }
+
+    // If we ran out of time without getting a task, maybe this thread should
+    // shut itself down.
+    if (master_->task_queue_.empty()) {
+      DCHECK_LE(time_remaining.InSecondsF(), 0.0);
+      // Ask the master if we should stop.  If this returns true, this worker
+      // has been zombified, so we're free to terminate the thread.
+      if (master_->TryZombifyIdleThread(this)) {
+        return;  // Yes, we should stop; terminate the thread.
+      }
+      // No, we shouldn't stop (the pool is at its minimum size).  Normally we
+      // just jump to the top of the while loop and begin a fresh timed wait.
+      // With a zero idle timeout, however (which the ThreadPool constructor
+      // permits), that timed wait never blocks, so we would spin here without
+      // ever releasing master_->lock_, locking out AddTask and the pool's
+      // destructor.  In that case, block until a new task or a shutdown is
+      // signalled; the conditions are re-checked at the top of the loop.
+      if (master_->max_thread_idle_time_.InSecondsF() <= 0.0) {
+        master_->worker_condvar_.Wait();
+      }
+      continue;
+    }
+
+    // Otherwise, there must be at least one task available now.  Grab one from
+    // the master, who will then treat us as busy until we complete it.
+    const Task task = master_->GetNextTask();
+    // Release the lock while we execute the task.  Note that we use AutoUnlock
+    // here rather than one AutoLock for the above code and another for the
+    // below code, so that we don't have to release and reacquire the lock at
+    // the edge of the while-loop.
+    {
+      base::AutoUnlock autounlock(master_->lock_);
+      task.function->CallRun();
+    }
+    // Inform the master we have completed the task and are no longer busy.
+    master_->OnTaskComplete(task);
+  }
+}
+
+// Set up a pool allowed between min_threads and max_threads workers, whose
+// idle workers time out after the default kDefaultMaxWorkerIdleSeconds.  No
+// threads are launched here; Start() launches the initial workers.
+ThreadPool::ThreadPool(int min_threads, int max_threads)
+    : min_threads_(min_threads),
+      max_threads_(max_threads),
+      max_thread_idle_time_(
+          base::TimeDelta::FromSeconds(kDefaultMaxWorkerIdleSeconds)),
+      worker_condvar_(&lock_),
+      num_busy_workers_(0),
+      shutting_down_(false) {
+  DCHECK_GE(max_thread_idle_time_.InSecondsF(), 0.0);
+  // Validate the signed arguments rather than the unsigned members they were
+  // stored into (e.g. min_threads, not min_threads_), so that negative
+  // numbers are caught.
+  DCHECK_GE(min_threads, 1);
+  DCHECK_GE(max_threads, 1);
+  DCHECK_LE(min_threads_, max_threads_);
+}
+
+// Set up a pool allowed between min_threads and max_threads workers, with a
+// caller-chosen idle timeout (which must be non-negative) for its workers.
+// No threads are launched here; Start() launches the initial workers.
+ThreadPool::ThreadPool(int min_threads, int max_threads,
+                       base::TimeDelta max_thread_idle_time)
+    : min_threads_(min_threads),
+      max_threads_(max_threads),
+      max_thread_idle_time_(max_thread_idle_time),
+      worker_condvar_(&lock_),
+      num_busy_workers_(0),
+      shutting_down_(false) {
+  DCHECK_GE(max_thread_idle_time_.InSecondsF(), 0.0);
+  // The signed arguments are checked for the lower bound, so that a negative
+  // count is caught.
+  DCHECK_GE(min_threads, 1);
+  DCHECK_GE(max_threads, 1);
+  DCHECK_LE(min_threads_, max_threads_);
+}
+
+ThreadPool::~ThreadPool() {  // Blocks until every pool thread has been joined.
+ base::AutoLock autolock(lock_);
+
+ // If we're doing things right, every Executor was destroyed (and therefore
+ // stopped) before the ThreadPool is destroyed, so there should be no
+ // pending or active tasks left.
+ DCHECK(task_queue_.empty());
+ DCHECK(active_task_counts_.empty());
+
+ // Tell the workers to shut down, then wake all that are waiting for tasks.
+ shutting_down_ = true;
+ worker_condvar_.Broadcast();
+
+ // Gather every thread we own -- zombies and live workers -- into one set.
+ std::set<WorkerThread*> threads;
+ zombies_.swap(threads);  // threads now holds the zombies; zombies_ is empty.
+ threads.insert(workers_.begin(), workers_.end());
+ workers_.clear();
+ {
+ base::AutoUnlock autounlock(lock_);  // Workers need the lock to exit.
+ JoinThreads(threads);  // Joins and deletes each WorkerThread.
+ }
+
+ // Because shutting_down_ was already true, nothing should have been added
+ // to our WorkerThread sets while we were unlocked. So we should be all
+ // cleaned up now.
+ DCHECK(workers_.empty());
+ DCHECK(zombies_.empty());
+ DCHECK(task_queue_.empty());
+ DCHECK(active_task_counts_.empty());
+}
+
+// Launch the initial min_threads_ workers.  Returns false as soon as any
+// worker thread fails to start, in which case the ThreadPool should be
+// deleted; returns true once all of them are running.
+bool ThreadPool::Start() {
+  base::AutoLock autolock(lock_);
+  DCHECK(task_queue_.empty());
+  DCHECK(workers_.empty());
+
+  unsigned int num_started = 0;
+  while (num_started < min_threads_) {
+    scoped_ptr<WorkerThread> worker(new WorkerThread(this));
+    if (!worker->Start()) {
+      // The scoped_ptr deletes the worker that never started.
+      return false;
+    }
+    // The pool owns the worker from here on.
+    workers_.insert(worker.release());
+    ++num_started;
+  }
+
+  DCHECK_EQ(min_threads_, workers_.size());
+  return true;
+}
+
+// Allocate a new executor that runs its tasks on this pool.
+// NOTE(review): the result is a raw new'd pointer, so presumably the caller
+// owns it -- confirm against the declaration in thread_pool.h.
+Executor* ThreadPool::NewExecutor() {
+  Executor* const executor = new ThreadPoolExecutor(this);
+  return executor;
+}
+
+// Number of workers currently in the pool (zombies are not counted), read
+// under the pool lock.
+int ThreadPool::GetNumWorkersForTest() {
+  base::AutoLock autolock(lock_);
+  const int num_workers = workers_.size();
+  return num_workers;
+}
+
+// Number of workers that are not currently executing a task, i.e. all
+// workers minus the busy ones, read under the pool lock.
+int ThreadPool::GetNumIdleWorkersForTest() {
+  base::AutoLock autolock(lock_);
+  DCHECK_GE(num_busy_workers_, 0u);
+  DCHECK_LE(num_busy_workers_, workers_.size());
+  const int num_idle = workers_.size() - num_busy_workers_;
+  return num_idle;
+}
+
+// Number of zombified workers that have not been reaped yet, read under the
+// pool lock.
+int ThreadPool::GetNumZombiesForTest() {
+  base::AutoLock autolock(lock_);
+  const int num_zombies = zombies_.size();
+  return num_zombies;
+}
+
+// Called (with lock_ held) every time a task is added to the pool: spawns one
+// more worker if the pending tasks outnumber the idle workers and the pool has
+// not yet reached max_threads_.  A failure to start the thread is only logged.
+void ThreadPool::StartNewWorkerIfNeeded() {
+  lock_.AssertAcquired();
+  DCHECK_GE(num_busy_workers_, 0u);
+  DCHECK_LE(num_busy_workers_, workers_.size());
+  DCHECK_GE(workers_.size(), min_threads_);
+  DCHECK_LE(workers_.size(), max_threads_);
+
+  // Nothing to do if we're already at the maximum number of threads...
+  if (workers_.size() >= max_threads_) {
+    return;
+  }
+  // ...or if enough idle workers are sitting around to take on this task and
+  // all other pending tasks they haven't yet had a chance to pick up.
+  if (task_queue_.size() <= workers_.size() - num_busy_workers_) {
+    return;
+  }
+
+  scoped_ptr<WorkerThread> worker(new WorkerThread(this));
+  if (!worker->Start()) {
+    LOG(ERROR) << "Failed to start new worker thread.";
+    return;
+  }
+  workers_.insert(worker.release());
+}
+
+// static
+// Join, and then delete, every WorkerThread in `threads`.  The set itself is
+// not modified, so afterwards it holds dangling pointers.
+void ThreadPool::JoinThreads(const std::set<WorkerThread*>& threads) {
+  typedef std::set<WorkerThread*>::const_iterator ThreadIter;
+  const ThreadIter end = threads.end();
+  for (ThreadIter it = threads.begin(); it != end; ++it) {
+    (*it)->Join();
+    delete *it;
+  }
+}
+
+// Called by a worker (with lock_ held) that has been idle for a while.
+// Returns false if the worker should keep waiting for tasks; otherwise moves
+// the worker to the zombie set and returns true, after which the worker
+// thread should terminate immediately.
+bool ThreadPool::TryZombifyIdleThread(WorkerThread* thread) {
+  lock_.AssertAcquired();
+
+  // A pool that is already down to its minimum number of threads keeps all of
+  // them.
+  DCHECK_GE(workers_.size(), min_threads_);
+  const bool at_minimum = (workers_.size() <= min_threads_);
+  if (at_minimum) {
+    return false;
+  }
+
+  // Take this thread out of the worker set.
+  DCHECK_EQ(1u, workers_.count(thread));
+  workers_.erase(thread);
+
+  // A (joinable) thread that terminates must still be cleaned up, either by
+  // another thread joining it or by detaching it.  The pool isn't shutting
+  // down here, so the master thread doesn't know to join the calling thread,
+  // and the Chromium thread abstraction we're using doesn't currently allow
+  // us to detach a thread.  So the WorkerThread object goes into a "zombie"
+  // set instead, which the master thread can reap later on.  Threads that
+  // have terminated but haven't been joined yet use up only a small amount of
+  // memory (I think), so it's okay not to reap them right away, as long as we
+  // don't try to spawn new threads while there's still lots of zombies.
+  DCHECK(!shutting_down_);
+  DCHECK_EQ(0u, zombies_.count(thread));
+  zombies_.insert(thread);
+  return true;
+}
+
+// Remove and return the next task from the queue, which must be non-empty,
+// and update the bookkeeping to show that the calling worker is now busy
+// executing it.  Must be called with lock_ held.
+ThreadPool::Task ThreadPool::GetNextTask() {
+  lock_.AssertAcquired();
+
+  // Smaller values correspond to higher priorities (SPDY draft 3 section
+  // 2.3.3), so the front of the queue is the highest-priority pending task.
+  DCHECK(!task_queue_.empty());
+  const TaskQueue::iterator front = task_queue_.begin();
+  const Task task = front->second;
+  task_queue_.erase(front);
+
+  // One more active task for the owning executor; OnTaskComplete undoes this
+  // when the task finishes.
+  ++active_task_counts_[task.owner];
+
+  // The worker taking this task stays busy until it completes it.
+  DCHECK_LT(num_busy_workers_, workers_.size());
+  ++num_busy_workers_;
+
+  return task;
+}
+
+// Called by a worker (with lock_ held) once it has finished running `task`:
+// marks the worker as no longer busy, decrements the owner's count of active
+// tasks and, if that was the owner's last active task, wakes anyone blocked
+// in the owner's Stop().
+void ThreadPool::OnTaskComplete(Task task) {
+  lock_.AssertAcquired();
+
+  // The calling worker is idle again.
+  DCHECK_GE(num_busy_workers_, 1u);
+  --num_busy_workers_;
+
+  // The task is done and the lock has been reacquired, so the owner has one
+  // active task fewer.
+  const OwnerMap::iterator entry = active_task_counts_.find(task.owner);
+  DCHECK(entry != active_task_counts_.end());
+  DCHECK(entry->second > 0);
+  --(entry->second);
+  if (entry->second != 0) {
+    return;
+  }
+
+  // That was the owner's last active task: drop its entry and notify anyone
+  // who might be waiting for the owner to stop.
+  active_task_counts_.erase(entry);
+  task.owner->stopping_condvar_.Broadcast();
+}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.h?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.h (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.h Thu May 1 11:43:36 2014
@@ -0,0 +1,145 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_THREAD_POOL_H_
+#define MOD_SPDY_COMMON_THREAD_POOL_H_
+
+#include <map>
+#include <set>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/time.h"
+#include "net/spdy/spdy_protocol.h" // for net::SpdyPriority
+
+namespace net_instaweb { class Function; }
+
+namespace mod_spdy {
+
+class Executor;
+
+// A ThreadPool keeps a pool of threads waiting to perform tasks. One can
+// create any number of Executor objects, using the NewExecutor method, which
+// will all share the threads for executing tasks. If more tasks are queued
+// than there are threads in the pool, these executors will respect task
+// priorities when deciding which tasks to execute first.
+class ThreadPool {
+ public:
+ // Create a new thread pool that uses at least min_threads threads, and at
+ // most max_threads threads, at a time. min_threads must be no greater than
+ // max_threads, and both must be positive.
+ ThreadPool(int min_threads, int max_threads);
+
+ // As above, but specify the amount of time after which to kill idle threads,
+ // rather than using the default value (this is primarily for testing).
+ // max_thread_idle_time must be non-negative.
+ ThreadPool(int min_threads, int max_threads,
+ base::TimeDelta max_thread_idle_time);
+
+ // The destructor will block until all threads in the pool have shut down.
+ // The ThreadPool must not be destroyed until all Executor objects returned
+ // from the NewExecutor method have first been deleted.
+ ~ThreadPool();
+
+ // Start up the thread pool. Must be called exactly once before using the
+ // thread pool; returns true on success, or false on failure. If startup
+ // fails, the ThreadPool must be immediately deleted.
+ bool Start();
+
+ // Return a new Executor object that uses this thread pool to perform tasks.
+ // The caller gains ownership of the returned Executor, and the ThreadPool
+ // must outlive the returned Executor.
+ Executor* NewExecutor();
+
+ // Return the current total number of worker threads. This is provided for
+ // testing purposes only.
+ int GetNumWorkersForTest();
+ // Return the number of worker threads currently idle. This is provided for
+ // testing purposes only.
+ int GetNumIdleWorkersForTest();
+ // Return the number of terminated (zombie) threads that have yet to be
+ // reaped. This is provided for testing purposes only.
+ int GetNumZombiesForTest();
+
+ private:
+ class ThreadPoolExecutor;
+ class WorkerThread;
+
+ // A Task is a simple pair of the Function to run, and the executor to which
+ // the task was added.
+ struct Task {
+ Task(net_instaweb::Function* fun, ThreadPoolExecutor* own)
+ : function(fun), owner(own) {}
+ net_instaweb::Function* function;
+ ThreadPoolExecutor* owner;
+ };
+
+ // Pending tasks keyed by SPDY priority. Several tasks may share a priority,
+ // hence a multimap rather than a map.
+ typedef std::multimap<net::SpdyPriority, Task> TaskQueue;
+ // Per-executor count of currently running tasks (see active_task_counts_).
+ typedef std::map<const ThreadPoolExecutor*, int> OwnerMap;
+
+ // Start a new worker thread if 1) the task queue is larger than the number
+ // of currently idle workers, and 2) we have fewer than the maximum number of
+ // workers. Otherwise, do nothing. Must be holding lock_ when calling this.
+ void StartNewWorkerIfNeeded();
+
+ // Join and delete all worker threads in the given set. This will block
+ // until all the threads have terminated and been cleaned up, so don't call
+ // this while holding the lock_.
+ static void JoinThreads(const std::set<WorkerThread*>& threads);
+
+ // These calls are used to implement the WorkerThread's main function. Must
+ // be holding lock_ when calling any of these.
+ bool TryZombifyIdleThread(WorkerThread* thread);
+ Task GetNextTask();
+ void OnTaskComplete(Task task);
+
+ // The min and max number of threads passed to the constructor. Although the
+ // constructor takes signed ints (for convenience), we store these unsigned
+ // to avoid the need for static_casts when comparing against workers_.size().
+ const unsigned int min_threads_;
+ const unsigned int max_threads_;
+ // How long a worker thread may sit idle before it is killed (see the
+ // three-argument constructor).
+ const base::TimeDelta max_thread_idle_time_;
+ // This single master lock protects all of the below fields, as well as any
+ // mutable data and condition variables in the worker threads and executors.
+ // Having just one lock makes everything much easier to understand.
+ base::Lock lock_;
+ // Workers wait on this condvar when waiting for a new task. We signal it
+ // when a new task becomes available, or when we need to shut down.
+ base::ConditionVariable worker_condvar_;
+ // The list of running worker threads. We keep this around so that we can
+ // join the threads on shutdown.
+ std::set<WorkerThread*> workers_;
+ // Worker threads that have shut themselves down (due to being idle), and are
+ // awaiting cleanup by the master thread.
+ std::set<WorkerThread*> zombies_;
+ // How many workers do we have that are actually executing tasks?
+ unsigned int num_busy_workers_;
+ // We set this to true to tell the worker threads to terminate.
+ bool shutting_down_;
+ // The priority queue of pending tasks. Invariant: all Function objects in
+ // the queue have neither been started nor cancelled yet.
+ TaskQueue task_queue_;
+ // This maps executors to the number of currently running tasks for that
+ // executor; we increment when we start a task, and decrement when we finish
+ // it. If the number is zero, we remove the entry from the map; thus, as an
+ // invariant the map only contains entries for executors with active tasks.
+ OwnerMap active_task_counts_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_THREAD_POOL_H_
Propchange: httpd/mod_spdy/trunk/mod_spdy/common/thread_pool.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/trunk/mod_spdy/common/thread_pool_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/thread_pool_test.cc?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/thread_pool_test.cc (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/thread_pool_test.cc Thu May 1 11:43:36 2014
@@ -0,0 +1,339 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/thread_pool.h"
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h"
+#include "base/time.h"
+#include "mod_spdy/common/executor.h"
+#include "mod_spdy/common/testing/notification.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+// When adding tests here, try to keep them robust against thread scheduling
+// differences from run to run. In particular, they shouldn't fail just
+// because you're running under Valgrind.
+
+namespace {
+
+// A Function for tests that reports what happened to it through `*result`,
+// which is only written while holding `*lock`.  Running it sleeps for `wait`
+// milliseconds and then stores RAN; cancelling it stores CANCELLED.
+class TestFunction : public net_instaweb::Function {
+ public:
+  enum Result { NOTHING, RAN, CANCELLED };
+
+  TestFunction(int wait, base::Lock* lock, Result* result)
+      : delay_millis_(wait), result_lock_(lock), outcome_(result) {}
+  virtual ~TestFunction() {}
+
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run() {
+    const base::TimeDelta delay =
+        base::TimeDelta::FromMilliseconds(delay_millis_);
+    base::PlatformThread::Sleep(delay);
+    Record(RAN);
+  }
+  virtual void Cancel() {
+    Record(CANCELLED);
+  }
+
+ private:
+  // Store `outcome` into the caller-supplied result slot under the lock.
+  void Record(Result outcome) {
+    base::AutoLock autolock(*result_lock_);
+    *outcome_ = outcome;
+  }
+
+  const int delay_millis_;
+  base::Lock* const result_lock_;
+  Result* const outcome_;
+  DISALLOW_COPY_AND_ASSIGN(TestFunction);
+};
+
+// Test that we execute tasks concurrently, and that we respect priorities
+// when pulling tasks from the queue.
+//
+// NOTE: this test relies on wall-clock timing (100/150/200 ms sleeps); the
+// expected results below assume tasks are picked up promptly after AddTask.
+TEST(ThreadPoolTest, ConcurrencyAndPrioritization) {
+ // Create a thread pool with 2 threads, and an executor.
+ mod_spdy::ThreadPool thread_pool(2, 2);
+ ASSERT_TRUE(thread_pool.Start());
+ scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+ // One result slot per task; all are written under `lock` by the tasks.
+ base::Lock lock;
+ TestFunction::Result result0 = TestFunction::NOTHING;
+ TestFunction::Result result1 = TestFunction::NOTHING;
+ TestFunction::Result result2 = TestFunction::NOTHING;
+ TestFunction::Result result3 = TestFunction::NOTHING;
+
+ // Create a high-priority TestFunction, which waits for 200 millis then
+ // records that it ran.
+ executor->AddTask(new TestFunction(200, &lock, &result0), 0);
+ // Create several TestFunctions at different priorities. Each waits 100
+ // millis then records that it ran. Note that the priority-3 task is added
+ // before the priority-2 task.
+ executor->AddTask(new TestFunction(100, &lock, &result1), 1);
+ executor->AddTask(new TestFunction(100, &lock, &result3), 3);
+ executor->AddTask(new TestFunction(100, &lock, &result2), 2);
+
+ // Wait 150 millis, then stop the executor.
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(150));
+ executor->Stop();
+
+ // Only TestFunctions that _started_ within the first 150 millis should have
+ // run; the others should have been cancelled.
+ // - The priority-0 function should have started first, on the first
+ // thread. It finishes after 200 millis.
+ // - The priority-1 function should run on the second thread. It finishes
+ // after 100 millis.
+ // - The priority-2 function should run on the second thread after the
+ // priority-1 function finishes, even though it was pushed last, because
+ // it's higher-priority than the priority-3 function. It finishes at the
+ // 200-milli mark.
+ // - The priority-3 function should not get a chance to run, because we
+ // stop the executor after 150 millis, and the soonest it could start is
+ // the 200-milli mark.
+ base::AutoLock autolock(lock);
+ EXPECT_EQ(TestFunction::RAN, result0);
+ EXPECT_EQ(TestFunction::RAN, result1);
+ EXPECT_EQ(TestFunction::RAN, result2);
+ EXPECT_EQ(TestFunction::CANCELLED, result3);
+}
+
+// Test that stopping one executor doesn't affect tasks on another executor
+// from the same ThreadPool.
+//
+// NOTE: this test relies on wall-clock timing (20 ms and 100 ms sleeps around
+// 50 ms tasks).
+TEST(ThreadPoolTest, MultipleExecutors) {
+ // Create a thread pool with 3 threads, and two executors.
+ mod_spdy::ThreadPool thread_pool(3, 3);
+ ASSERT_TRUE(thread_pool.Start());
+ scoped_ptr<mod_spdy::Executor> executor1(thread_pool.NewExecutor());
+ scoped_ptr<mod_spdy::Executor> executor2(thread_pool.NewExecutor());
+
+ // Result slots are named e<executor>r<task>; all are written under `lock`.
+ base::Lock lock;
+ TestFunction::Result e1r1 = TestFunction::NOTHING;
+ TestFunction::Result e1r2 = TestFunction::NOTHING;
+ TestFunction::Result e1r3 = TestFunction::NOTHING;
+ TestFunction::Result e2r1 = TestFunction::NOTHING;
+ TestFunction::Result e2r2 = TestFunction::NOTHING;
+ TestFunction::Result e2r3 = TestFunction::NOTHING;
+
+ // Add some tasks to the executors. Each one takes 50 millis to run. Three
+ // of them are priority 0 (one per pool thread); the rest are lower priority.
+ executor1->AddTask(new TestFunction(50, &lock, &e1r1), 0);
+ executor2->AddTask(new TestFunction(50, &lock, &e2r1), 0);
+ executor1->AddTask(new TestFunction(50, &lock, &e1r2), 0);
+ executor2->AddTask(new TestFunction(50, &lock, &e2r2), 1);
+ executor1->AddTask(new TestFunction(50, &lock, &e1r3), 3);
+ executor2->AddTask(new TestFunction(50, &lock, &e2r3), 1);
+
+ // Wait 20 millis (to make sure the first few tasks got picked up), then
+ // destroy executor2, which should stop it. Finally, sleep another 100
+ // millis to give the remaining tasks a chance to finish.
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
+ executor2.reset();
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
+
+ // The three high priority tasks should have all run. The other two tasks on
+ // executor2 should have been cancelled when we stopped executor2, but the
+ // low-priority task on executor1 should have been left untouched, and
+ // allowed to finish.
+ base::AutoLock autolock(lock);
+ EXPECT_EQ(TestFunction::RAN, e1r1);
+ EXPECT_EQ(TestFunction::RAN, e2r1);
+ EXPECT_EQ(TestFunction::RAN, e1r2);
+ EXPECT_EQ(TestFunction::CANCELLED, e2r2);
+ EXPECT_EQ(TestFunction::RAN, e1r3);
+ EXPECT_EQ(TestFunction::CANCELLED, e2r3);
+}
+
+// When run, a WaitFunction blocks until the given notification is set.
+// Cancelling it does nothing.  Only the pointer to the notification is
+// stored; the notification itself is not copied.
+class WaitFunction : public net_instaweb::Function {
+ public:
+  // Marked explicit so that a Notification* can never be implicitly
+  // converted into a WaitFunction.
+  explicit WaitFunction(mod_spdy::testing::Notification* notification)
+      : notification_(notification) {}
+  virtual ~WaitFunction() {}
+
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run() {
+    // Block here until someone calls Set() on the notification.
+    notification_->Wait();
+  }
+  virtual void Cancel() {}
+
+ private:
+  mod_spdy::testing::Notification* const notification_;
+  DISALLOW_COPY_AND_ASSIGN(WaitFunction);
+};
+
+// A Function that, when run, appends its ID to a shared vector (while holding
+// the supplied lock) and then broadcasts on the supplied condition variable.
+// Cancelling it does nothing.
+class IdFunction : public net_instaweb::Function {
+ public:
+  IdFunction(int id, base::Lock* lock, base::ConditionVariable* condvar,
+             std::vector<int>* output)
+      : task_id_(id),
+        output_lock_(lock),
+        output_changed_(condvar),
+        finished_ids_(output) {}
+  virtual ~IdFunction() {}
+
+ protected:
+  // net_instaweb::Function methods:
+  virtual void Run() {
+    base::AutoLock autolock(*output_lock_);
+    std::vector<int>& finished = *finished_ids_;
+    finished.push_back(task_id_);
+    // Broadcast while still holding the lock, after the ID has been recorded.
+    output_changed_->Broadcast();
+  }
+  virtual void Cancel() {}
+
+ private:
+  const int task_id_;
+  base::Lock* const output_lock_;
+  base::ConditionVariable* const output_changed_;
+  std::vector<int>* const finished_ids_;
+  DISALLOW_COPY_AND_ASSIGN(IdFunction);
+};
+
+// Test that tasks sharing a priority are run in the same order in which they
+// were added.
+TEST(ThreadPoolTest, SamePriorityTasksAreFIFO) {
+  // Use a pool with exactly one thread, plus an executor on top of it.
+  mod_spdy::ThreadPool thread_pool(1, 1);
+  ASSERT_TRUE(thread_pool.Start());
+  scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+  // Queue a task that blocks until `start` is set, so that none of the tasks
+  // added below can begin before we say so.
+  mod_spdy::testing::Notification start;
+  executor->AddTask(new WaitFunction(&start), 0);
+
+  // Interleave tasks at priorities 1, 2, and 3.  IDs are assigned so that
+  // priority 1 gets [0, N), priority 2 gets [N, 2N), and priority 3 gets
+  // [2N, 3N), where N is tasks_per_priority.
+  const int tasks_per_priority = 1000;
+  const int expected_task_count = 3 * tasks_per_priority;
+  base::Lock lock;
+  base::ConditionVariable condvar(&lock);
+  std::vector<int> ids;  // protected by lock
+  for (int n = 0; n < tasks_per_priority; ++n) {
+    const int id_for_priority1 = n;
+    const int id_for_priority2 = n + tasks_per_priority;
+    const int id_for_priority3 = n + 2 * tasks_per_priority;
+    executor->AddTask(
+        new IdFunction(id_for_priority1, &lock, &condvar, &ids), 1);
+    executor->AddTask(
+        new IdFunction(id_for_priority2, &lock, &condvar, &ids), 2);
+    executor->AddTask(
+        new IdFunction(id_for_priority3, &lock, &condvar, &ids), 3);
+  }
+
+  // Release the blocking task, then wait until every ID has been recorded.
+  start.Set();
+  base::AutoLock autolock(lock);
+  while (static_cast<int>(ids.size()) < expected_task_count) {
+    condvar.Wait();
+  }
+
+  // With the ID scheme above, in-order execution means that the ID recorded
+  // at each position equals that position.
+  for (int position = 0; position < expected_task_count; ++position) {
+    const int finished_id = ids[position];
+    ASSERT_EQ(position, finished_id)
+        << "Task " << finished_id << " finished in position " << position;
+  }
+}
+
+// Poll the thread pool until it reports the expected total and idle worker
+// counts, sleeping 10 millis between polls.  If the counts still don't match
+// once the timeout budget is used up, add a test failure describing the
+// expected and actual counts.
+void ExpectWorkersWithinTimeout(int expected_num_workers,
+                                int expected_num_idle_workers,
+                                mod_spdy::ThreadPool* thread_pool,
+                                int timeout_millis) {
+  const int poll_interval_millis = 10;
+  for (int millis_left = timeout_millis; ;
+       millis_left -= poll_interval_millis) {
+    const int num_workers = thread_pool->GetNumWorkersForTest();
+    const int num_idle_workers = thread_pool->GetNumIdleWorkersForTest();
+    const bool counts_match =
+        (num_workers == expected_num_workers &&
+         num_idle_workers == expected_num_idle_workers);
+    if (counts_match) {
+      return;
+    }
+    // The budget is only checked after polling, so even a zero timeout gets
+    // one look at the pool before failing.
+    if (millis_left <= 0) {
+      ADD_FAILURE() << "Timed out; expected " << expected_num_workers
+                    << " worker(s) with " << expected_num_idle_workers
+                    << " idle; still at " << num_workers
+                    << " worker(s) with " << num_idle_workers
+                    << " idle after " << timeout_millis << "ms";
+      return;
+    }
+    base::PlatformThread::Sleep(
+        base::TimeDelta::FromMilliseconds(poll_interval_millis));
+  }
+}
+
+// Test that we spawn new threads as needed, and allow them to die off after
+// being idle for a while.
+//
+// NOTE: this test relies on wall-clock timing; the timeouts passed to
+// ExpectWorkersWithinTimeout are derived from idle_time_millis.
+TEST(ThreadPoolTest, CreateAndRetireWorkers) {
+ // Create a thread pool with min_threads < max_threads, and give it a short
+ // max_thread_idle_time.
+ const int idle_time_millis = 100;
+ mod_spdy::ThreadPool thread_pool(
+ 2, 4, base::TimeDelta::FromMilliseconds(idle_time_millis));
+ ASSERT_TRUE(thread_pool.Start());
+ // As soon as we start the thread pool, there should be the minimum number of
+ // workers (two), both counted as idle.
+ EXPECT_EQ(2, thread_pool.GetNumWorkersForTest());
+ EXPECT_EQ(2, thread_pool.GetNumIdleWorkersForTest());
+
+ scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+ // Start up three tasks. That should push us up to three workers
+ // immediately. If we make sure to give those threads a chance to run, they
+ // should soon pick up the tasks and all be busy.
+ mod_spdy::testing::Notification done1;
+ executor->AddTask(new WaitFunction(&done1), 0);
+ executor->AddTask(new WaitFunction(&done1), 1);
+ executor->AddTask(new WaitFunction(&done1), 2);
+ EXPECT_EQ(3, thread_pool.GetNumWorkersForTest());
+ ExpectWorkersWithinTimeout(3, 0, &thread_pool, 100);
+
+ // Add three more tasks. We should now be at the maximum number of workers,
+ // and that fourth worker should be busy soon.
+ mod_spdy::testing::Notification done2;
+ executor->AddTask(new WaitFunction(&done2), 1);
+ executor->AddTask(new WaitFunction(&done2), 2);
+ executor->AddTask(new WaitFunction(&done2), 3);
+ EXPECT_EQ(4, thread_pool.GetNumWorkersForTest());
+ ExpectWorkersWithinTimeout(4, 0, &thread_pool, 100);
+
+ // Allow the first group of tasks to finish. There are now only three tasks
+ // running, so one of our four threads should go idle. If we wait for a
+ // while after that, that thread should terminate and enter zombie mode.
+ done1.Set();
+ ExpectWorkersWithinTimeout(4, 1, &thread_pool, idle_time_millis / 2);
+ ExpectWorkersWithinTimeout(3, 0, &thread_pool, 2 * idle_time_millis);
+ EXPECT_EQ(1, thread_pool.GetNumZombiesForTest());
+
+ // Allow the second group of tasks to finish. There are no tasks left, so
+ // all three threads should go idle. If we wait for a while after that,
+ // exactly one of the three should shut down, bringing us back down to the
+ // minimum number of threads. We should now have two zombie threads.
+ done2.Set();
+ ExpectWorkersWithinTimeout(3, 3, &thread_pool, idle_time_millis / 2);
+ ExpectWorkersWithinTimeout(2, 2, &thread_pool, 2 * idle_time_millis);
+ EXPECT_EQ(2, thread_pool.GetNumZombiesForTest());
+
+ // Start some new tasks. This should cause us to immediately reap the
+ // zombie threads, and soon, we should have three busy threads.
+ mod_spdy::testing::Notification done3;
+ executor->AddTask(new WaitFunction(&done3), 0);
+ executor->AddTask(new WaitFunction(&done3), 2);
+ executor->AddTask(new WaitFunction(&done3), 1);
+ EXPECT_EQ(0, thread_pool.GetNumZombiesForTest());
+ EXPECT_EQ(3, thread_pool.GetNumWorkersForTest());
+ ExpectWorkersWithinTimeout(3, 0, &thread_pool, 100);
+
+ // Let those tasks finish. Once again, the threads should go idle, and then
+ // one of them should terminate and enter zombie mode.
+ done3.Set();
+ ExpectWorkersWithinTimeout(3, 3, &thread_pool, idle_time_millis / 2);
+ ExpectWorkersWithinTimeout(2, 2, &thread_pool, 2 * idle_time_millis);
+ EXPECT_EQ(1, thread_pool.GetNumZombiesForTest());
+
+ // When we exit the test, the thread pool's destructor should reap the zombie
+ // thread (as well as shutting down the still-running workers). We can
+ // verify this by running this test under valgrind and making sure that no
+ // memory is leaked.
+}
+
+} // namespace
Added: httpd/mod_spdy/trunk/mod_spdy/common/version.h.in
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/trunk/mod_spdy/common/version.h.in?rev=1591622&view=auto
==============================================================================
--- httpd/mod_spdy/trunk/mod_spdy/common/version.h.in (added)
+++ httpd/mod_spdy/trunk/mod_spdy/common/version.h.in Thu May 1 11:43:36 2014
@@ -0,0 +1,25 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// version.h is generated from version.h.in. Edit the source!
+//
+// NOTE(review): the @NAME@ tokens below look like placeholders that get
+// substituted when version.h is generated -- confirm against the build step
+// that produces version.h.
+
+#pragma once
+
+// Version Information
+//
+// MOD_SPDY_VERSION is a comma-separated list of the four version components;
+// MOD_SPDY_VERSION_STRING is the same four components as a dotted string.
+
+#define MOD_SPDY_VERSION @MAJOR@,@MINOR@,@BUILD@,@PATCH@
+#define MOD_SPDY_VERSION_STRING "@MAJOR@.@MINOR@.@BUILD@.@PATCH@"
+
+// Branding Information
+
+#define COMPANY_FULLNAME_STRING "@COMPANY_FULLNAME@"
+#define COMPANY_SHORTNAME_STRING "@COMPANY_SHORTNAME@"
+#define PRODUCT_FULLNAME_STRING "@PRODUCT_FULLNAME@"
+#define PRODUCT_SHORTNAME_STRING "@PRODUCT_SHORTNAME@"
+#define COPYRIGHT_STRING "@COPYRIGHT@"
+#define OFFICIAL_BUILD_STRING "@OFFICIAL_BUILD@"
+
+// Changelist Information
+
+#define LASTCHANGE_STRING "@LASTCHANGE@"
Propchange: httpd/mod_spdy/trunk/mod_spdy/common/version.h.in
------------------------------------------------------------------------------
svn:eol-style = native