You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ji...@apache.org on 2014/05/01 13:39:32 UTC
svn commit: r1591620 [10/14] - in /httpd/mod_spdy/branches/httpd-2.2.x: ./
base/ base/metrics/ build/ install/ install/common/ install/debian/
install/rpm/ mod_spdy/ mod_spdy/apache/ mod_spdy/apache/filters/
mod_spdy/apache/testing/ mod_spdy/common/ mo...
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,271 @@
+// Copyright 2012 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+
+#include <iostream>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/strings/stringprintf.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+
+namespace {
+
+void AppendHeadersString(const net::SpdyNameValueBlock& headers,
+ std::string* out) {
+ out->append("{ ");
+ bool comma = false;
+ for (net::SpdyNameValueBlock::const_iterator iter = headers.begin();
+ iter != headers.end(); ++iter) {
+ if (comma) {
+ out->append(", ");
+ }
+ base::StringAppendF(out, "'%s': '%s'", iter->first.c_str(),
+ iter->second.c_str());
+ comma = true;
+ }
+ out->append(" }");
+}
+
+class FrameToStringVisitor : public net::SpdyFrameVisitor {
+ public:
+ explicit FrameToStringVisitor(std::string* out)
+ : out_(out) {
+ CHECK(out_);
+ }
+ virtual ~FrameToStringVisitor() {}
+
+ virtual void VisitSynStream(const net::SpdySynStreamIR& syn_stream) {
+ // TODO(mdsteele): include other fields
+ base::StringAppendF(
+ out_, "SYN_STREAM(%u p%u%s%s)",
+ static_cast<unsigned>(syn_stream.stream_id()),
+ static_cast<unsigned>(syn_stream.priority()),
+ (syn_stream.fin() ? " fin" : ""),
+ (syn_stream.unidirectional() ? " unidirectional" : ""));
+ AppendHeadersString(syn_stream.name_value_block(), out_);
+ }
+ virtual void VisitSynReply(const net::SpdySynReplyIR& syn_reply) {
+ base::StringAppendF(
+ out_, "SYN_REPLY(%u%s)",
+ static_cast<unsigned>(syn_reply.stream_id()),
+ (syn_reply.fin() ? " fin" : ""));
+ AppendHeadersString(syn_reply.name_value_block(), out_);
+ }
+ virtual void VisitRstStream(const net::SpdyRstStreamIR& rst_stream) {
+ base::StringAppendF(
+ out_, "RST_STREAM(%u %s)",
+ static_cast<unsigned>(rst_stream.stream_id()),
+ mod_spdy::RstStreamStatusCodeToString(rst_stream.status()));
+ }
+ virtual void VisitSettings(const net::SpdySettingsIR& settings) {
+ base::StringAppendF(
+ out_, "SETTINGS(%s",
+ (settings.clear_settings() ? "clear " : ""));
+ bool comma = false;
+ for (net::SpdySettingsIR::ValueMap::const_iterator iter =
+ settings.values().begin(), end = settings.values().end();
+ iter != end; ++iter) {
+ if (comma) {
+ out_->append(", ");
+ }
+ base::StringAppendF(
+ out_, "%s%s%s: %d",
+ (iter->second.persist_value ? "persist " : ""),
+ (iter->second.persisted ? "persisted " : ""),
+ mod_spdy::SettingsIdToString(iter->first),
+ static_cast<int>(iter->second.value));
+ }
+ out_->append(")");
+ }
+ virtual void VisitPing(const net::SpdyPingIR& ping) {
+ base::StringAppendF(
+ out_, "PING(%u)", static_cast<unsigned>(ping.id()));
+ }
+ virtual void VisitGoAway(const net::SpdyGoAwayIR& goaway) {
+ base::StringAppendF(
+ out_, "GOAWAY(%u %s)",
+ static_cast<unsigned>(goaway.last_good_stream_id()),
+ mod_spdy::GoAwayStatusCodeToString(goaway.status()));
+ }
+ virtual void VisitHeaders(const net::SpdyHeadersIR& headers) {
+ base::StringAppendF(
+ out_, "HEADERS(%u%s)", static_cast<unsigned>(headers.stream_id()),
+ (headers.fin() ? " fin" : ""));
+ AppendHeadersString(headers.name_value_block(), out_);
+ }
+ virtual void VisitWindowUpdate(const net::SpdyWindowUpdateIR& window) {
+ base::StringAppendF(
+ out_, "WINDOW_UPDATE(%u %+d)",
+ static_cast<unsigned>(window.stream_id()),
+ static_cast<int>(window.delta()));
+ }
+ virtual void VisitCredential(const net::SpdyCredentialIR& credential) {
+ // TODO(mdsteele): include other fields
+ base::StringAppendF(
+ out_, "CREDENTIAL(%d)", static_cast<int>(credential.slot()));
+ }
+ virtual void VisitBlocked(const net::SpdyBlockedIR& blocked) {
+ base::StringAppendF(
+ out_, "BLOCKED(%u)", static_cast<unsigned>(blocked.stream_id()));
+ }
+ virtual void VisitPushPromise(const net::SpdyPushPromiseIR& push_promise) {
+ base::StringAppendF(
+ out_, "PUSH_PROMISE(%u, %u)",
+ static_cast<unsigned>(push_promise.stream_id()),
+ static_cast<unsigned>(push_promise.promised_stream_id()));
+ }
+ virtual void VisitData(const net::SpdyDataIR& data) {
+ base::StringAppendF(
+ out_, "DATA(%u%s \"", static_cast<unsigned>(data.stream_id()),
+ (data.fin() ? " fin" : ""));
+ out_->append(data.data().data(), data.data().size());
+ out_->append("\")");
+ }
+
+ private:
+ std::string* out_;
+
+ DISALLOW_COPY_AND_ASSIGN(FrameToStringVisitor);
+};
+
+void AppendSpdyFrameToString(const net::SpdyFrameIR& frame, std::string* out) {
+ FrameToStringVisitor visitor(out);
+ frame.Visit(&visitor);
+}
+
+class IsEquivalentFrameMatcher :
+ public ::testing::MatcherInterface<const net::SpdyFrameIR&> {
+ public:
+ explicit IsEquivalentFrameMatcher(const net::SpdyFrameIR& frame);
+ virtual ~IsEquivalentFrameMatcher();
+ virtual bool MatchAndExplain(const net::SpdyFrameIR& frame,
+ ::testing::MatchResultListener* listener) const;
+ virtual void DescribeTo(std::ostream* out) const;
+ virtual void DescribeNegationTo(std::ostream* out) const;
+
+ private:
+ std::string expected_;
+
+ DISALLOW_COPY_AND_ASSIGN(IsEquivalentFrameMatcher);
+};
+
+IsEquivalentFrameMatcher::IsEquivalentFrameMatcher(
+ const net::SpdyFrameIR& frame) {
+ AppendSpdyFrameToString(frame, &expected_);
+}
+
+IsEquivalentFrameMatcher::~IsEquivalentFrameMatcher() {}
+
+bool IsEquivalentFrameMatcher::MatchAndExplain(
+ const net::SpdyFrameIR& frame,
+ ::testing::MatchResultListener* listener) const {
+ std::string actual;
+ AppendSpdyFrameToString(frame, &actual);
+ if (actual != expected_) {
+ *listener << "is a " << actual << " frame";
+ return false;
+ }
+ return true;
+}
+
+void IsEquivalentFrameMatcher::DescribeTo(std::ostream* out) const {
+ *out << "is a " << expected_ << " frame";
+}
+
+void IsEquivalentFrameMatcher::DescribeNegationTo(std::ostream* out) const {
+ *out << "isn't a " << expected_ << " frame";
+}
+
+} // namespace
+
+namespace mod_spdy {
+
+namespace testing {
+
+::testing::Matcher<const net::SpdyFrameIR&> IsSynStream(
+ net::SpdyStreamId stream_id, net::SpdyStreamId assoc_stream_id,
+ net::SpdyPriority priority, bool fin, bool unidirectional,
+ const net::SpdyNameValueBlock& headers) {
+ net::SpdySynStreamIR frame(stream_id);
+ frame.set_associated_to_stream_id(assoc_stream_id);
+ frame.set_priority(priority);
+ frame.set_fin(fin);
+ frame.set_unidirectional(unidirectional);
+ frame.GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsSynReply(
+ net::SpdyStreamId stream_id, bool fin,
+ const net::SpdyNameValueBlock& headers) {
+ net::SpdySynReplyIR frame(stream_id);
+ frame.set_fin(fin);
+ frame.GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsRstStream(
+ net::SpdyStreamId stream_id, net::SpdyRstStreamStatus status) {
+ net::SpdyRstStreamIR frame(stream_id, status);
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsSettings(
+ net::SpdySettingsIds id, int32 value) {
+ net::SpdySettingsIR frame;
+ frame.AddSetting(id, false, false, value);
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsPing(net::SpdyPingId ping_id) {
+ net::SpdyPingIR frame(ping_id);
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsGoAway(
+ net::SpdyStreamId last_good_stream_id, net::SpdyGoAwayStatus status) {
+ net::SpdyGoAwayIR frame(last_good_stream_id, status);
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsHeaders(
+ net::SpdyStreamId stream_id, bool fin,
+ const net::SpdyNameValueBlock& headers) {
+ net::SpdyHeadersIR frame(stream_id);
+ frame.set_fin(fin);
+ frame.GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsWindowUpdate(
+ net::SpdyStreamId stream_id, uint32 delta) {
+ net::SpdyWindowUpdateIR frame(stream_id, delta);
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+::testing::Matcher<const net::SpdyFrameIR&> IsDataFrame(
+ net::SpdyStreamId stream_id, bool fin, base::StringPiece payload) {
+ net::SpdyDataIR frame(stream_id, payload);
+ frame.set_fin(fin);
+ return ::testing::MakeMatcher(new IsEquivalentFrameMatcher(frame));
+}
+
+} // namespace testing
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h Thu May 1 11:39:27 2014
@@ -0,0 +1,79 @@
+// Copyright 2012 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
+#define MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
+
+#include "base/basictypes.h"
+#include "base/strings/string_piece.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+
+namespace mod_spdy {
+
+namespace testing {
+
+// Make a matcher that requires the argument to be a SYN_STREAM frame with the
+// given stream ID, associated stream ID, priority, flag_fin,
+// flag_unidirectional, and headers.
+::testing::Matcher<const net::SpdyFrameIR&> IsSynStream(
+ net::SpdyStreamId stream_id, net::SpdyStreamId assoc_stream_id,
+ net::SpdyPriority priority, bool fin, bool unidirectional,
+ const net::SpdyNameValueBlock& headers);
+
+// Make a matcher that requires the argument to be a SYN_REPLY frame with the
+// given stream ID, flag_fin, and headers.
+::testing::Matcher<const net::SpdyFrameIR&> IsSynReply(
+ net::SpdyStreamId stream_id, bool fin,
+ const net::SpdyNameValueBlock& headers);
+
+// Make a matcher that requires the argument to be a RST_STREAM frame with the
+// given stream ID and status code.
+::testing::Matcher<const net::SpdyFrameIR&> IsRstStream(
+ net::SpdyStreamId stream_id, net::SpdyRstStreamStatus status);
+
+// Make a matcher that requires the argument to be a SETTINGS frame with the
+// given setting.
+::testing::Matcher<const net::SpdyFrameIR&> IsSettings(
+ net::SpdySettingsIds id, int32 value);
+
+// Make a matcher that requires the argument to be a PING frame with the
+// given ID.
+::testing::Matcher<const net::SpdyFrameIR&> IsPing(net::SpdyPingId ping_id);
+
+// Make a matcher that requires the argument to be a GOAWAY frame with the
+// given last-good-stream-ID and status code.
+::testing::Matcher<const net::SpdyFrameIR&> IsGoAway(
+ net::SpdyStreamId last_good_stream_id, net::SpdyGoAwayStatus status);
+
+// Make a matcher that requires the argument to be a HEADERS frame with the
+// given stream ID, flag_fin, and headers.
+::testing::Matcher<const net::SpdyFrameIR&> IsHeaders(
+ net::SpdyStreamId stream_id, bool fin,
+ const net::SpdyNameValueBlock& headers);
+
+// Make a matcher that requires the argument to be a WINDOW_UPDATE frame with
+// the given window-size-delta.
+::testing::Matcher<const net::SpdyFrameIR&> IsWindowUpdate(
+ net::SpdyStreamId stream_id, uint32 delta);
+
+// Make a matcher that requires the argument to be a DATA frame.
+::testing::Matcher<const net::SpdyFrameIR&> IsDataFrame(
+ net::SpdyStreamId stream_id, bool fin, base::StringPiece payload);
+
+} // namespace testing
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_TESTING_SPDY_FRAME_MATCHERS_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/testing/spdy_frame_matchers.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,490 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/thread_pool.h"
+
+#include <map>
+#include <set>
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+#include "mod_spdy/common/executor.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// Shut down a worker thread after it has been idle for this many seconds:
+const int64 kDefaultMaxWorkerIdleSeconds = 60;
+
+} // namespace
+
+namespace mod_spdy {
+
+// An executor that uses the ThreadPool to execute tasks. Returned by
+// ThreadPool::NewExecutor.
+class ThreadPool::ThreadPoolExecutor : public Executor {
+ public:
+ explicit ThreadPoolExecutor(ThreadPool* master)
+ : master_(master),
+ stopping_condvar_(&master_->lock_),
+ stopped_(false) {}
+ virtual ~ThreadPoolExecutor() { Stop(); }
+
+ // Executor methods:
+ virtual void AddTask(net_instaweb::Function* task,
+ net::SpdyPriority priority);
+ virtual void Stop();
+
+ private:
+ friend class ThreadPool;
+ ThreadPool* const master_;
+ base::ConditionVariable stopping_condvar_;
+ bool stopped_; // protected by master_->lock_
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPoolExecutor);
+};
+
+// Add a task to the executor; if the executor has already been stopped, just
+// cancel the task immediately.
+void ThreadPool::ThreadPoolExecutor::AddTask(net_instaweb::Function* task,
+ net::SpdyPriority priority) {
+ {
+ base::AutoLock autolock(master_->lock_);
+
+ // Clean up any zombie WorkerThreads in the ThreadPool that are waiting for
+ // reaping. If the OS process we're in accumulates too many unjoined
+ // zombie threads over time, the OS might not be able to spawn a new thread
+ // below. So right now is a good time to clean them up.
+ if (!master_->zombies_.empty()) {
+ std::set<WorkerThread*> zombies;
+ zombies.swap(master_->zombies_);
+ // Joining these threads should be basically instant, since they've
+ // already terminated. But to be safe, let's unlock while we join them.
+ base::AutoUnlock autounlock(master_->lock_);
+ ThreadPool::JoinThreads(zombies);
+ }
+
+ // The thread pool shouldn't be shutting down until all executors are
+ // destroyed. Since this executor clearly still exists, the thread pool
+ // must still be open.
+ DCHECK(!master_->shutting_down_);
+
+ // If the executor hasn't been stopped, add the task to the queue and
+ // notify a worker that there's a new task ready to be taken.
+ if (!stopped_) {
+ master_->task_queue_.insert(std::make_pair(priority, Task(task, this)));
+ master_->worker_condvar_.Signal();
+ master_->StartNewWorkerIfNeeded();
+ return;
+ }
+ }
+
+ // If this executor has already been stopped, just cancel the task (after
+ // releasing the lock).
+ task->CallCancel();
+}
+
+// Stop the executor. Cancel all pending tasks in the thread pool owned by
+// this executor, and then block until all active tasks owned by this executor
+// complete. Stopping the executor more than once has no effect.
+void ThreadPool::ThreadPoolExecutor::Stop() {
+ std::vector<net_instaweb::Function*> functions_to_cancel;
+ {
+ base::AutoLock autolock(master_->lock_);
+ if (stopped_) {
+ return;
+ }
+ stopped_ = true;
+
+ // Remove all tasks owned by this executor from the queue, and collect up
+ // the function objects to be cancelled.
+ TaskQueue::iterator next_iter = master_->task_queue_.begin();
+ while (next_iter != master_->task_queue_.end()) {
+ TaskQueue::iterator iter = next_iter;
+ const Task& task = iter->second;
+ ++next_iter; // Increment next_iter _before_ we might erase iter.
+ if (task.owner == this) {
+ functions_to_cancel.push_back(task.function);
+ master_->task_queue_.erase(iter);
+ }
+ }
+ }
+
+ // Unlock while we cancel the functions, so we're not hogging the lock for
+ // too long, and to avoid potential deadlock if the cancel method tries to do
+ // anything with the thread pool.
+ for (std::vector<net_instaweb::Function*>::const_iterator iter =
+ functions_to_cancel.begin();
+ iter != functions_to_cancel.end(); ++iter) {
+ (*iter)->CallCancel();
+ }
+ // CallCancel deletes the Function objects, invalidating the pointers in this
+ // list, so let's go ahead and clear it (which also saves a little memory
+ // while we're blocked below).
+ functions_to_cancel.clear();
+
+ // Block until all our active tasks are completed.
+ {
+ base::AutoLock autolock(master_->lock_);
+ while (master_->active_task_counts_.count(this) > 0) {
+ stopping_condvar_.Wait();
+ }
+ }
+}
+
+// A WorkerThread object wraps a platform-specific thread handle, and provides
+// the method run by that thread (ThreadMain).
+class ThreadPool::WorkerThread : public base::PlatformThread::Delegate {
+ public:
+ explicit WorkerThread(ThreadPool* master);
+ virtual ~WorkerThread();
+
+ // Start the thread running. Return false on failure. If this succeeds,
+ // then you must call Join() before deleting this object.
+ bool Start();
+
+ // Block until the thread completes. You must set master_->shutting_down_ to
+ // true before calling this method, or the thread will never terminate.
+ // You shouldn't be holding master_->lock_ when calling this.
+ void Join();
+
+ // base::PlatformThread::Delegate method:
+ virtual void ThreadMain();
+
+ private:
+ enum ThreadState { NOT_STARTED, STARTED, JOINED };
+
+ ThreadPool* const master_;
+ // If two master threads are sharing the same ThreadPool, then Start() and
+ // Join() might get called by different threads. So to be safe we use a lock
+ // to protect the two below fields.
+ base::Lock thread_lock_;
+ ThreadState state_;
+ base::PlatformThreadHandle thread_id_;
+
+ DISALLOW_COPY_AND_ASSIGN(WorkerThread);
+};
+
+ThreadPool::WorkerThread::WorkerThread(ThreadPool* master)
+ : master_(master), state_(NOT_STARTED), thread_id_() {}
+
+ThreadPool::WorkerThread::~WorkerThread() {
+ base::AutoLock autolock(thread_lock_);
+ // If we started the thread, we _must_ join it before deleting this object,
+ // or else the thread won't get cleaned up by the OS.
+ DCHECK(state_ == NOT_STARTED || state_ == JOINED);
+}
+
+bool ThreadPool::WorkerThread::Start() {
+ base::AutoLock autolock(thread_lock_);
+ DCHECK_EQ(NOT_STARTED, state_);
+ if (base::PlatformThread::Create(0, this, &thread_id_)) {
+ state_ = STARTED;
+ return true;
+ }
+ return false;
+}
+
+void ThreadPool::WorkerThread::Join() {
+ base::AutoLock autolock(thread_lock_);
+ DCHECK_EQ(STARTED, state_);
+ base::PlatformThread::Join(thread_id_);
+ state_ = JOINED;
+}
+
+// This is the code executed by the thread; when this method returns, the
+// thread will terminate.
+void ThreadPool::WorkerThread::ThreadMain() {
+ // We start by grabbing the master lock, but we release it below whenever we
+ // are 1) waiting for a new task or 2) executing a task. So in fact most of
+ // the time we are not holding the lock.
+ base::AutoLock autolock(master_->lock_);
+ while (true) {
+ // Wait until there's a task available (or we're shutting down), but don't
+ // stay idle for more than kMaxWorkerIdleSeconds seconds.
+ base::TimeDelta time_remaining = master_->max_thread_idle_time_;
+ while (!master_->shutting_down_ && master_->task_queue_.empty() &&
+ time_remaining.InSecondsF() > 0.0) {
+ // Note that TimedWait can wake up spuriously before the time runs out,
+ // so we need to measure how long we actually waited for.
+ const base::Time start = base::Time::Now();
+ master_->worker_condvar_.TimedWait(time_remaining);
+ const base::Time end = base::Time::Now();
+ // Note that the system clock can go backwards if it is reset, so make
+ // sure we never _increase_ time_remaining.
+ if (end > start) {
+ time_remaining -= end - start;
+ }
+ }
+
+ // If the thread pool is shutting down, terminate this thread; the master
+ // is about to join/delete us (in its destructor).
+ if (master_->shutting_down_) {
+ return;
+ }
+
+ // If we ran out of time without getting a task, maybe this thread should
+ // shut itself down.
+ if (master_->task_queue_.empty()) {
+ DCHECK_LE(time_remaining.InSecondsF(), 0.0);
+ // Ask the master if we should stop. If this returns true, this worker
+ // has been zombified, so we're free to terminate the thread.
+ if (master_->TryZombifyIdleThread(this)) {
+ return; // Yes, we should stop; terminate the thread.
+ } else {
+ continue; // No, we shouldn't stop; jump to the top of the while loop.
+ }
+ }
+
+ // Otherwise, there must be at least one task available now. Grab one from
+ // the master, who will then treat us as busy until we complete it.
+ const Task task = master_->GetNextTask();
+ // Release the lock while we execute the task. Note that we use AutoUnlock
+ // here rather than one AutoLock for the above code and another for the
+ // below code, so that we don't have to release and reacquire the lock at
+ // the edge of the while-loop.
+ {
+ base::AutoUnlock autounlock(master_->lock_);
+ task.function->CallRun();
+ }
+ // Inform the master we have completed the task and are no longer busy.
+ master_->OnTaskComplete(task);
+ }
+}
+
+ThreadPool::ThreadPool(int min_threads, int max_threads)
+ : min_threads_(min_threads),
+ max_threads_(max_threads),
+ max_thread_idle_time_(
+ base::TimeDelta::FromSeconds(kDefaultMaxWorkerIdleSeconds)),
+ worker_condvar_(&lock_),
+ num_busy_workers_(0),
+ shutting_down_(false) {
+ DCHECK_GE(max_thread_idle_time_.InSecondsF(), 0.0);
+ // Note that we check e.g. min_threads rather than min_threads_ (which is
+ // unsigned), in order to catch negative numbers.
+ DCHECK_GE(min_threads, 1);
+ DCHECK_GE(max_threads, 1);
+ DCHECK_LE(min_threads_, max_threads_);
+}
+
+ThreadPool::ThreadPool(int min_threads, int max_threads,
+ base::TimeDelta max_thread_idle_time)
+ : min_threads_(min_threads),
+ max_threads_(max_threads),
+ max_thread_idle_time_(max_thread_idle_time),
+ worker_condvar_(&lock_),
+ num_busy_workers_(0),
+ shutting_down_(false) {
+ DCHECK_GE(max_thread_idle_time_.InSecondsF(), 0.0);
+ DCHECK_GE(min_threads, 1);
+ DCHECK_GE(max_threads, 1);
+ DCHECK_LE(min_threads_, max_threads_);
+}
+
+ThreadPool::~ThreadPool() {
+ base::AutoLock autolock(lock_);
+
+ // If we're doing things right, all the Executors should have been
+ // destroyed before the ThreadPool is destroyed, so there should be no
+ // pending or active tasks.
+ DCHECK(task_queue_.empty());
+ DCHECK(active_task_counts_.empty());
+
+ // Wake up all the worker threads and tell them to shut down.
+ shutting_down_ = true;
+ worker_condvar_.Broadcast();
+
+ // Clean up all our threads.
+ std::set<WorkerThread*> threads;
+ zombies_.swap(threads);
+ threads.insert(workers_.begin(), workers_.end());
+ workers_.clear();
+ {
+ base::AutoUnlock autounlock(lock_);
+ JoinThreads(threads);
+ }
+
+ // Because we had shutting_down_ set to true, nothing should have been added
+ // to our WorkerThread sets while we were unlocked. So we should be all
+ // cleaned up now.
+ DCHECK(workers_.empty());
+ DCHECK(zombies_.empty());
+ DCHECK(task_queue_.empty());
+ DCHECK(active_task_counts_.empty());
+}
+
+bool ThreadPool::Start() {
+ base::AutoLock autolock(lock_);
+ DCHECK(task_queue_.empty());
+ DCHECK(workers_.empty());
+ // Start up min_threads_ workers; if any of the worker threads fail to start,
+ // then this method fails and the ThreadPool should be deleted.
+ for (unsigned int i = 0; i < min_threads_; ++i) {
+ scoped_ptr<WorkerThread> worker(new WorkerThread(this));
+ if (!worker->Start()) {
+ return false;
+ }
+ workers_.insert(worker.release());
+ }
+ DCHECK_EQ(min_threads_, workers_.size());
+ return true;
+}
+
+Executor* ThreadPool::NewExecutor() {
+ return new ThreadPoolExecutor(this);
+}
+
+int ThreadPool::GetNumWorkersForTest() {
+ base::AutoLock autolock(lock_);
+ return workers_.size();
+}
+
+int ThreadPool::GetNumIdleWorkersForTest() {
+ base::AutoLock autolock(lock_);
+ DCHECK_GE(num_busy_workers_, 0u);
+ DCHECK_LE(num_busy_workers_, workers_.size());
+ return workers_.size() - num_busy_workers_;
+}
+
+int ThreadPool::GetNumZombiesForTest() {
+ base::AutoLock autolock(lock_);
+ return zombies_.size();
+}
+
+// This method is called each time we add a new task to the thread pool.
+void ThreadPool::StartNewWorkerIfNeeded() {
+ lock_.AssertAcquired();
+ DCHECK_GE(num_busy_workers_, 0u);
+ DCHECK_LE(num_busy_workers_, workers_.size());
+ DCHECK_GE(workers_.size(), min_threads_);
+ DCHECK_LE(workers_.size(), max_threads_);
+
+ // We create a new worker to handle the task _unless_ either 1) we're already
+ // at the maximum number of threads, or 2) there are already enough idle
+ // workers sitting around to take on this task (and all other pending tasks
+ // that the idle workers haven't yet had a chance to pick up).
+ if (workers_.size() >= max_threads_ ||
+ task_queue_.size() <= workers_.size() - num_busy_workers_) {
+ return;
+ }
+
+ scoped_ptr<WorkerThread> worker(new WorkerThread(this));
+ if (worker->Start()) {
+ workers_.insert(worker.release());
+ } else {
+ LOG(ERROR) << "Failed to start new worker thread.";
+ }
+}
+
+// static
+void ThreadPool::JoinThreads(const std::set<WorkerThread*>& threads) {
+ for (std::set<WorkerThread*>::const_iterator iter = threads.begin();
+ iter != threads.end(); ++iter) {
+ WorkerThread* thread = *iter;
+ thread->Join();
+ delete thread;
+ }
+}
+
+// Call when the worker thread has been idle for a while. Either return false
+// (worker should continue waiting for tasks), or zombify the worker and return
+// true (worker thread should immediately terminate).
+bool ThreadPool::TryZombifyIdleThread(WorkerThread* thread) {
+ lock_.AssertAcquired();
+
+ // Don't terminate the thread if the thread pool is already at the minimum
+ // number of threads.
+ DCHECK_GE(workers_.size(), min_threads_);
+ if (workers_.size() <= min_threads_) {
+ return false;
+ }
+
+ // Remove this thread from the worker set.
+ DCHECK_EQ(1u, workers_.count(thread));
+ workers_.erase(thread);
+
+ // When a (joinable) thread terminates, it must still be cleaned up, either
+ // by another thread joining it, or by detatching it. However, the thread
+ // pool's not shutting down here, so the master thread doesn't know to join
+ // this thread that we're in now, and the Chromium thread abstraction we're
+ // using doesn't currently allow us to detach a thread. So instead, we place
+ // this WorkerThread object into a "zombie" set, which the master thread can
+ // reap later on. Threads that have terminated but that haven't been joined
+ // yet use up only a small amount of memory (I think), so it's okay if we
+ // don't reap it right away, as long as we don't try to spawn new threads
+ // while there's still lots of zombies.
+ DCHECK(!shutting_down_);
+ DCHECK_EQ(0u, zombies_.count(thread));
+ zombies_.insert(thread);
+ return true;
+}
+
+// Get and return the next task from the queue (which must be non-empty), and
+// update our various counters to indicate that the calling worker is busy
+// executing this task.
+ThreadPool::Task ThreadPool::GetNextTask() {
+ lock_.AssertAcquired();
+
+ // Pop the highest-priority task from the queue. Note that smaller values
+ // correspond to higher priorities (SPDY draft 3 section 2.3.3), so
+ // task_queue_.begin() gets us the highest-priority pending task.
+ DCHECK(!task_queue_.empty());
+ TaskQueue::iterator task_iter = task_queue_.begin();
+ const Task task = task_iter->second;
+ task_queue_.erase(task_iter);
+
+ // Increment the count of active tasks for the executor that owns this
+ // task; we'll decrement it again when the task completes.
+ ++(active_task_counts_[task.owner]);
+
+ // The worker that takes this task will be busy until it completes it.
+ DCHECK_LT(num_busy_workers_, workers_.size());
+ ++num_busy_workers_;
+
+ return task;
+}
+
+// Call to indicate that the task has been completed; update our various
+// counters to indicate that the calling worker is no longer busy executing
+// this task.
+void ThreadPool::OnTaskComplete(Task task) {
+ lock_.AssertAcquired();
+
+ // The worker that just finished this task is no longer busy.
+ DCHECK_GE(num_busy_workers_, 1u);
+ --num_busy_workers_;
+
+ // We've completed the task and reaquired the lock, so decrement the count
+ // of active tasks for this owner.
+ OwnerMap::iterator count_iter = active_task_counts_.find(task.owner);
+ DCHECK(count_iter != active_task_counts_.end());
+ DCHECK(count_iter->second > 0);
+ --(count_iter->second);
+
+ // If this was the last active task for the owner, notify anyone who might be
+ // waiting for the owner to stop.
+ if (count_iter->second == 0) {
+ active_task_counts_.erase(count_iter);
+ task.owner->stopping_condvar_.Broadcast();
+ }
+}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h Thu May 1 11:39:27 2014
@@ -0,0 +1,145 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_THREAD_POOL_H_
+#define MOD_SPDY_COMMON_THREAD_POOL_H_
+
+#include <map>
+#include <set>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+#include "net/spdy/spdy_protocol.h" // for net::SpdyPriority
+
+namespace net_instaweb { class Function; }
+
+namespace mod_spdy {
+
+class Executor;
+
+// A ThreadPool keeps a pool of threads waiting to perform tasks. One can
+// create any number of Executor objects, using the NewExecutor method, which
+// will all share the threads for executing tasks. If more tasks are queued
+// than there are threads in the pool, these executors will respect task
+// priorities when deciding which tasks to execute first.
+class ThreadPool {
+ public:
+ // Create a new thread pool that uses at least min_threads threads, and at
+ // most max_threads threads, at a time. min_threads must be no greater than
+ // max_threads, and both must be positive.
+ ThreadPool(int min_threads, int max_threads);
+
+ // As above, but specify the amount of time after which to kill idle threads,
+ // rather than using the default value (this is primarily for testing).
+ // max_thread_idle_time must be non-negative.
+ ThreadPool(int min_threads, int max_threads,
+ base::TimeDelta max_thread_idle_time);
+
+ // The destructor will block until all threads in the pool have shut down.
+ // The ThreadPool must not be destroyed until all Executor objects returned
+ // from the NewExecutor method have first been deleted.
+ ~ThreadPool();
+
+ // Start up the thread pool. Must be called exactly one before using the
+ // thread pool; returns true on success, or false on failure. If startup
+ // fails, the ThreadPool must be immediately deleted.
+ bool Start();
+
+ // Return a new Executor object that uses this thread pool to perform tasks.
+ // The caller gains ownership of the returned Executor, and the ThreadPool
+ // must outlive the returned Executor.
+ Executor* NewExecutor();
+
+ // Return the current total number of worker threads. This is provided for
+ // testing purposes only.
+ int GetNumWorkersForTest();
+ // Return the number of worker threads currently idle. This is provided for
+ // testing purposes only.
+ int GetNumIdleWorkersForTest();
+ // Return the number of terminated (zombie) threads that have yet to be
+ // reaped. This is provided for testing purposes only.
+ int GetNumZombiesForTest();
+
+ private:
+ class ThreadPoolExecutor;
+ class WorkerThread;
+
+ // A Task is a simple pair of the Function to run, and the executor to which
+ // the task was added.
+ struct Task {
+ Task(net_instaweb::Function* fun, ThreadPoolExecutor* own)
+ : function(fun), owner(own) {}
+ net_instaweb::Function* function;
+ ThreadPoolExecutor* owner;
+ };
+
+ typedef std::multimap<net::SpdyPriority, Task> TaskQueue;
+ typedef std::map<const ThreadPoolExecutor*, int> OwnerMap;
+
+ // Start a new worker thread if 1) the task queue is larger than the number
+ // of currently idle workers, and 2) we have fewer than the maximum number of
+ // workers. Otherwise, do nothing. Must be holding lock_ when calling this.
+ void StartNewWorkerIfNeeded();
+
+ // Join and delete all worker threads in the given set. This will block
+ // until all the threads have terminated and been cleaned up, so don't call
+ // this while holding the lock_.
+ static void JoinThreads(const std::set<WorkerThread*>& threads);
+
+ // These calls are used to implement the WorkerThread's main function. Must
+ // be holding lock_ when calling any of these.
+ bool TryZombifyIdleThread(WorkerThread* thread);
+ Task GetNextTask();
+ void OnTaskComplete(Task task);
+
+ // The min and max number of threads passed to the constructor. Although the
+ // constructor takes signed ints (for convenience), we store these unsigned
+ // to avoid the need for static_casts when comparing against workers_.size().
+ const unsigned int min_threads_;
+ const unsigned int max_threads_;
+ const base::TimeDelta max_thread_idle_time_;
+ // This single master lock protects all of the below fields, as well as any
+ // mutable data and condition variables in the worker threads and executors.
+ // Having just one lock makes everything much easier to understand.
+ base::Lock lock_;
+ // Workers wait on this condvar when waiting for a new task. We signal it
+ // when a new task becomes available, or when we need to shut down.
+ base::ConditionVariable worker_condvar_;
+ // The list of running worker threads. We keep this around so that we can
+ // join the threads on shutdown.
+ std::set<WorkerThread*> workers_;
+ // Worker threads that have shut themselves down (due to being idle), and are
+ // awaiting cleanup by the master thread.
+ std::set<WorkerThread*> zombies_;
+ // How many workers do we have that are actually executing tasks?
+ unsigned int num_busy_workers_;
+ // We set this to true to tell the worker threads to terminate.
+ bool shutting_down_;
+ // The priority queue of pending tasks. Invariant: all Function objects in
+ // the queue have neither been started nor cancelled yet.
+ TaskQueue task_queue_;
+ // This maps executors to the number of currently running tasks for that
+ // executor; we increment when we start a task, and decrement when we finish
+ // it. If the number is zero, we remove the entry from the map; thus, as an
+ // invariant the map only contains entries for executors with active tasks.
+ OwnerMap active_task_counts_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_THREAD_POOL_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/thread_pool_test.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,339 @@
+// Copyright 2012 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/thread_pool.h"
+
+#include <vector>
+
+#include "base/basictypes.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/threading/platform_thread.h"
+#include "base/time/time.h"
+#include "mod_spdy/common/executor.h"
+#include "mod_spdy/common/testing/notification.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+// When adding tests here, try to keep them robust against thread scheduling
+// differences from run to run. In particular, they shouldn't fail just
+// because you're running under Valgrind.
+
+namespace {
+
+// When run, a TestFunction waits for `wait` millis, then sets `*result` to
+// RAN. When cancelled, it sets *result to CANCELLED.
+class TestFunction : public net_instaweb::Function {
+ public:
+ enum Result { NOTHING, RAN, CANCELLED };
+ TestFunction(int wait, base::Lock* lock, Result* result)
+ : wait_(wait), lock_(lock), result_(result) {}
+ virtual ~TestFunction() {}
+ protected:
+ // net_instaweb::Function methods:
+ virtual void Run() {
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(wait_));
+ base::AutoLock autolock(*lock_);
+ *result_ = RAN;
+ }
+ virtual void Cancel() {
+ base::AutoLock autolock(*lock_);
+ *result_ = CANCELLED;
+ }
+ private:
+ const int wait_;
+ base::Lock* const lock_;
+ Result* const result_;
+ DISALLOW_COPY_AND_ASSIGN(TestFunction);
+};
+
+// Test that we execute tasks concurrently, that that we respect priorities
+// when pulling tasks from the queue.
+TEST(ThreadPoolTest, ConcurrencyAndPrioritization) {
+ // Create a thread pool with 2 threads, and an executor.
+ mod_spdy::ThreadPool thread_pool(2, 2);
+ ASSERT_TRUE(thread_pool.Start());
+ scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+ base::Lock lock;
+ TestFunction::Result result0 = TestFunction::NOTHING;
+ TestFunction::Result result1 = TestFunction::NOTHING;
+ TestFunction::Result result2 = TestFunction::NOTHING;
+ TestFunction::Result result3 = TestFunction::NOTHING;
+
+ // Create a high-priority TestFunction, which waits for 200 millis then
+ // records that it ran.
+ executor->AddTask(new TestFunction(200, &lock, &result0), 0);
+ // Create several TestFunctions at different priorities. Each waits 100
+ // millis then records that it ran.
+ executor->AddTask(new TestFunction(100, &lock, &result1), 1);
+ executor->AddTask(new TestFunction(100, &lock, &result3), 3);
+ executor->AddTask(new TestFunction(100, &lock, &result2), 2);
+
+ // Wait 150 millis, then stop the executor.
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(150));
+ executor->Stop();
+
+ // Only TestFunctions that _started_ within the first 150 millis should have
+ // run; the others should have been cancelled.
+ // - The priority-0 function should have started first, on the first
+ // thread. It finishes after 200 millis.
+ // - The priority-1 function should run on the second thread. It finishes
+ // after 100 millis.
+ // - The priority-2 function should run on the second thread after the
+ // priority-1 function finishes, even though it was pushed last, because
+ // it's higher-priority than the priority-3 function. It finishes at the
+ // 200-milli mark.
+ // - The priority-3 function should not get a chance to run, because we
+ // stop the executor after 150 millis, and the soonest it could start is
+ // the 200-milli mark.
+ base::AutoLock autolock(lock);
+ EXPECT_EQ(TestFunction::RAN, result0);
+ EXPECT_EQ(TestFunction::RAN, result1);
+ EXPECT_EQ(TestFunction::RAN, result2);
+ EXPECT_EQ(TestFunction::CANCELLED, result3);
+}
+
+// Test that stopping one executor doesn't affect tasks on another executor
+// from the same ThreadPool.
+TEST(ThreadPoolTest, MultipleExecutors) {
+ // Create a thread pool with 3 threads, and two executors.
+ mod_spdy::ThreadPool thread_pool(3, 3);
+ ASSERT_TRUE(thread_pool.Start());
+ scoped_ptr<mod_spdy::Executor> executor1(thread_pool.NewExecutor());
+ scoped_ptr<mod_spdy::Executor> executor2(thread_pool.NewExecutor());
+
+ base::Lock lock;
+ TestFunction::Result e1r1 = TestFunction::NOTHING;
+ TestFunction::Result e1r2 = TestFunction::NOTHING;
+ TestFunction::Result e1r3 = TestFunction::NOTHING;
+ TestFunction::Result e2r1 = TestFunction::NOTHING;
+ TestFunction::Result e2r2 = TestFunction::NOTHING;
+ TestFunction::Result e2r3 = TestFunction::NOTHING;
+
+ // Add some tasks to the executors. Each one takes 50 millis to run.
+ executor1->AddTask(new TestFunction(50, &lock, &e1r1), 0);
+ executor2->AddTask(new TestFunction(50, &lock, &e2r1), 0);
+ executor1->AddTask(new TestFunction(50, &lock, &e1r2), 0);
+ executor2->AddTask(new TestFunction(50, &lock, &e2r2), 1);
+ executor1->AddTask(new TestFunction(50, &lock, &e1r3), 3);
+ executor2->AddTask(new TestFunction(50, &lock, &e2r3), 1);
+
+ // Wait 20 millis (to make sure the first few tasks got picked up), then
+ // destroy executor2, which should stop it. Finally, sleep another 100
+ // millis to give the remaining tasks a chance to finish.
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(20));
+ executor2.reset();
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(100));
+
+ // The three high priority tasks should have all run. The other two tasks on
+ // executor2 should have been cancelled when we stopped executor2, but the
+ // low-priority task on executor1 should have been left untouched, and
+ // allowed to finish.
+ base::AutoLock autolock(lock);
+ EXPECT_EQ(TestFunction::RAN, e1r1);
+ EXPECT_EQ(TestFunction::RAN, e2r1);
+ EXPECT_EQ(TestFunction::RAN, e1r2);
+ EXPECT_EQ(TestFunction::CANCELLED, e2r2);
+ EXPECT_EQ(TestFunction::RAN, e1r3);
+ EXPECT_EQ(TestFunction::CANCELLED, e2r3);
+}
+
+// When run, a WaitFunction blocks until the notification is set.
+class WaitFunction : public net_instaweb::Function {
+ public:
+ WaitFunction(mod_spdy::testing::Notification* notification)
+ : notification_(notification) {}
+ virtual ~WaitFunction() {}
+ protected:
+ // net_instaweb::Function methods:
+ virtual void Run() {
+ notification_->Wait();
+ }
+ virtual void Cancel() {}
+ private:
+ mod_spdy::testing::Notification* const notification_;
+ DISALLOW_COPY_AND_ASSIGN(WaitFunction);
+};
+
+// When run, an IdFunction pushes its ID onto the vector.
+class IdFunction : public net_instaweb::Function {
+ public:
+ IdFunction(int id, base::Lock* lock, base::ConditionVariable* condvar,
+ std::vector<int>* output)
+ : id_(id), lock_(lock), condvar_(condvar), output_(output) {}
+ virtual ~IdFunction() {}
+ protected:
+ // net_instaweb::Function methods:
+ virtual void Run() {
+ base::AutoLock autolock(*lock_);
+ output_->push_back(id_);
+ condvar_->Broadcast();
+ }
+ virtual void Cancel() {}
+ private:
+ const int id_;
+ base::Lock* const lock_;
+ base::ConditionVariable* const condvar_;
+ std::vector<int>* const output_;
+ DISALLOW_COPY_AND_ASSIGN(IdFunction);
+};
+
+// Test that if many tasks of the same priority are added, they are run in the
+// order they were added.
+TEST(ThreadPoolTest, SamePriorityTasksAreFIFO) {
+ // Create a thread pool with just one thread, and an executor.
+ mod_spdy::ThreadPool thread_pool(1, 1);
+ ASSERT_TRUE(thread_pool.Start());
+ scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+ // First, make sure no other tasks will get started until we set the
+ // notification.
+ mod_spdy::testing::Notification start;
+ executor->AddTask(new WaitFunction(&start), 0);
+
+ // Add many tasks to the executor, of varying priorities.
+ const int num_tasks_each_priority = 1000;
+ const int total_num_tasks = 3 * num_tasks_each_priority;
+ base::Lock lock;
+ base::ConditionVariable condvar(&lock);
+ std::vector<int> ids; // protected by lock
+ for (int id = 0; id < num_tasks_each_priority; ++id) {
+ executor->AddTask(new IdFunction(id, &lock, &condvar, &ids), 1);
+ executor->AddTask(new IdFunction(id + num_tasks_each_priority,
+ &lock, &condvar, &ids), 2);
+ executor->AddTask(new IdFunction(id + 2 * num_tasks_each_priority,
+ &lock, &condvar, &ids), 3);
+ }
+
+ // Start us off, then wait for all tasks to finish.
+ start.Set();
+ base::AutoLock autolock(lock);
+ while (static_cast<int>(ids.size()) < total_num_tasks) {
+ condvar.Wait();
+ }
+
+ // Check that the tasks were executed in order by the one worker thread.
+ for (int index = 0; index < total_num_tasks; ++index) {
+ ASSERT_EQ(index, ids[index])
+ << "Task " << ids[index] << " finished in position " << index;
+ }
+}
+
+// Add a test failure if the thread pool does not stabilize to the expected
+// total/idle number of worker threads withing the given timeout.
+void ExpectWorkersWithinTimeout(int expected_num_workers,
+ int expected_num_idle_workers,
+ mod_spdy::ThreadPool* thread_pool,
+ int timeout_millis) {
+ int millis_remaining = timeout_millis;
+ while (true) {
+ const int actual_num_workers = thread_pool->GetNumWorkersForTest();
+ const int actual_num_idle_workers =
+ thread_pool->GetNumIdleWorkersForTest();
+ if (actual_num_workers == expected_num_workers &&
+ actual_num_idle_workers == expected_num_idle_workers) {
+ return;
+ }
+ if (millis_remaining <= 0) {
+ ADD_FAILURE() << "Timed out; expected " << expected_num_workers
+ << " worker(s) with " << expected_num_idle_workers
+ <<" idle; still at " << actual_num_workers
+ << " worker(s) with " << actual_num_idle_workers
+ << " idle after " << timeout_millis << "ms";
+ return;
+ }
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(10));
+ millis_remaining -= 10;
+ }
+}
+
+// Test that we spawn new threads as needed, and allow them to die off after
+// being idle for a while.
+TEST(ThreadPoolTest, CreateAndRetireWorkers) {
+ // Create a thread pool with min_threads < max_threads, and give it a short
+ // max_thread_idle_time.
+ const int idle_time_millis = 100;
+ mod_spdy::ThreadPool thread_pool(
+ 2, 4, base::TimeDelta::FromMilliseconds(idle_time_millis));
+ ASSERT_TRUE(thread_pool.Start());
+ // As soon as we start the thread pool, there should be the minimum number of
+ // workers (two), both counted as idle.
+ EXPECT_EQ(2, thread_pool.GetNumWorkersForTest());
+ EXPECT_EQ(2, thread_pool.GetNumIdleWorkersForTest());
+
+ scoped_ptr<mod_spdy::Executor> executor(thread_pool.NewExecutor());
+
+ // Start up three tasks. That should push us up to three workers
+ // immediately. If we make sure to give those threads a chance to run, they
+ // should soon pick up the tasks and all be busy.
+ mod_spdy::testing::Notification done1;
+ executor->AddTask(new WaitFunction(&done1), 0);
+ executor->AddTask(new WaitFunction(&done1), 1);
+ executor->AddTask(new WaitFunction(&done1), 2);
+ EXPECT_EQ(3, thread_pool.GetNumWorkersForTest());
+ ExpectWorkersWithinTimeout(3, 0, &thread_pool, 100);
+
+ // Add three more tasks. We should now be at the maximum number of workers,
+ // and that fourth worker should be busy soon.
+ mod_spdy::testing::Notification done2;
+ executor->AddTask(new WaitFunction(&done2), 1);
+ executor->AddTask(new WaitFunction(&done2), 2);
+ executor->AddTask(new WaitFunction(&done2), 3);
+ EXPECT_EQ(4, thread_pool.GetNumWorkersForTest());
+ ExpectWorkersWithinTimeout(4, 0, &thread_pool, 100);
+
+ // Allow the first group of tasks to finish. There are now only three tasks
+ // running, so one of our four threads should go idle. If we wait for a
+ // while after that, that thread should terminate and enter zombie mode.
+ done1.Set();
+ ExpectWorkersWithinTimeout(4, 1, &thread_pool, idle_time_millis / 2);
+ ExpectWorkersWithinTimeout(3, 0, &thread_pool, 2 * idle_time_millis);
+ EXPECT_EQ(1, thread_pool.GetNumZombiesForTest());
+
+ // Allow the second group of tasks to finish. There are no tasks left, so
+ // all three threads should go idle. If we wait for a while after that,
+ // exactly one of the three should shut down, bringing us back down to the
+ // minimum number of threads. We should now have two zombie threads.
+ done2.Set();
+ ExpectWorkersWithinTimeout(3, 3, &thread_pool, idle_time_millis / 2);
+ ExpectWorkersWithinTimeout(2, 2, &thread_pool, 2 * idle_time_millis);
+ EXPECT_EQ(2, thread_pool.GetNumZombiesForTest());
+
+ // Start some new new tasks. This should cause us to immediately reap the
+ // zombie threads, and soon, we should have three busy threads.
+ mod_spdy::testing::Notification done3;
+ executor->AddTask(new WaitFunction(&done3), 0);
+ executor->AddTask(new WaitFunction(&done3), 2);
+ executor->AddTask(new WaitFunction(&done3), 1);
+ EXPECT_EQ(0, thread_pool.GetNumZombiesForTest());
+ EXPECT_EQ(3, thread_pool.GetNumWorkersForTest());
+ ExpectWorkersWithinTimeout(3, 0, &thread_pool, 100);
+
+ // Let those tasks finish. Once again, the threads should go idle, and then
+ // one of them should terminate and enter zombie mode.
+ done3.Set();
+ ExpectWorkersWithinTimeout(3, 3, &thread_pool, idle_time_millis / 2);
+ ExpectWorkersWithinTimeout(2, 2, &thread_pool, 2 * idle_time_millis);
+ EXPECT_EQ(1, thread_pool.GetNumZombiesForTest());
+
+ // When we exit the test, the thread pool's destructor should reap the zombie
+ // thread (as well as shutting down the still-running workers). We can
+ // verify this by running this test under valgrind and making sure that no
+ // memory is leaked.
+}
+
+} // namespace
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in Thu May 1 11:39:27 2014
@@ -0,0 +1,25 @@
+// Copyright (c) 2010 The Chromium Authors. All rights reserved.
+// Use of this source is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// version.h is generated from version.h.in. Edit the source!
+
+#pragma once
+
+// Version Information
+
+#define MOD_SPDY_VERSION @MAJOR@,@MINOR@,@BUILD@,@PATCH@
+#define MOD_SPDY_VERSION_STRING "@MAJOR@.@MINOR@.@BUILD@.@PATCH@"
+
+// Branding Information
+
+#define COMPANY_FULLNAME_STRING "@COMPANY_FULLNAME@"
+#define COMPANY_SHORTNAME_STRING "@COMPANY_SHORTNAME@"
+#define PRODUCT_FULLNAME_STRING "@PRODUCT_FULLNAME@"
+#define PRODUCT_SHORTNAME_STRING "@PRODUCT_SHORTNAME@"
+#define COPYRIGHT_STRING "@COPYRIGHT@"
+#define OFFICIAL_BUILD_STRING "@OFFICIAL_BUILD@"
+
+// Changelist Information
+
+#define LASTCHANGE_STRING "@LASTCHANGE@"
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/version.h.in
------------------------------------------------------------------------------
svn:eol-style = native