You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ji...@apache.org on 2014/05/01 13:39:32 UTC

svn commit: r1591620 [7/14] - in /httpd/mod_spdy/branches/httpd-2.2.x: ./ base/ base/metrics/ build/ install/ install/common/ install/debian/ install/rpm/ mod_spdy/ mod_spdy/apache/ mod_spdy/apache/filters/ mod_spdy/apache/testing/ mod_spdy/common/ mod...

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,120 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+
+#include <list>
+#include <map>
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+SpdyFramePriorityQueue::SpdyFramePriorityQueue()
+    : condvar_(&lock_) {}
+
+SpdyFramePriorityQueue::~SpdyFramePriorityQueue() {
+  for (QueueMap::iterator iter = queue_map_.begin();
+       iter != queue_map_.end(); ++iter) {
+    FrameList* list = iter->second;
+    STLDeleteContainerPointers(list->begin(), list->end());
+    delete list;
+  }
+}
+
+bool SpdyFramePriorityQueue::IsEmpty() const {
+  base::AutoLock autolock(lock_);
+  return queue_map_.empty();
+}
+
+const int SpdyFramePriorityQueue::kTopPriority = -1;
+
+void SpdyFramePriorityQueue::Insert(int priority, net::SpdyFrameIR* frame) {
+  base::AutoLock autolock(lock_);
+  DCHECK(frame);
+
+  // Get the frame list for the given priority; if it doesn't currently exist,
+  // create it in the map.
+  FrameList* list = NULL;
+  QueueMap::iterator iter = queue_map_.find(priority);
+  if (iter == queue_map_.end()) {
+    list = new FrameList;
+    queue_map_[priority] = list;
+  } else {
+    list = iter->second;
+  }
+  DCHECK(list);
+
+  // Add the frame to the end of the list, and wake up at most one thread
+  // sleeping on a BlockingPop.
+  list->push_back(frame);
+  condvar_.Signal();
+}
+
+bool SpdyFramePriorityQueue::Pop(net::SpdyFrameIR** frame) {
+  base::AutoLock autolock(lock_);
+  return InternalPop(frame);
+}
+
+bool SpdyFramePriorityQueue::BlockingPop(const base::TimeDelta& max_time,
+                                         net::SpdyFrameIR** frame) {
+  base::AutoLock autolock(lock_);
+  DCHECK(frame);
+
+  const base::TimeDelta zero = base::TimeDelta();
+  base::TimeDelta time_remaining = max_time;
+  while (time_remaining > zero && queue_map_.empty()) {
+    // TODO(mdsteele): It appears from looking at the Chromium source code that
+    // HighResNow() is "expensive" on Windows (how expensive, I am not sure);
+    // however, the other options for getting a "now" time either don't
+    // guarantee monotonicity (so time might go backwards) or might be too
+    // low-resolution for our purposes, so I think we'd better stick with this
+    // for now.  But is there a better way to do what we're doing here?
+    const base::TimeTicks start = base::TimeTicks::HighResNow();
+    condvar_.TimedWait(time_remaining);
+    time_remaining -= base::TimeTicks::HighResNow() - start;
+  }
+
+  return InternalPop(frame);
+}
+
+bool SpdyFramePriorityQueue::InternalPop(net::SpdyFrameIR** frame) {
+  lock_.AssertAcquired();
+  DCHECK(frame);
+  if (queue_map_.empty()) {
+    return false;
+  }
+  // As an invariant, the lists in the queue map are never empty.  So get the
+  // list of highest priority (smallest priority number) and pop the first
+  // frame from it.
+  QueueMap::iterator iter = queue_map_.begin();
+  FrameList* list = iter->second;
+  DCHECK(!list->empty());
+  *frame = list->front();
+  list->pop_front();
+  // If the list is now empty, we have to delete it from the map to maintain
+  // the invariant.
+  if (list->empty()) {
+    queue_map_.erase(iter);
+    delete list;
+  }
+  return true;
+}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h Thu May  1 11:39:27 2014
@@ -0,0 +1,95 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_FRAME_PRIORITY_QUEUE_H_
+#define MOD_SPDY_COMMON_SPDY_FRAME_PRIORITY_QUEUE_H_
+
+#include <list>
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+
+namespace base { class TimeDelta; }
+
+namespace net { class SpdyFrameIR; }
+
+namespace mod_spdy {
+
+// A priority queue of SPDY frames, intended for multiplexing output frames
+// from multiple SPDY stream threads back to the SPDY connection thread and
+// allowing frames from high-priority streams to cut in front of lower-priority
+// streams.  This class is thread-safe -- its methods may be called
+// concurrently by multiple threads.
+class SpdyFramePriorityQueue {
+ public:
+  // Create an initially-empty queue.
+  SpdyFramePriorityQueue();
+  ~SpdyFramePriorityQueue();
+
+  // Return true if the queue is currently empty.  (Of course, there's no
+  // guarantee that another thread won't change that as soon as this method
+  // returns.)
+  bool IsEmpty() const;
+
+  // A priority value that is more important than any priority normally used
+  // for sending SPDY frames.
+  static const int kTopPriority;
+
+  // Insert a frame into the queue at the specified priority.  The queue takes
+  // ownership of the frame, and will delete it if the queue is deleted before
+  // the frame is removed from the queue by the Pop method.  Note that smaller
+  // numbers indicate higher priorities.
+  void Insert(int priority, net::SpdyFrameIR* frame);
+
+  // Remove and provide a frame from the queue and return true, or return false
+  // if the queue is empty.  The caller gains ownership of the provided frame
+  // object.  This method will try to yield higher-priority frames before
+  // lower-priority ones (even if they were inserted later), but guarantees to
+  // return same-priority frames in the same order they were inserted (FIFO).
+  // In particular, this means that a sequence of frames from the same SPDY
+  // stream will stay in order (assuming they were all inserted with the same
+  // priority -- that of the stream).
+  bool Pop(net::SpdyFrameIR** frame);
+
+  // Like Pop(), but if the queue is empty this method will block for up to
+  // max_time before returning false.
+  bool BlockingPop(const base::TimeDelta& max_time, net::SpdyFrameIR** frame);
+
+ private:
+  // Same as Pop(), but requires lock_ to be held.
+  bool InternalPop(net::SpdyFrameIR** frame);
+
+  mutable base::Lock lock_;
+  base::ConditionVariable condvar_;
+  // We use a map of lists to store frames, to guarantee that frames of the
+  // same priority are stored in FIFO order.  A simpler implementation would be
+  // to just use a multimap, which in practice is nearly always implemented
+  // with the FIFO behavior that we want, but the spec doesn't actually
+  // guarantee that behavior.
+  //
+  // Each list stores frames of a particular priority.  Invariant: the lists in
+  // the QueueMap are never empty; if one of the lists becomes empty, that
+  // key/value pair is immediately removed from the map.
+  typedef std::list<net::SpdyFrameIR*> FrameList;
+  typedef std::map<int, FrameList*> QueueMap;
+  QueueMap queue_map_;
+
+  DISALLOW_COPY_AND_ASSIGN(SpdyFramePriorityQueue);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_FRAME_PRIORITY_QUEUE_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,139 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+
+#include "base/time/time.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace {
+
+void ExpectPop(net::SpdyPingId expected,
+               mod_spdy::SpdyFramePriorityQueue* queue) {
+  EXPECT_FALSE(queue->IsEmpty());
+  net::SpdyFrameIR* raw_frame = NULL;
+  const bool success = queue->Pop(&raw_frame);
+  scoped_ptr<net::SpdyFrameIR> scoped_frame(raw_frame);
+  EXPECT_TRUE(success);
+  ASSERT_TRUE(scoped_frame != NULL);
+  EXPECT_THAT(*scoped_frame, mod_spdy::testing::IsPing(expected));
+}
+
+void ExpectEmpty(mod_spdy::SpdyFramePriorityQueue* queue) {
+  EXPECT_TRUE(queue->IsEmpty());
+  net::SpdyFrameIR* frame = NULL;
+  EXPECT_FALSE(queue->Pop(&frame));
+  EXPECT_TRUE(frame == NULL);
+}
+
+TEST(SpdyFramePriorityQueueTest, InsertSpdy2) {
+  net::SpdyFramer framer(net::SPDY2);
+  mod_spdy::SpdyFramePriorityQueue queue;
+  ExpectEmpty(&queue);
+
+  EXPECT_EQ(3u, framer.GetLowestPriority());
+  EXPECT_EQ(0u, framer.GetHighestPriority());
+
+  queue.Insert(3, new net::SpdyPingIR(4));
+  queue.Insert(0, new net::SpdyPingIR(1));
+  queue.Insert(3, new net::SpdyPingIR(3));
+
+  ExpectPop(1, &queue);
+  ExpectPop(4, &queue);
+
+  queue.Insert(2, new net::SpdyPingIR(2));
+  queue.Insert(1, new net::SpdyPingIR(6));
+  queue.Insert(1, new net::SpdyPingIR(5));
+
+  ExpectPop(6, &queue);
+  ExpectPop(5, &queue);
+  ExpectPop(2, &queue);
+  ExpectPop(3, &queue);
+  ExpectEmpty(&queue);
+}
+
+TEST(SpdyFramePriorityQueueTest, InsertSpdy3) {
+  net::SpdyFramer framer(net::SPDY3);
+  mod_spdy::SpdyFramePriorityQueue queue;
+  ExpectEmpty(&queue);
+
+  EXPECT_EQ(7u, framer.GetLowestPriority());
+  EXPECT_EQ(0u, framer.GetHighestPriority());
+
+  queue.Insert(7, new net::SpdyPingIR(4));
+  queue.Insert(0, new net::SpdyPingIR(1));
+  queue.Insert(7, new net::SpdyPingIR(3));
+
+  ExpectPop(1, &queue);
+  ExpectPop(4, &queue);
+
+  queue.Insert(6, new net::SpdyPingIR(2));
+  queue.Insert(1, new net::SpdyPingIR(6));
+  queue.Insert(5, new net::SpdyPingIR(5));
+
+  ExpectPop(6, &queue);
+  ExpectPop(5, &queue);
+  ExpectPop(2, &queue);
+  ExpectPop(3, &queue);
+  ExpectEmpty(&queue);
+}
+
+TEST(SpdyFramePriorityQueueTest, InsertTopPriority) {
+  mod_spdy::SpdyFramePriorityQueue queue;
+  ExpectEmpty(&queue);
+
+  queue.Insert(3, new net::SpdyPingIR(4));
+  queue.Insert(mod_spdy::SpdyFramePriorityQueue::kTopPriority,
+               new net::SpdyPingIR(2));
+  queue.Insert(mod_spdy::SpdyFramePriorityQueue::kTopPriority,
+               new net::SpdyPingIR(6));
+  queue.Insert(0, new net::SpdyPingIR(1));
+  queue.Insert(3, new net::SpdyPingIR(3));
+
+  ExpectPop(2, &queue);
+  ExpectPop(6, &queue);
+  ExpectPop(1, &queue);
+  ExpectPop(4, &queue);
+
+  queue.Insert(mod_spdy::SpdyFramePriorityQueue::kTopPriority,
+               new net::SpdyPingIR(5));
+
+  ExpectPop(5, &queue);
+  ExpectPop(3, &queue);
+  ExpectEmpty(&queue);
+}
+
+TEST(SpdyFramePriorityQueueTest, BlockingPop) {
+  mod_spdy::SpdyFramePriorityQueue queue;
+  net::SpdyFrameIR* frame;
+  ASSERT_FALSE(queue.Pop(&frame));
+
+  const base::TimeDelta time_to_wait = base::TimeDelta::FromMilliseconds(50);
+  const base::TimeTicks start = base::TimeTicks::HighResNow();
+  ASSERT_FALSE(queue.BlockingPop(time_to_wait, &frame));
+  const base::TimeDelta actual_time_waited =
+      base::TimeTicks::HighResNow() - start;
+
+  // Check that we waited at least as long as we asked for.
+  EXPECT_GE(actual_time_waited, time_to_wait);
+  // Check that we didn't wait too much longer than we asked for.
+  EXPECT_LT(actual_time_waited.InMillisecondsF(),
+            1.1 * time_to_wait.InMillisecondsF());
+}
+
+}  // namespace

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,84 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_queue.h"
+
+#include <list>
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+SpdyFrameQueue::SpdyFrameQueue()
+    : condvar_(&lock_), is_aborted_(false) {}
+
+SpdyFrameQueue::~SpdyFrameQueue() {
+  STLDeleteContainerPointers(queue_.begin(), queue_.end());
+}
+
+bool SpdyFrameQueue::is_aborted() const {
+  base::AutoLock autolock(lock_);
+  return is_aborted_;
+}
+
+void SpdyFrameQueue::Abort() {
+  base::AutoLock autolock(lock_);
+  is_aborted_ = true;
+  STLDeleteContainerPointers(queue_.begin(), queue_.end());
+  queue_.clear();
+  condvar_.Broadcast();
+}
+
+void SpdyFrameQueue::Insert(net::SpdyFrameIR* frame) {
+  base::AutoLock autolock(lock_);
+  DCHECK(frame);
+
+  if (is_aborted_) {
+    DCHECK(queue_.empty());
+    delete frame;
+  } else {
+    if (queue_.empty()) {
+      condvar_.Signal();
+    }
+    queue_.push_front(frame);
+  }
+}
+
+bool SpdyFrameQueue::Pop(bool block, net::SpdyFrameIR** frame) {
+  base::AutoLock autolock(lock_);
+  DCHECK(frame);
+
+  if (block) {
+    // Block until the queue is nonempty or we abort.
+    while (queue_.empty() && !is_aborted_) {
+      condvar_.Wait();
+    }
+  }
+
+  // If we've aborted, the queue should now be empty.
+  DCHECK(!is_aborted_ || queue_.empty());
+  if (queue_.empty()) {
+    return false;
+  }
+
+  *frame = queue_.back();
+  queue_.pop_back();
+  return true;
+}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h Thu May  1 11:39:27 2014
@@ -0,0 +1,71 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_FRAME_QUEUE_H_
+#define MOD_SPDY_COMMON_SPDY_FRAME_QUEUE_H_
+
+#include <list>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+
+namespace net { class SpdyFrameIR; }
+
+namespace mod_spdy {
+
+// A simple FIFO queue of SPDY frames, intended for sending input frames from
+// the SPDY connection thread to a SPDY stream thread.  This class is
+// thread-safe -- all methods may be called concurrently by multiple threads.
+class SpdyFrameQueue {
+ public:
+  // Create an initially-empty queue.
+  SpdyFrameQueue();
+  ~SpdyFrameQueue();
+
+  // Return true if this queue has been aborted.
+  bool is_aborted() const;
+
+  // Abort the queue.  All frames held by the queue will be deleted; future
+  // frames passed to Insert() will be immediately deleted; future calls to
+  // Pop() will fail immediately; and current blocking calls to Pop will
+  // immediately unblock and fail.
+  void Abort();
+
+  // Insert a frame into the queue.  The queue takes ownership of the frame,
+  // and will delete it if the queue is deleted or aborted before the frame is
+  // removed from the queue by the Pop method.
+  void Insert(net::SpdyFrameIR* frame);
+
+  // Remove and provide a frame from the queue and return true, or return false
+  // if the queue is empty or has been aborted.  If the block argument is true,
+  // block until a frame becomes available (or the queue is aborted).  The
+  // caller gains ownership of the provided frame object.
+  bool Pop(bool block, net::SpdyFrameIR** frame);
+
+ private:
+  // This is a pretty naive implementation of a thread-safe queue, but it's
+  // good enough for our purposes.  We could use an apr_queue_t instead of
+  // rolling our own class, but it lacks the ownership semantics that we want.
+  mutable base::Lock lock_;
+  base::ConditionVariable condvar_;
+  std::list<net::SpdyFrameIR*> queue_;
+  bool is_aborted_;
+
+  DISALLOW_COPY_AND_ASSIGN(SpdyFrameQueue);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_FRAME_QUEUE_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,112 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_queue.h"
+
+#include "base/basictypes.h"
+#include "base/threading/platform_thread.h"
+#include "mod_spdy/common/testing/async_task_runner.h"
+#include "mod_spdy/common/testing/notification.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace {
+
+const int kSpdyVersion = 2;
+
+void ExpectPop(bool block, net::SpdyStreamId expected,
+               mod_spdy::SpdyFrameQueue* queue) {
+  net::SpdyFrameIR* raw_frame = NULL;
+  const bool success = queue->Pop(block, &raw_frame);
+  scoped_ptr<net::SpdyFrameIR> scoped_frame(raw_frame);
+  EXPECT_TRUE(success);
+  ASSERT_TRUE(scoped_frame != NULL);
+  EXPECT_THAT(*scoped_frame, mod_spdy::testing::IsPing(expected));
+}
+
+void ExpectEmpty(mod_spdy::SpdyFrameQueue* queue) {
+  net::SpdyFrameIR* frame = NULL;
+  EXPECT_FALSE(queue->Pop(false, &frame));
+  EXPECT_TRUE(frame == NULL);
+}
+
+TEST(SpdyFrameQueueTest, Simple) {
+  mod_spdy::SpdyFrameQueue queue;
+  ExpectEmpty(&queue);
+
+  queue.Insert(new net::SpdyPingIR(4));
+  queue.Insert(new net::SpdyPingIR(1));
+  queue.Insert(new net::SpdyPingIR(3));
+
+  ExpectPop(false, 4, &queue);
+  ExpectPop(false, 1, &queue);
+
+  queue.Insert(new net::SpdyPingIR(2));
+  queue.Insert(new net::SpdyPingIR(5));
+
+  ExpectPop(false, 3, &queue);
+  ExpectPop(false, 2, &queue);
+  ExpectPop(false, 5, &queue);
+  ExpectEmpty(&queue);
+}
+
+TEST(SpdyFrameQueueTest, AbortEmptiesQueue) {
+  mod_spdy::SpdyFrameQueue queue;
+  ASSERT_FALSE(queue.is_aborted());
+  ExpectEmpty(&queue);
+
+  queue.Insert(new net::SpdyPingIR(4));
+  queue.Insert(new net::SpdyPingIR(1));
+  queue.Insert(new net::SpdyPingIR(3));
+
+  ExpectPop(false, 4, &queue);
+
+  queue.Abort();
+
+  ExpectEmpty(&queue);
+  ASSERT_TRUE(queue.is_aborted());
+}
+
+class BlockingPopTask : public mod_spdy::testing::AsyncTaskRunner::Task {
+ public:
+  explicit BlockingPopTask(mod_spdy::SpdyFrameQueue* queue) : queue_(queue) {}
+  virtual void Run() { ExpectPop(true, 7, queue_); }
+ private:
+  mod_spdy::SpdyFrameQueue* const queue_;
+  DISALLOW_COPY_AND_ASSIGN(BlockingPopTask);
+};
+
+TEST(SpdyFrameQueueTest, BlockingPop) {
+  mod_spdy::SpdyFrameQueue queue;
+
+  // Start a task that will do a blocking pop from the queue.
+  mod_spdy::testing::AsyncTaskRunner runner(new BlockingPopTask(&queue));
+  ASSERT_TRUE(runner.Start());
+
+  // Even if we wait for a little bit, the task shouldn't complete, because
+  // that thread is blocked, because the queue is still empty.
+  base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(50));
+  runner.notification()->ExpectNotSet();
+  ExpectEmpty(&queue);
+
+  // Now, if we push something into the queue, the task should soon unblock and
+  // complete, and the queue should then be empty.
+  queue.Insert(new net::SpdyPingIR(7));
+  runner.notification()->ExpectSetWithinMillis(100);
+  ExpectEmpty(&queue);
+}
+
+}  // namespace

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,75 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_server_config.h"
+
+#include "mod_spdy/common/protocol_util.h"
+
+namespace {
+
+const bool kDefaultSpdyEnabled = false;
+const int kDefaultMaxStreamsPerConnection = 100;
+const int kDefaultMinThreadsPerProcess = 2;
+const int kDefaultMaxThreadsPerProcess = 10;
+const int kDefaultMaxServerPushDepth = 1;
+const bool kDefaultSendVersionHeader = true;
+const bool kDefaultServerPushDiscoveryEnabled = false;
+const bool kDefaultServerPushDiscoverySendDebugHeaders = false;
+const mod_spdy::spdy::SpdyVersion kDefaultUseSpdyVersionWithoutSsl =
+    mod_spdy::spdy::SPDY_VERSION_NONE;
+const int kDefaultVlogLevel = 0;
+
+}  // namespace
+
+namespace mod_spdy {
+
+SpdyServerConfig::SpdyServerConfig()
+    : spdy_enabled_(kDefaultSpdyEnabled),
+      max_streams_per_connection_(kDefaultMaxStreamsPerConnection),
+      min_threads_per_process_(kDefaultMinThreadsPerProcess),
+      max_threads_per_process_(kDefaultMaxThreadsPerProcess),
+      max_server_push_depth_(kDefaultMaxServerPushDepth),
+      send_version_header_(kDefaultSendVersionHeader),
+      server_push_discovery_enabled_(kDefaultServerPushDiscoveryEnabled),
+      server_push_discovery_send_debug_headers_(
+          kDefaultServerPushDiscoverySendDebugHeaders),
+      use_spdy_version_without_ssl_(kDefaultUseSpdyVersionWithoutSsl),
+      vlog_level_(kDefaultVlogLevel) {}
+
+SpdyServerConfig::~SpdyServerConfig() {}
+
+void SpdyServerConfig::MergeFrom(const SpdyServerConfig& a,
+                                 const SpdyServerConfig& b) {
+  spdy_enabled_.MergeFrom(a.spdy_enabled_, b.spdy_enabled_);
+  max_streams_per_connection_.MergeFrom(a.max_streams_per_connection_,
+                                        b.max_streams_per_connection_);
+  min_threads_per_process_.MergeFrom(a.min_threads_per_process_,
+                                     b.min_threads_per_process_);
+  max_threads_per_process_.MergeFrom(a.max_threads_per_process_,
+                                     b.max_threads_per_process_);
+  max_server_push_depth_.MergeFrom(a.max_server_push_depth_,
+                                   b.max_server_push_depth_);
+  send_version_header_.MergeFrom(
+      a.send_version_header_, b.send_version_header_);
+  server_push_discovery_enabled_.MergeFrom(a.server_push_discovery_enabled_,
+                                           b.server_push_discovery_enabled_);
+  server_push_discovery_send_debug_headers_.MergeFrom(
+      a.server_push_discovery_send_debug_headers_,
+      b.server_push_discovery_send_debug_headers_);
+  use_spdy_version_without_ssl_.MergeFrom(
+      a.use_spdy_version_without_ssl_, b.use_spdy_version_without_ssl_);
+  vlog_level_.MergeFrom(a.vlog_level_, b.vlog_level_);
+}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h Thu May  1 11:39:27 2014
@@ -0,0 +1,139 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SERVER_CONFIG_H_
+#define MOD_SPDY_COMMON_SPDY_SERVER_CONFIG_H_
+
+#include "base/basictypes.h"
+#include "mod_spdy/common/protocol_util.h"
+
+namespace mod_spdy {
+
+// Stores server configuration settings for our module.
+class SpdyServerConfig {
+ public:
+  SpdyServerConfig();
+  ~SpdyServerConfig();
+
+  // Return true if SPDY is enabled for this server, false otherwise.
+  bool spdy_enabled() const { return spdy_enabled_.get(); }
+
+  // Return the maximum number of simultaneous SPDY streams that should be
+  // permitted for a single client connection.
+  int max_streams_per_connection() const {
+    return max_streams_per_connection_.get();
+  }
+
+  // Return the minimum number of worker threads to spawn per child process.
+  int min_threads_per_process() const {
+    return min_threads_per_process_.get();
+  }
+
+  // Return the maximum number of worker threads to spawn per child process.
+  int max_threads_per_process() const {
+    return max_threads_per_process_.get();
+  }
+
+  // Return the maximum number of recursive levels to follow
+  // X-Associated-Content headers
+  int max_server_push_depth() const {
+    return max_server_push_depth_.get();
+  }
+
+  // Whether or not we should include an x-mod-spdy header with the module
+  // version number.
+  bool send_version_header() const { return send_version_header_.get(); }
+
+  // Return if SPDY server push discovery is enabled.
+  bool server_push_discovery_enabled() const {
+    return server_push_discovery_enabled_.get();
+  }
+
+  // Return if we should send server push discovery debug headers to user agent.
+  bool server_push_discovery_send_debug_headers() const {
+    return server_push_discovery_send_debug_headers_.get();
+  }
+
+  // If nonzero, assume (unencrypted) SPDY/x for non-SSL connections, where x
+  // is the version number returned here.  This will most likely break normal
+  // browsers, but is useful for testing.
+  spdy::SpdyVersion use_spdy_version_without_ssl() const {
+    return use_spdy_version_without_ssl_.get();
+  }
+
+  // Return the maximum VLOG level we should use.
+  int vlog_level() const { return vlog_level_.get(); }
+
+  // Setters.  Call only during the configuration phase.
+  void set_spdy_enabled(bool b) { spdy_enabled_.set(b); }
+  void set_max_streams_per_connection(int n) {
+    max_streams_per_connection_.set(n);
+  }
+  void set_min_threads_per_process(int n) { min_threads_per_process_.set(n); }
+  void set_max_threads_per_process(int n) { max_threads_per_process_.set(n); }
+  void set_max_server_push_depth(int n) { max_server_push_depth_.set(n); }
+  void set_send_version_header(bool b) { send_version_header_.set(b); }
+  void set_server_push_discovery_enabled(bool b) {
+    return server_push_discovery_enabled_.set(b);
+  }
+  void set_server_push_discovery_send_debug_headers(bool b) {
+    return server_push_discovery_send_debug_headers_.set(b);
+  }
+  void set_use_spdy_version_without_ssl(spdy::SpdyVersion v) {
+    use_spdy_version_without_ssl_.set(v);
+  }
+  void set_vlog_level(int n) { vlog_level_.set(n); }
+
+  // Set this config object to the merge of a and b.  Call only during the
+  // configuration phase.
+  void MergeFrom(const SpdyServerConfig& a, const SpdyServerConfig& b);
+
+ private:
+  template <typename T>
+  class Option {
+   public:
+    explicit Option(const T& default_value)
+        : was_set_(false), value_(default_value) {}
+    const T& get() const { return value_; }
+    void set(const T& value) { was_set_ = true; value_ = value; }
+    void MergeFrom(const Option<T>& a, const Option<T>& b) {
+      was_set_ = a.was_set_ || b.was_set_;
+      value_ = a.was_set_ ? a.value_ : b.value_;
+    }
+   private:
+    bool was_set_;
+    T value_;
+    DISALLOW_COPY_AND_ASSIGN(Option);
+  };
+
+  // Configuration fields:
+  Option<bool> spdy_enabled_;
+  Option<int> max_streams_per_connection_;
+  Option<int> min_threads_per_process_;
+  Option<int> max_threads_per_process_;
+  Option<int> max_server_push_depth_;
+  Option<bool> send_version_header_;
+  Option<bool> server_push_discovery_enabled_;
+  Option<bool> server_push_discovery_send_debug_headers_;
+  Option<spdy::SpdyVersion> use_spdy_version_without_ssl_;
+  Option<int> vlog_level_;
+  // Note: Add more config options here as needed; be sure to also update the
+  //   MergeFrom method in spdy_server_config.cc.
+
+  DISALLOW_COPY_AND_ASSIGN(SpdyServerConfig);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_CONTEXT_SPDY_SERVER_CONFIG_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,23 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_server_push_interface.h"
+
+namespace mod_spdy {
+
+SpdyServerPushInterface::SpdyServerPushInterface() {}
+
+SpdyServerPushInterface::~SpdyServerPushInterface() {}
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h Thu May  1 11:39:27 2014
@@ -0,0 +1,66 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SERVER_PUSH_INTERFACE_H_
+#define MOD_SPDY_COMMON_SPDY_SERVER_PUSH_INTERFACE_H_
+
+#include "base/basictypes.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+class SpdyServerPushInterface {
+ public:
+  SpdyServerPushInterface();
+  virtual ~SpdyServerPushInterface();
+
+  enum PushStatus {
+    // PUSH_STARTED: The server push was started successfully.
+    PUSH_STARTED,
+    // INVALID_REQUEST_HEADERS: The given request headers were invalid for a
+    // server push (e.g. because required headers were missing).
+    INVALID_REQUEST_HEADERS,
+    // ASSOCIATED_STREAM_INACTIVE: The push could not be started because the
+    // associated stream is not currently active.
+    ASSOCIATED_STREAM_INACTIVE,
+    // CANNOT_PUSH_EVER_AGAIN: We can't do any more pushes on this session,
+    // either because the client has already sent us a GOAWAY frame, or the
+    // session has been open so long that we've run out of stream IDs.
+    CANNOT_PUSH_EVER_AGAIN,
+    // TOO_MANY_CONCURRENT_PUSHES: The push could not be started right now
+    // because there are too many currently active push streams.
+    TOO_MANY_CONCURRENT_PUSHES,
+    // PUSH_INTERNAL_ERROR: There was an internal error in the SpdySession
+    // (typically something that caused a LOG(DFATAL).
+    PUSH_INTERNAL_ERROR,
+  };
+
+  // Initiate a SPDY server push, roughly by pretending that the client sent a
+  // SYN_STREAM with the given headers.  To repeat: the headers argument is
+  // _not_ the headers that the server will send to the client, but rather the
+  // headers to _pretend_ that the client sent to the server.
+  virtual PushStatus StartServerPush(
+      net::SpdyStreamId associated_stream_id,
+      int32 server_push_depth,
+      net::SpdyPriority priority,
+      const net::SpdyHeaderBlock& request_headers) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(SpdyServerPushInterface);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_SERVER_PUSH_INTERFACE_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,914 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session.h"
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_server_config.h"
+#include "mod_spdy/common/spdy_session_io.h"
+#include "mod_spdy/common/spdy_stream.h"
+#include "mod_spdy/common/spdy_stream_task_factory.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// Server push stream IDs must be even, and must fit in 31 bits (SPDY draft 3
+// section 2.3.2).  Thus, this is the largest stream ID we can ever use for a
+// pushed stream.
+const net::SpdyStreamId kMaxServerPushStreamId = 0x7FFFFFFEu;
+
+// Until the client informs us otherwise, we will assume a limit of 100 open
+// push streams at a time.
+const uint32 kInitMaxConcurrentPushes = 100u;
+
+}  // namespace
+
+namespace mod_spdy {
+
+SpdySession::SpdySession(spdy::SpdyVersion spdy_version,
+                         const SpdyServerConfig* config,
+                         SpdySessionIO* session_io,
+                         SpdyStreamTaskFactory* task_factory,
+                         Executor* executor)
+    : spdy_version_(spdy_version),
+      config_(config),
+      session_io_(session_io),
+      task_factory_(task_factory),
+      executor_(executor),
+      framer_(SpdyVersionToFramerVersion(spdy_version), true),
+      session_stopped_(false),
+      already_sent_goaway_(false),
+      last_client_stream_id_(0u),
+      initial_window_size_(net::kSpdyStreamInitialWindowSize),
+      max_concurrent_pushes_(kInitMaxConcurrentPushes),
+      last_server_push_stream_id_(0u),
+      received_goaway_(false),
+      shared_window_(net::kSpdyStreamInitialWindowSize,
+                     net::kSpdyStreamInitialWindowSize) {
+  DCHECK_NE(spdy::SPDY_VERSION_NONE, spdy_version);
+  framer_.set_visitor(this);
+}
+
+SpdySession::~SpdySession() {}
+
+int32 SpdySession::current_shared_input_window_size() const {
+  DCHECK_GE(spdy_version_, spdy::SPDY_VERSION_3_1);
+  return shared_window_.current_input_window_size();
+}
+
+int32 SpdySession::current_shared_output_window_size() const {
+  DCHECK_GE(spdy_version_, spdy::SPDY_VERSION_3_1);
+  return shared_window_.current_output_window_size();
+}
+
+void SpdySession::Run() {
+  // Send a SETTINGS frame when the connection first opens, to inform the
+  // client of our MAX_CONCURRENT_STREAMS limit.
+  SendSettingsFrame();
+
+  // Initial amount time to block when waiting for output -- we start with
+  // this, and as long as we fail to perform any input OR output, we increase
+  // exponentially to the max, resetting when we succeed again.
+  const base::TimeDelta kInitOutputBlockTime =
+      base::TimeDelta::FromMilliseconds(1);
+  // Maximum time to block when waiting for output.
+  const base::TimeDelta kMaxOutputBlockTime =
+      base::TimeDelta::FromMilliseconds(30);
+
+  base::TimeDelta output_block_time = kInitOutputBlockTime;
+
+  // Until we stop the session, or it is aborted by the client, alternate
+  // between reading input from the client and (compressing and) sending output
+  // frames that our stream threads have posted to the output queue.  This
+  // basically amounts to a busy-loop, switching back and forth between input
+  // and output, so we do our best to block when we can.  It would be far nicer
+  // to have separate threads for input and output and have them always block;
+  // unfortunately, we cannot do that, because in Apache the input and output
+  // filter chains for a connection must be invoked by the same thread.
+  while (!session_stopped_) {
+    if (session_io_->IsConnectionAborted()) {
+      LOG(WARNING) << "Master connection was aborted.";
+      StopSession();
+      break;
+    }
+
+    // Step 1: Read input from the client.
+    {
+      // Determine whether we should block until more input data is available.
+      // For now, our policy is to block only if there is no pending output and
+      // there are no currently-active streams (which might produce new
+      // output).
+      const bool should_block = StreamMapIsEmpty() && output_queue_.IsEmpty();
+
+      // If there's no current output, and we can't create new streams (so
+      // there will be no future output), then we should just shut down the
+      // connection.
+      if (should_block && already_sent_goaway_) {
+        StopSession();
+        break;
+      }
+
+      // Read available input data.  The SpdySessionIO will grab any
+      // available data and push it into the SpdyFramer that we pass to it
+      // here; the SpdyFramer, in turn, will call our OnControl and/or
+      // OnStreamFrameData methods to report decoded frames.  If no input data
+      // is currently available and should_block is true, this will block until
+      // input becomes available (or the connection is closed).
+      const SpdySessionIO::ReadStatus status =
+          session_io_->ProcessAvailableInput(should_block, &framer_);
+      if (status == SpdySessionIO::READ_SUCCESS) {
+        // We successfully did some I/O, so reset the output block timeout.
+        output_block_time = kInitOutputBlockTime;
+      } else if (status == SpdySessionIO::READ_CONNECTION_CLOSED) {
+        // The reading side of the connection has closed, so we won't be
+        // reading anything more.  SPDY is transport-layer agnostic and not
+        // TCP-specific; apparently, this means that there is no expectation
+        // that we behave any differently for a half-closed connection than for
+        // a fully-closed connection.  So if the reading side of the connection
+        // closes, we're just going to shut down completely.
+        //
+        // But just in case the writing side is still open, let's try to send a
+        // GOAWAY to let the client know we're shutting down gracefully.
+        SendGoAwayFrame(net::GOAWAY_OK);
+        // Now, shut everything down.
+        StopSession();
+      } else if (status == SpdySessionIO::READ_ERROR) {
+        // There was an error during reading, so the session is corrupted and
+        // we have no chance of reading anything more.
+        //
+        // We've probably already sent a GOAWAY with a PROTOCOL_ERROR by this
+        // point, but if we haven't (perhaps the error was our fault?) then
+        // send a GOAWAY now.  (If we've already sent a GOAWAY, then
+        // SendGoAwayFrame is a no-op.)
+        SendGoAwayFrame(net::GOAWAY_INTERNAL_ERROR);
+        // Now, shut everything down.
+        StopSession();
+      } else {
+        // Otherwise, there's simply no data available at the moment.
+        DCHECK_EQ(SpdySessionIO::READ_NO_DATA, status);
+      }
+    }
+
+    // Step 2: Send output to the client.
+    if (!session_stopped_) {
+      // If there are no active streams, then no new output can be getting
+      // created right now, so we shouldn't block on output waiting for more.
+      const bool no_active_streams = StreamMapIsEmpty();
+
+      // Send any pending output, one frame at a time.  If there are any active
+      // streams, we're willing to block briefly to wait for more frames to
+      // send, if only to prevent this loop from busy-waiting too heavily --
+      // not a great solution, but better than nothing for now.
+      net::SpdyFrameIR* frame = NULL;
+      if (no_active_streams ? output_queue_.Pop(&frame) :
+          output_queue_.BlockingPop(output_block_time, &frame)) {
+        do {
+          SendFrame(frame);
+        } while (!session_stopped_ && output_queue_.Pop(&frame));
+
+        // We successfully did some I/O, so reset the output block timeout.
+        output_block_time = kInitOutputBlockTime;
+      } else {
+        // The queue is currently empty; if no more streams can be created and
+        // no more remain, we're done.
+        if (already_sent_goaway_ && no_active_streams) {
+          StopSession();
+        } else {
+          // There were no output frames within the timeout; so do an
+          // exponential backoff by doubling output_block_time.
+          output_block_time = std::min(kMaxOutputBlockTime,
+                                       output_block_time * 2);
+        }
+      }
+    }
+
+    // TODO(mdsteele): What we really want to be able to do is to block until
+    // *either* more input or more output is available.  Unfortunely, there's
+    // no good way to query the input side (in Apache).  One possibility would
+    // be to grab the input socket object (which we can do), and then arrange
+    // to block until either the socket is ready to read OR our output queue is
+    // nonempty (obviously we would abstract that away in SpdySessionIO),
+    // but there's not even a nice way to do that (that I know of).
+  }
+}
+
+SpdyServerPushInterface::PushStatus SpdySession::StartServerPush(
+    net::SpdyStreamId associated_stream_id,
+    int32 server_push_depth,
+    net::SpdyPriority priority,
+    const net::SpdyHeaderBlock& request_headers) {
+  // Server push is pretty ill-defined in SPDY v2, so we require v3 or higher.
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+
+  // Grab the headers that we are required to send with the initial SYN_STREAM.
+  const net::SpdyHeaderBlock::const_iterator host_iter =
+      request_headers.find(spdy::kSpdy3Host);
+  const net::SpdyHeaderBlock::const_iterator path_iter =
+      request_headers.find(spdy::kSpdy3Path);
+  const net::SpdyHeaderBlock::const_iterator scheme_iter =
+      request_headers.find(spdy::kSpdy3Scheme);
+  if (host_iter == request_headers.end() ||
+      path_iter == request_headers.end() ||
+      scheme_iter == request_headers.end()) {
+    return SpdyServerPushInterface::INVALID_REQUEST_HEADERS;
+  }
+  const std::string& host_header = host_iter->second;
+  const std::string& path_header = path_iter->second;
+  const std::string& scheme_header = scheme_iter->second;
+
+  StreamTaskWrapper* task_wrapper = NULL;
+  {
+    base::AutoLock autolock(stream_map_lock_);
+
+    // If we've received a GOAWAY frame the client, we shouldn't create any new
+    // streams on this session (SPDY draft 3 section 2.6.6).
+    if (received_goaway_) {
+      return SpdyServerPushInterface::CANNOT_PUSH_EVER_AGAIN;
+    }
+
+    // The associated stream must be active (SPDY draft 3 section 3.3.1).
+    if (!stream_map_.IsStreamActive(associated_stream_id)) {
+      return SpdyServerPushInterface::ASSOCIATED_STREAM_INACTIVE;
+    }
+
+    // Check if we're allowed to create new push streams right now (based on
+    // the client SETTINGS_MAX_CONCURRENT_STREAMS).  Note that the number of
+    // active push streams might be (temporarily) greater than the max, if the
+    // client lowered the max after we already started a bunch of pushes.
+    if (stream_map_.NumActivePushStreams() >= max_concurrent_pushes_) {
+      return SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES;
+    }
+
+    // In the unlikely event that the session stays open so long that we run
+    // out of server push stream IDs, we may not do any more pushes on this
+    // session (SPDY draft 3 section 2.3.2).
+    DCHECK_LE(last_server_push_stream_id_, kMaxServerPushStreamId);
+    if (last_server_push_stream_id_ >= kMaxServerPushStreamId) {
+      return SpdyServerPushInterface::CANNOT_PUSH_EVER_AGAIN;
+    }
+    // Server push stream IDs must be even (SPDY draft 3 section 2.3.2).  So
+    // each time we do a push, we increment last_server_push_stream_id_ by two.
+    DCHECK_EQ(last_server_push_stream_id_ % 2u, 0u);
+    last_server_push_stream_id_ += 2u;
+    const net::SpdyStreamId stream_id = last_server_push_stream_id_;
+    // Only the server can create even stream IDs, and we never use the same
+    // one twice, so our chosen stream_id should definitely not be in use.
+    if (stream_map_.IsStreamActive(stream_id)) {
+      LOG(DFATAL) << "Next server push stream ID already in use: "
+                  << stream_id;
+      return SpdyServerPushInterface::PUSH_INTERNAL_ERROR;
+    }
+
+    // Create task and add it to the stream map.
+    task_wrapper = new StreamTaskWrapper(
+        this, stream_id, associated_stream_id, server_push_depth, priority);
+    stream_map_.AddStreamTask(task_wrapper);
+    net::SpdySynStreamIR* frame = new net::SpdySynStreamIR(stream_id);
+    frame->set_associated_to_stream_id(associated_stream_id);
+    frame->set_priority(priority);
+    frame->set_fin(true);
+    frame->GetMutableNameValueBlock()->insert(
+        request_headers.begin(), request_headers.end());
+    task_wrapper->stream()->PostInputFrame(frame);
+
+    // Send initial SYN_STREAM to the client.  It only needs to contain the
+    // ":host", ":path", and ":scheme" headers; the rest can follow in a later
+    // HEADERS frame (SPDY draft 3 section 3.3.1).
+    net::SpdyHeaderBlock initial_response_headers;
+    initial_response_headers[spdy::kSpdy3Host] = host_header;
+    initial_response_headers[spdy::kSpdy3Path] = path_header;
+    initial_response_headers[spdy::kSpdy3Scheme] = scheme_header;
+    task_wrapper->stream()->SendOutputSynStream(
+        initial_response_headers, false);
+
+    VLOG(2) << "Starting server push; opening stream " << stream_id;
+  }
+  if (task_wrapper == NULL) {
+    LOG(DFATAL) << "Can't happen: task_wrapper is NULL";
+    return SpdyServerPushInterface::PUSH_INTERNAL_ERROR;
+  }
+  executor_->AddTask(task_wrapper, priority);
+  return SpdyServerPushInterface::PUSH_STARTED;
+}
+
+void SpdySession::OnError(net::SpdyFramer::SpdyError error_code) {
+  LOG(ERROR) << "Session error: "
+             << net::SpdyFramer::ErrorCodeToString(error_code);
+  SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+}
+
+void SpdySession::OnStreamError(net::SpdyStreamId stream_id,
+                                const std::string& description) {
+  LOG(ERROR) << "Stream " << stream_id << " error: " << description;
+  AbortStream(stream_id, net::RST_STREAM_PROTOCOL_ERROR);
+}
+
+void SpdySession::OnStreamFrameData(
+    net::SpdyStreamId stream_id, const char* data, size_t length, bool fin) {
+  // First check the shared input flow control window (for SPDY/3.1 and up).
+  if (spdy_version_ >= spdy::SPDY_VERSION_3_1) {
+    if (!shared_window_.OnReceiveInputData(length)) {
+      LOG(ERROR) << "Client violated flow control by sending too much data "
+                 << "to session.  Sending GOAWAY.";
+      SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+      StopSession();
+      return;
+    }
+  }
+
+  // Look up the stream to post the data to.  We need to lock when reading the
+  // stream map, because one of the stream threads could call
+  // RemoveStreamTask() at any time.
+  {
+    base::AutoLock autolock(stream_map_lock_);
+    SpdyStream* stream = stream_map_.GetStream(stream_id);
+    if (stream != NULL) {
+      VLOG(4) << "[stream " << stream_id << "] Received DATA (length="
+              << length << ")";
+      // Copy the data into an _uncompressed_ SPDY data frame and post it to
+      // the stream's input queue.
+      // Note that we must still be holding stream_map_lock_ when we call this
+      // method -- otherwise the stream may be deleted out from under us by the
+      // StreamTaskWrapper destructor.  That's okay -- PostInputFrame is a
+      // quick operation and won't block (for any appreciable length of time).
+      net::SpdyDataIR* frame =
+          new net::SpdyDataIR(stream_id, base::StringPiece(data, length));
+      frame->set_fin(fin);
+      stream->PostInputFrame(frame);
+      return;
+    }
+  }
+
+  // If we reach this point, it means that the client has sent us DATA for a
+  // stream that doesn't exist (possibly because it used to exist but has
+  // already been closed by a FLAG_FIN); *unless* length=0, which is just the
+  // BufferedSpdyFramer's way of telling us that there will be no more data on
+  // this stream (i.e. because a FLAG_FIN has been received, possibly on a
+  // previous control frame).
+
+  // TODO(mdsteele): The BufferedSpdyFramer sends us OnStreamFrameData with
+  // length=0 to indicate end-of-stream, but it will do this even if we already
+  // got FLAG_FIN in a control frame (such as SYN_STREAM).  For now, we fix
+  // this issue by simply ignoring length=0 data for streams that no longer
+  // exist.  Once we transition to the new plain SpdyFramer, we'll be able to
+  // handle this more precisely.
+  if (length == 0) {
+    return;
+  }
+
+  // If the client sends data for a nonexistant stream, we must send a
+  // RST_STREAM frame with error code INVALID_STREAM (SPDY draft 2 section
+  // 2.4).  Note that we release the mutex *before* sending the frame.
+  LOG(WARNING) << "Client sent DATA (length=" << length
+               << ") for nonexistant stream " << stream_id;
+  SendRstStreamFrame(stream_id, net::RST_STREAM_INVALID_STREAM);
+}
+
+void SpdySession::OnSynStream(
+    net::SpdyStreamId stream_id,
+    net::SpdyStreamId associated_stream_id,
+    net::SpdyPriority priority,
+    uint8 credential_slot,
+    bool fin,
+    bool unidirectional,
+    const net::SpdyHeaderBlock& headers) {
+  // The SPDY spec requires us to ignore SYN_STREAM frames after sending a
+  // GOAWAY frame (SPDY draft 3 section 2.6.6).
+  if (already_sent_goaway_) {
+    return;
+  }
+
+  // Client stream IDs must be odd-numbered.
+  if (stream_id % 2 == 0) {
+    LOG(WARNING) << "Client sent SYN_STREAM for even stream ID (" << stream_id
+                 << ").  Sending GOAWAY.";
+    SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+    return;
+  }
+
+  // Client stream IDs must be strictly increasing (SPDY draft 2 section
+  // 2.5.1).
+  if (stream_id <= last_client_stream_id_) {
+    LOG(WARNING) << "Client sent SYN_STREAM for non-increasing stream ID ("
+                 << stream_id << " after " << last_client_stream_id_
+                 << ").";  //  Aborting stream.";
+#if 0
+    // TODO(mdsteele): re-enable this code block when
+    // http://code.google.com/p/chromium/issues/detail?id=111708 is
+    // fixed.
+    AbortStream(stream_id, net::PROTOCOL_ERROR);
+    return;
+#endif
+  }
+
+  StreamTaskWrapper* task_wrapper = NULL;
+  {
+    // Lock the stream map before we start checking its size or adding a new
+    // stream to it.  We need to lock when touching the stream map, because one
+    // of the stream threads could call RemoveStreamTask() at any time.
+    base::AutoLock autolock(stream_map_lock_);
+
+#if 0
+    // TODO(mdsteele): re-enable this code block when
+    // http://code.google.com/p/chromium/issues/detail?id=111708 is
+    // fixed.
+
+    // We already checked that stream_id > last_client_stream_id_, so there
+    // definitely shouldn't already be a stream with this ID in the map.
+    DCHECK(!stream_map_.IsStreamActive(stream_id));
+#else
+    if (stream_map_.IsStreamActive(stream_id)) {
+      SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+      return;
+    }
+#endif
+
+    // Limit the number of simultaneous open streams the client can create;
+    // refuse the stream if there are too many currently active (non-push)
+    // streams.
+    if (static_cast<int>(stream_map_.NumActiveClientStreams()) >=
+        config_->max_streams_per_connection()) {
+      SendRstStreamFrame(stream_id, net::RST_STREAM_REFUSED_STREAM);
+      return;
+    }
+
+    // Initiate a new stream.
+    last_client_stream_id_ = std::max(last_client_stream_id_, stream_id);
+    task_wrapper = new StreamTaskWrapper(
+        this, stream_id, associated_stream_id,
+        0, // server_push_depth = 0
+        priority);
+    stream_map_.AddStreamTask(task_wrapper);
+    net::SpdySynStreamIR* frame = new net::SpdySynStreamIR(stream_id);
+    frame->set_associated_to_stream_id(associated_stream_id);
+    frame->set_priority(priority);
+    frame->set_slot(credential_slot);
+    frame->set_fin(fin);
+    frame->set_unidirectional(unidirectional);
+    frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+    task_wrapper->stream()->PostInputFrame(frame);
+  }
+  DCHECK(task_wrapper);
+  // Release the lock before adding the task to the executor.  This is mostly
+  // for the benefit of unit tests, for which calling AddTask will execute the
+  // task immediately (and we don't want to be holding the lock when that
+  // happens).  Note that it's safe for us to pass task_wrapper here without
+  // holding the lock, because the task won't get deleted before it's been
+  // added to the executor.
+  VLOG(2) << "Received SYN_STREAM; opening stream " << stream_id;
+  executor_->AddTask(task_wrapper, priority);
+}
+
+void SpdySession::OnSynReply(net::SpdyStreamId stream_id,
+                             bool fin,
+                             const net::SpdyHeaderBlock& headers) {
+  // TODO(mdsteele)
+}
+
+void SpdySession::OnRstStream(net::SpdyStreamId stream_id,
+                              net::SpdyRstStreamStatus status) {
+  switch (status) {
+    // These are totally benign reasons to abort a stream, so just abort the
+    // stream without a fuss.
+    case net::RST_STREAM_REFUSED_STREAM:
+    case net::RST_STREAM_CANCEL:
+      VLOG(2) << "Client cancelled/refused stream " << stream_id;
+      AbortStreamSilently(stream_id);
+      break;
+    // If there was an error, abort the stream, but log a warning first.
+    // TODO(mdsteele): Should we have special behavior for different kinds of
+    //   errors?
+    default:
+      LOG(WARNING) << "Client sent RST_STREAM with "
+                   << RstStreamStatusCodeToString(status)
+                   << " for stream " << stream_id << ".  Aborting stream.";
+      AbortStreamSilently(stream_id);
+      break;
+  }
+}
+
+void SpdySession::OnSettings(bool clear_persisted) {
+  // Do nothing; we never persist values, so we don't need to pay attention to
+  // this flag.
+}
+
+void SpdySession::OnSetting(net::SpdySettingsIds id,
+                            uint8 flags, uint32 value) {
+  VLOG(4) << "Received SETTING (flags=" << flags << "): "
+          << SettingsIdToString(id) << "=" << value;
+  switch (id) {
+    case net::SETTINGS_MAX_CONCURRENT_STREAMS:
+      max_concurrent_pushes_ = value;
+      break;
+    case net::SETTINGS_INITIAL_WINDOW_SIZE:
+      // Flow control only exists for SPDY v3 and up.
+      if (spdy_version() < spdy::SPDY_VERSION_3) {
+        LOG(ERROR) << "Client sent INITIAL_WINDOW_SIZE setting over "
+                   << "SPDY/" << SpdyVersionNumberString(spdy_version())
+                   << ".  Sending GOAWAY.";
+        SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+      } else {
+        SetInitialWindowSize(value);
+      }
+      break;
+    case net::SETTINGS_UPLOAD_BANDWIDTH:
+    case net::SETTINGS_DOWNLOAD_BANDWIDTH:
+    case net::SETTINGS_ROUND_TRIP_TIME:
+    case net::SETTINGS_CURRENT_CWND:
+    case net::SETTINGS_DOWNLOAD_RETRANS_RATE:
+      // Ignore other settings for now.
+      break;
+    default:
+      LOG(ERROR) << "Client sent invalid SETTINGS id (" << id
+                 << ").  Sending GOAWAY.";
+      SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+      break;
+  }
+}
+
+void SpdySession::OnPing(uint32 unique_id) {
+  VLOG(4) << "Received PING frame (id=" << unique_id << ")";
+  // The SPDY spec requires the server to ignore even-numbered PING frames that
+  // it did not initiate (SPDY draft 3 section 2.6.5), and right now, we never
+  // initiate pings.
+  if (unique_id % 2 == 0) {
+    return;
+  }
+
+  // Any odd-numbered PING frame we receive was initiated by the client, and
+  // should be echoed back _immediately_ (SPDY draft 2 section 2.7.6).
+  SendFrame(new net::SpdyPingIR(unique_id));
+}
+
+void SpdySession::OnGoAway(net::SpdyStreamId last_accepted_stream_id,
+                           net::SpdyGoAwayStatus status) {
+  VLOG(4) << "Received GOAWAY frame (status="
+          << GoAwayStatusCodeToString(status) << ", last_accepted_stream_id="
+          << last_accepted_stream_id << ")";
+
+  // Take note that we have received a GOAWAY frame; we should not start any
+  // new server push streams on this session.
+  {
+    base::AutoLock autolock(stream_map_lock_);
+    received_goaway_ = true;
+  }
+
+  // If this was not a normal shutdown (GOAWAY_OK), we should probably log a
+  // warning to let the user know something's up.
+  switch (status) {
+    case net::GOAWAY_OK:
+      break;
+    case net::GOAWAY_PROTOCOL_ERROR:
+      LOG(WARNING) << "Client sent GOAWAY with PROTOCOL_ERROR.  Possibly we "
+                   << "did something wrong?";
+      break;
+    case net::GOAWAY_INTERNAL_ERROR:
+      LOG(WARNING) << "Client sent GOAWAY with INTERNAL_ERROR.  Apparently "
+                   << "they're broken?";
+      break;
+    default:
+      LOG(ERROR) << "Client sent GOAWAY with invalid status code ("
+                 << status << ").  Sending GOAWAY.";
+      SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+      break;
+  }
+}
+
+void SpdySession::OnHeaders(net::SpdyStreamId stream_id,
+                            bool fin,
+                            const net::SpdyHeaderBlock& headers) {
+  // Look up the stream to post the data to.  We need to lock when reading the
+  // stream map, because one of the stream threads could call
+  // RemoveStreamTask() at any time.
+  {
+    // TODO(mdsteele): This is pretty similar to the code in OnStreamFrameData.
+    //   Maybe we can factor it out?
+    base::AutoLock autolock(stream_map_lock_);
+    SpdyStream* stream = stream_map_.GetStream(stream_id);
+    if (stream != NULL) {
+      VLOG(4) << "[stream " << stream_id << "] Received HEADERS frame";
+      net::SpdySynStreamIR* frame = new net::SpdySynStreamIR(stream_id);
+      frame->set_fin(true);
+      frame->GetMutableNameValueBlock()->insert(
+          headers.begin(), headers.end());
+      stream->PostInputFrame(frame);
+      return;
+    }
+  }
+
+  // Note that we release the mutex *before* sending the frame.
+  LOG(WARNING) << "Client sent HEADERS for nonexistant stream " << stream_id;
+  SendRstStreamFrame(stream_id, net::RST_STREAM_INVALID_STREAM);
+}
+
+void SpdySession::OnPushPromise(net::SpdyStreamId stream_id,
+                                net::SpdyStreamId promised_stream_id) {
+  LOG(ERROR) << "Got a PUSH_PROMISE(" << stream_id << ", "
+             << promised_stream_id << ") frame from the client.";
+  SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+}
+
+void SpdySession::OnWindowUpdate(net::SpdyStreamId stream_id,
+                                 uint32 delta_window_size) {
+  // Flow control only exists for SPDY/3 and up.
+  if (spdy_version() < spdy::SPDY_VERSION_3) {
+    LOG(ERROR) << "Got a WINDOW_UPDATE frame over SPDY/"
+               << SpdyVersionNumberString(spdy_version());
+    SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+    return;
+  }
+
+  // Stream zero is special; starting in SPDY/3.1, it represents the
+  // session-wide flow control window.  For previous versions, it is invalid.
+  if (stream_id == 0) {
+    if (spdy_version() >= spdy::SPDY_VERSION_3_1) {
+      if (!shared_window_.IncreaseOutputWindowSize(delta_window_size)) {
+        LOG(ERROR) << "Got a WINDOW_UPDATE frame that overflows session "
+                   << "window.  Sending GOAWAY.";
+        SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+        StopSession();
+      }
+    } else {
+      LOG(ERROR) << "Got a WINDOW_UPDATE frame for stream 0 over SPDY/"
+                 << SpdyVersionNumberString(spdy_version());
+      SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+      StopSession();
+    }
+    return;
+  }
+
+  base::AutoLock autolock(stream_map_lock_);
+  SpdyStream* stream = stream_map_.GetStream(stream_id);
+  if (stream == NULL) {
+    // We must ignore WINDOW_UPDATE frames for closed streams (SPDY draft 3
+    // section 2.6.8).
+    return;
+  }
+
+  VLOG(4) << "[stream " << stream_id << "] Received WINDOW_UPDATE("
+          << delta_window_size << ") frame";
+  stream->AdjustOutputWindowSize(delta_window_size);
+}
+
+void SpdySession::SetInitialWindowSize(uint32 new_init_window_size) {
+  // Flow control only exists for SPDY v3 and up.  We shouldn't be calling this
+  // method for SPDY v2.
+  if (spdy_version() < spdy::SPDY_VERSION_3) {
+    LOG(DFATAL) << "SetInitialWindowSize called for SPDY/"
+                << SpdyVersionNumberString(spdy_version());
+    return;
+  }
+
+  // Validate the new window size; it must be positive, but at most int32max.
+  if (new_init_window_size == 0 ||
+      new_init_window_size >
+      static_cast<uint32>(net::kSpdyMaximumWindowSize)) {
+    LOG(WARNING) << "Client sent invalid init window size ("
+                 << new_init_window_size << ").  Sending GOAWAY.";
+    SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+    return;
+  }
+  // Sanity check that our current init window size is positive.  It's a signed
+  // int32, so we know it's no more than int32max.
+  DCHECK_GT(initial_window_size_, 0);
+  // We can now be sure that this subtraction won't overflow/underflow.
+  const int32 delta =
+      static_cast<int32>(new_init_window_size) - initial_window_size_;
+
+  // Set the initial window size for new streams.
+  initial_window_size_ = new_init_window_size;
+  // We also have to adjust the window size of all currently active streams by
+  // the delta (SPDY draft 3 section 2.6.8).
+  base::AutoLock autolock(stream_map_lock_);
+  stream_map_.AdjustAllOutputWindowSizes(delta);
+}
+
+// Compress (if necessary), send, and then delete the given frame object.
+void SpdySession::SendFrame(const net::SpdyFrameIR* frame_ptr) {
+  scoped_ptr<const net::SpdyFrameIR> frame(frame_ptr);
+  scoped_ptr<const net::SpdySerializedFrame> serialized_frame(
+      framer_.SerializeFrame(*frame));
+  if (serialized_frame == NULL) {
+    LOG(DFATAL) << "frame compression failed";
+    StopSession();
+    return;
+  }
+  SendFrameRaw(*serialized_frame);
+}
+
+void SpdySession::SendFrameRaw(const net::SpdySerializedFrame& frame) {
+  const SpdySessionIO::WriteStatus status = session_io_->SendFrameRaw(frame);
+  if (status == SpdySessionIO::WRITE_CONNECTION_CLOSED) {
+    // If the connection was closed and we can't write anything to the client
+    // anymore, then there's little point in continuing with the session.
+    StopSession();
+  } else {
+    DCHECK_EQ(SpdySessionIO::WRITE_SUCCESS, status);
+  }
+}
+
+void SpdySession::SendGoAwayFrame(net::SpdyGoAwayStatus status) {
+  if (!already_sent_goaway_) {
+    already_sent_goaway_ = true;
+    SendFrame(new net::SpdyGoAwayIR(last_client_stream_id_, status));
+  }
+}
+
+void SpdySession::SendRstStreamFrame(net::SpdyStreamId stream_id,
+                                     net::SpdyRstStreamStatus status) {
+  output_queue_.Insert(SpdyFramePriorityQueue::kTopPriority,
+                       new net::SpdyRstStreamIR(stream_id, status));
+}
+
+void SpdySession::SendSettingsFrame() {
+  scoped_ptr<net::SpdySettingsIR> settings(new net::SpdySettingsIR);
+  settings->AddSetting(net::SETTINGS_MAX_CONCURRENT_STREAMS,
+                       false, false, config_->max_streams_per_connection());
+  SendFrame(settings.release());
+}
+
+void SpdySession::StopSession() {
+  session_stopped_ = true;
+  // Abort all remaining streams.  We need to lock when reading the stream
+  // map, because one of the stream threads could call RemoveStreamTask() at
+  // any time.
+  {
+    base::AutoLock autolock(stream_map_lock_);
+    stream_map_.AbortAllSilently();
+  }
+  shared_window_.Abort();
+  // Stop all stream threads and tasks for this SPDY session.  This will
+  // block until all currently running stream tasks have exited, but since we
+  // just aborted all streams, that should hopefully happen fairly soon.  Note
+  // that we must release the lock before calling this, because each stream
+  // will remove itself from the stream map as it shuts down.
+  executor_->Stop();
+}
+
+// Abort the stream without sending anything to the client.
+void SpdySession::AbortStreamSilently(net::SpdyStreamId stream_id) {
+  // We need to lock when reading the stream map, because one of the stream
+  // threads could call RemoveStreamTask() at any time.
+  base::AutoLock autolock(stream_map_lock_);
+  SpdyStream* stream = stream_map_.GetStream(stream_id);
+  if (stream != NULL) {
+    stream->AbortSilently();
+  }
+}
+
+// Send a RST_STREAM frame and then abort the stream.
+void SpdySession::AbortStream(net::SpdyStreamId stream_id,
+                              net::SpdyRstStreamStatus status) {
+  SendRstStreamFrame(stream_id, status);
+  AbortStreamSilently(stream_id);
+}
+
+// Remove the StreamTaskWrapper from the stream map.  This is the only method
+// of SpdySession that is ever called by another thread (specifically, it is
+// called by the StreamTaskWrapper destructor, which is called by the executor,
+// which presumably uses worker threads) -- it is because of this that we must
+// lock the stream_map_lock_ whenever we touch the stream map or its contents.
+void SpdySession::RemoveStreamTask(StreamTaskWrapper* task_wrapper) {
+  // We need to lock when touching the stream map, in case the main connection
+  // thread is currently in the middle of reading the stream map.
+  base::AutoLock autolock(stream_map_lock_);
+  VLOG(2) << "Closing stream " << task_wrapper->stream()->stream_id();
+  stream_map_.RemoveStreamTask(task_wrapper);
+}
+
+bool SpdySession::StreamMapIsEmpty() {
+  base::AutoLock autolock(stream_map_lock_);
+  return stream_map_.IsEmpty();
+}
+
+// This constructor is always called by the main connection thread, so we're
+// safe to call spdy_session_->task_factory_->NewStreamTask().  However,
+// the other methods of this class (Run(), Cancel(), and the destructor) are
+// liable to be called from other threads by the executor.
+SpdySession::StreamTaskWrapper::StreamTaskWrapper(
+    SpdySession* spdy_session,
+    net::SpdyStreamId stream_id,
+    net::SpdyStreamId associated_stream_id,
+    int32 server_push_depth,
+    net::SpdyPriority priority)
+    : spdy_session_(spdy_session),
+      stream_(spdy_session->spdy_version(), stream_id, associated_stream_id,
+              server_push_depth, priority, spdy_session_->initial_window_size_,
+              &spdy_session_->output_queue_, &spdy_session_->shared_window_,
+              spdy_session_),
+      subtask_(spdy_session_->task_factory_->NewStreamTask(&stream_)) {
+  CHECK(subtask_);
+}
+
+SpdySession::StreamTaskWrapper::~StreamTaskWrapper() {
+  // Remove this object from the SpdySession's stream map.
+  spdy_session_->RemoveStreamTask(this);
+}
+
+void SpdySession::StreamTaskWrapper::Run() {
+  subtask_->CallRun();
+}
+
+void SpdySession::StreamTaskWrapper::Cancel() {
+  subtask_->CallCancel();
+}
+
+SpdySession::SpdyStreamMap::SpdyStreamMap()
+    : num_active_push_streams_(0u) {}
+
+SpdySession::SpdyStreamMap::~SpdyStreamMap() {}
+
+bool SpdySession::SpdyStreamMap::IsEmpty() {
+  DCHECK_LE(num_active_push_streams_, tasks_.size());
+  return tasks_.empty();
+}
+
+size_t SpdySession::SpdyStreamMap::NumActiveClientStreams() {
+  DCHECK_LE(num_active_push_streams_, tasks_.size());
+  return tasks_.size() - num_active_push_streams_;
+}
+
+size_t SpdySession::SpdyStreamMap::NumActivePushStreams() {
+  DCHECK_LE(num_active_push_streams_, tasks_.size());
+  return num_active_push_streams_;
+}
+
+bool SpdySession::SpdyStreamMap::IsStreamActive(net::SpdyStreamId stream_id) {
+  return tasks_.count(stream_id) > 0u;
+}
+
+void SpdySession::SpdyStreamMap::AddStreamTask(
+    StreamTaskWrapper* task_wrapper) {
+  DCHECK(task_wrapper);
+  SpdyStream* stream = task_wrapper->stream();
+  DCHECK(stream);
+  net::SpdyStreamId stream_id = stream->stream_id();
+  DCHECK_EQ(0u, tasks_.count(stream_id));
+  tasks_[stream_id] = task_wrapper;
+  if (stream->is_server_push()) {
+    ++num_active_push_streams_;
+  }
+  DCHECK_LE(num_active_push_streams_, tasks_.size());
+}
+
+void SpdySession::SpdyStreamMap::RemoveStreamTask(
+    StreamTaskWrapper* task_wrapper) {
+  DCHECK(task_wrapper);
+  SpdyStream* stream = task_wrapper->stream();
+  DCHECK(stream);
+  net::SpdyStreamId stream_id = stream->stream_id();
+  DCHECK_EQ(1u, tasks_.count(stream_id));
+  DCHECK_EQ(task_wrapper, tasks_[stream_id]);
+  if (stream->is_server_push()) {
+    DCHECK_GT(num_active_push_streams_, 0u);
+    --num_active_push_streams_;
+  }
+  tasks_.erase(stream_id);
+  DCHECK_LE(num_active_push_streams_, tasks_.size());
+}
+
+SpdyStream* SpdySession::SpdyStreamMap::GetStream(
+    net::SpdyStreamId stream_id) {
+  TaskMap::const_iterator iter = tasks_.find(stream_id);
+  if (iter == tasks_.end()) {
+    return NULL;
+  }
+  StreamTaskWrapper* task_wrapper = iter->second;
+  DCHECK(task_wrapper);
+  SpdyStream* stream = task_wrapper->stream();
+  DCHECK(stream);
+  DCHECK_EQ(stream_id, stream->stream_id());
+  return stream;
+}
+
+void SpdySession::SpdyStreamMap::AdjustAllOutputWindowSizes(int32 delta) {
+  for (TaskMap::const_iterator iter = tasks_.begin();
+       iter != tasks_.end(); ++iter) {
+    iter->second->stream()->AdjustOutputWindowSize(delta);
+  }
+}
+
+void SpdySession::SpdyStreamMap::AbortAllSilently() {
+  for (TaskMap::const_iterator iter = tasks_.begin();
+       iter != tasks_.end(); ++iter) {
+    iter->second->stream()->AbortSilently();
+  }
+}
+
+}  // namespace mod_spdy