You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ji...@apache.org on 2014/05/01 13:39:32 UTC
svn commit: r1591620 [7/14] - in /httpd/mod_spdy/branches/httpd-2.2.x: ./
base/ base/metrics/ build/ install/ install/common/ install/debian/
install/rpm/ mod_spdy/ mod_spdy/apache/ mod_spdy/apache/filters/
mod_spdy/apache/testing/ mod_spdy/common/ mod...
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,120 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+
+#include <list>
+#include <map>
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+SpdyFramePriorityQueue::SpdyFramePriorityQueue()
+ : condvar_(&lock_) {}
+
+SpdyFramePriorityQueue::~SpdyFramePriorityQueue() {
+ for (QueueMap::iterator iter = queue_map_.begin();
+ iter != queue_map_.end(); ++iter) {
+ FrameList* list = iter->second;
+ STLDeleteContainerPointers(list->begin(), list->end());
+ delete list;
+ }
+}
+
+bool SpdyFramePriorityQueue::IsEmpty() const {
+ base::AutoLock autolock(lock_);
+ return queue_map_.empty();
+}
+
+const int SpdyFramePriorityQueue::kTopPriority = -1;
+
+void SpdyFramePriorityQueue::Insert(int priority, net::SpdyFrameIR* frame) {
+ base::AutoLock autolock(lock_);
+ DCHECK(frame);
+
+ // Get the frame list for the given priority; if it doesn't currently exist,
+ // create it in the map.
+ FrameList* list = NULL;
+ QueueMap::iterator iter = queue_map_.find(priority);
+ if (iter == queue_map_.end()) {
+ list = new FrameList;
+ queue_map_[priority] = list;
+ } else {
+ list = iter->second;
+ }
+ DCHECK(list);
+
+ // Add the frame to the end of the list, and wake up at most one thread
+ // sleeping on a BlockingPop.
+ list->push_back(frame);
+ condvar_.Signal();
+}
+
+bool SpdyFramePriorityQueue::Pop(net::SpdyFrameIR** frame) {
+ base::AutoLock autolock(lock_);
+ return InternalPop(frame);
+}
+
+bool SpdyFramePriorityQueue::BlockingPop(const base::TimeDelta& max_time,
+ net::SpdyFrameIR** frame) {
+ base::AutoLock autolock(lock_);
+ DCHECK(frame);
+
+ const base::TimeDelta zero = base::TimeDelta();
+ base::TimeDelta time_remaining = max_time;
+ while (time_remaining > zero && queue_map_.empty()) {
+ // TODO(mdsteele): It appears from looking at the Chromium source code that
+ // HighResNow() is "expensive" on Windows (how expensive, I am not sure);
+ // however, the other options for getting a "now" time either don't
+ // guarantee monotonicity (so time might go backwards) or might be too
+ // low-resolution for our purposes, so I think we'd better stick with this
+ // for now. But is there a better way to do what we're doing here?
+ const base::TimeTicks start = base::TimeTicks::HighResNow();
+ condvar_.TimedWait(time_remaining);
+ time_remaining -= base::TimeTicks::HighResNow() - start;
+ }
+
+ return InternalPop(frame);
+}
+
+bool SpdyFramePriorityQueue::InternalPop(net::SpdyFrameIR** frame) {
+ lock_.AssertAcquired();
+ DCHECK(frame);
+ if (queue_map_.empty()) {
+ return false;
+ }
+ // As an invariant, the lists in the queue map are never empty. So get the
+ // list of highest priority (smallest priority number) and pop the first
+ // frame from it.
+ QueueMap::iterator iter = queue_map_.begin();
+ FrameList* list = iter->second;
+ DCHECK(!list->empty());
+ *frame = list->front();
+ list->pop_front();
+ // If the list is now empty, we have to delete it from the map to maintain
+ // the invariant.
+ if (list->empty()) {
+ queue_map_.erase(iter);
+ delete list;
+ }
+ return true;
+}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h Thu May 1 11:39:27 2014
@@ -0,0 +1,95 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_FRAME_PRIORITY_QUEUE_H_
+#define MOD_SPDY_COMMON_SPDY_FRAME_PRIORITY_QUEUE_H_
+
+#include <list>
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+
+namespace base { class TimeDelta; }
+
+namespace net { class SpdyFrameIR; }
+
+namespace mod_spdy {
+
+// A priority queue of SPDY frames, intended for multiplexing output frames
+// from multiple SPDY stream threads back to the SPDY connection thread and
+// allowing frames from high-priority streams to cut in front of lower-priority
+// streams. This class is thread-safe -- its methods may be called
+// concurrently by multiple threads.
+class SpdyFramePriorityQueue {
+ public:
+ // Create an initially-empty queue.
+ SpdyFramePriorityQueue();
+ ~SpdyFramePriorityQueue();
+
+ // Return true if the queue is currently empty. (Of course, there's no
+ // guarantee that another thread won't change that as soon as this method
+ // returns.)
+ bool IsEmpty() const;
+
+ // A priority value that is more important than any priority normally used
+ // for sending SPDY frames.
+ static const int kTopPriority;
+
+ // Insert a frame into the queue at the specified priority. The queue takes
+ // ownership of the frame, and will delete it if the queue is deleted before
+ // the frame is removed from the queue by the Pop method. Note that smaller
+ // numbers indicate higher priorities.
+ void Insert(int priority, net::SpdyFrameIR* frame);
+
+ // Remove and provide a frame from the queue and return true, or return false
+ // if the queue is empty. The caller gains ownership of the provided frame
+ // object. This method will try to yield higher-priority frames before
+ // lower-priority ones (even if they were inserted later), but guarantees to
+ // return same-priority frames in the same order they were inserted (FIFO).
+ // In particular, this means that a sequence of frames from the same SPDY
+ // stream will stay in order (assuming they were all inserted with the same
+ // priority -- that of the stream).
+ bool Pop(net::SpdyFrameIR** frame);
+
+ // Like Pop(), but if the queue is empty this method will block for up to
+ // max_time before returning false.
+ bool BlockingPop(const base::TimeDelta& max_time, net::SpdyFrameIR** frame);
+
+ private:
+ // Same as Pop(), but requires lock_ to be held.
+ bool InternalPop(net::SpdyFrameIR** frame);
+
+ mutable base::Lock lock_;
+ base::ConditionVariable condvar_;
+ // We use a map of lists to store frames, to guarantee that frames of the
+ // same priority are stored in FIFO order. A simpler implementation would be
+ // to just use a multimap, which in practice is nearly always implemented
+ // with the FIFO behavior that we want, but the spec doesn't actually
+ // guarantee that behavior.
+ //
+ // Each list stores frames of a particular priority. Invariant: the lists in
+ // the QueueMap are never empty; if one of the lists becomes empty, that
+ // key/value pair is immediately removed from the map.
+ typedef std::list<net::SpdyFrameIR*> FrameList;
+ typedef std::map<int, FrameList*> QueueMap;
+ QueueMap queue_map_;
+
+ DISALLOW_COPY_AND_ASSIGN(SpdyFramePriorityQueue);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_SPDY_FRAME_PRIORITY_QUEUE_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_priority_queue_test.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,139 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+
+#include "base/time/time.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace {
+
+void ExpectPop(net::SpdyPingId expected,
+ mod_spdy::SpdyFramePriorityQueue* queue) {
+ EXPECT_FALSE(queue->IsEmpty());
+ net::SpdyFrameIR* raw_frame = NULL;
+ const bool success = queue->Pop(&raw_frame);
+ scoped_ptr<net::SpdyFrameIR> scoped_frame(raw_frame);
+ EXPECT_TRUE(success);
+ ASSERT_TRUE(scoped_frame != NULL);
+ EXPECT_THAT(*scoped_frame, mod_spdy::testing::IsPing(expected));
+}
+
+void ExpectEmpty(mod_spdy::SpdyFramePriorityQueue* queue) {
+ EXPECT_TRUE(queue->IsEmpty());
+ net::SpdyFrameIR* frame = NULL;
+ EXPECT_FALSE(queue->Pop(&frame));
+ EXPECT_TRUE(frame == NULL);
+}
+
+TEST(SpdyFramePriorityQueueTest, InsertSpdy2) {
+ net::SpdyFramer framer(net::SPDY2);
+ mod_spdy::SpdyFramePriorityQueue queue;
+ ExpectEmpty(&queue);
+
+ EXPECT_EQ(3u, framer.GetLowestPriority());
+ EXPECT_EQ(0u, framer.GetHighestPriority());
+
+ queue.Insert(3, new net::SpdyPingIR(4));
+ queue.Insert(0, new net::SpdyPingIR(1));
+ queue.Insert(3, new net::SpdyPingIR(3));
+
+ ExpectPop(1, &queue);
+ ExpectPop(4, &queue);
+
+ queue.Insert(2, new net::SpdyPingIR(2));
+ queue.Insert(1, new net::SpdyPingIR(6));
+ queue.Insert(1, new net::SpdyPingIR(5));
+
+ ExpectPop(6, &queue);
+ ExpectPop(5, &queue);
+ ExpectPop(2, &queue);
+ ExpectPop(3, &queue);
+ ExpectEmpty(&queue);
+}
+
+TEST(SpdyFramePriorityQueueTest, InsertSpdy3) {
+ net::SpdyFramer framer(net::SPDY3);
+ mod_spdy::SpdyFramePriorityQueue queue;
+ ExpectEmpty(&queue);
+
+ EXPECT_EQ(7u, framer.GetLowestPriority());
+ EXPECT_EQ(0u, framer.GetHighestPriority());
+
+ queue.Insert(7, new net::SpdyPingIR(4));
+ queue.Insert(0, new net::SpdyPingIR(1));
+ queue.Insert(7, new net::SpdyPingIR(3));
+
+ ExpectPop(1, &queue);
+ ExpectPop(4, &queue);
+
+ queue.Insert(6, new net::SpdyPingIR(2));
+ queue.Insert(1, new net::SpdyPingIR(6));
+ queue.Insert(5, new net::SpdyPingIR(5));
+
+ ExpectPop(6, &queue);
+ ExpectPop(5, &queue);
+ ExpectPop(2, &queue);
+ ExpectPop(3, &queue);
+ ExpectEmpty(&queue);
+}
+
+TEST(SpdyFramePriorityQueueTest, InsertTopPriority) {
+ mod_spdy::SpdyFramePriorityQueue queue;
+ ExpectEmpty(&queue);
+
+ queue.Insert(3, new net::SpdyPingIR(4));
+ queue.Insert(mod_spdy::SpdyFramePriorityQueue::kTopPriority,
+ new net::SpdyPingIR(2));
+ queue.Insert(mod_spdy::SpdyFramePriorityQueue::kTopPriority,
+ new net::SpdyPingIR(6));
+ queue.Insert(0, new net::SpdyPingIR(1));
+ queue.Insert(3, new net::SpdyPingIR(3));
+
+ ExpectPop(2, &queue);
+ ExpectPop(6, &queue);
+ ExpectPop(1, &queue);
+ ExpectPop(4, &queue);
+
+ queue.Insert(mod_spdy::SpdyFramePriorityQueue::kTopPriority,
+ new net::SpdyPingIR(5));
+
+ ExpectPop(5, &queue);
+ ExpectPop(3, &queue);
+ ExpectEmpty(&queue);
+}
+
+TEST(SpdyFramePriorityQueueTest, BlockingPop) {
+ mod_spdy::SpdyFramePriorityQueue queue;
+ net::SpdyFrameIR* frame;
+ ASSERT_FALSE(queue.Pop(&frame));
+
+ const base::TimeDelta time_to_wait = base::TimeDelta::FromMilliseconds(50);
+ const base::TimeTicks start = base::TimeTicks::HighResNow();
+ ASSERT_FALSE(queue.BlockingPop(time_to_wait, &frame));
+ const base::TimeDelta actual_time_waited =
+ base::TimeTicks::HighResNow() - start;
+
+ // Check that we waited at least as long as we asked for.
+ EXPECT_GE(actual_time_waited, time_to_wait);
+ // Check that we didn't wait too much longer than we asked for.
+ EXPECT_LT(actual_time_waited.InMillisecondsF(),
+ 1.1 * time_to_wait.InMillisecondsF());
+}
+
+} // namespace
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,84 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_queue.h"
+
+#include <list>
+
+#include "base/logging.h"
+#include "base/stl_util.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+SpdyFrameQueue::SpdyFrameQueue()
+ : condvar_(&lock_), is_aborted_(false) {}
+
+SpdyFrameQueue::~SpdyFrameQueue() {
+ STLDeleteContainerPointers(queue_.begin(), queue_.end());
+}
+
+bool SpdyFrameQueue::is_aborted() const {
+ base::AutoLock autolock(lock_);
+ return is_aborted_;
+}
+
+void SpdyFrameQueue::Abort() {
+ base::AutoLock autolock(lock_);
+ is_aborted_ = true;
+ STLDeleteContainerPointers(queue_.begin(), queue_.end());
+ queue_.clear();
+ condvar_.Broadcast();
+}
+
+void SpdyFrameQueue::Insert(net::SpdyFrameIR* frame) {
+ base::AutoLock autolock(lock_);
+ DCHECK(frame);
+
+ if (is_aborted_) {
+ DCHECK(queue_.empty());
+ delete frame;
+ } else {
+ if (queue_.empty()) {
+ condvar_.Signal();
+ }
+ queue_.push_front(frame);
+ }
+}
+
+bool SpdyFrameQueue::Pop(bool block, net::SpdyFrameIR** frame) {
+ base::AutoLock autolock(lock_);
+ DCHECK(frame);
+
+ if (block) {
+ // Block until the queue is nonempty or we abort.
+ while (queue_.empty() && !is_aborted_) {
+ condvar_.Wait();
+ }
+ }
+
+ // If we've aborted, the queue should now be empty.
+ DCHECK(!is_aborted_ || queue_.empty());
+ if (queue_.empty()) {
+ return false;
+ }
+
+ *frame = queue_.back();
+ queue_.pop_back();
+ return true;
+}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h Thu May 1 11:39:27 2014
@@ -0,0 +1,71 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_FRAME_QUEUE_H_
+#define MOD_SPDY_COMMON_SPDY_FRAME_QUEUE_H_
+
+#include <list>
+
+#include "base/basictypes.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+
+namespace net { class SpdyFrameIR; }
+
+namespace mod_spdy {
+
+// A simple FIFO queue of SPDY frames, intended for sending input frames from
+// the SPDY connection thread to a SPDY stream thread. This class is
+// thread-safe -- all methods may be called concurrently by multiple threads.
+class SpdyFrameQueue {
+ public:
+ // Create an initially-empty queue.
+ SpdyFrameQueue();
+ ~SpdyFrameQueue();
+
+ // Return true if this queue has been aborted.
+ bool is_aborted() const;
+
+ // Abort the queue. All frames held by the queue will be deleted; future
+ // frames passed to Insert() will be immediately deleted; future calls to
+ // Pop() will fail immediately; and current blocking calls to Pop will
+ // immediately unblock and fail.
+ void Abort();
+
+ // Insert a frame into the queue. The queue takes ownership of the frame,
+ // and will delete it if the queue is deleted or aborted before the frame is
+ // removed from the queue by the Pop method.
+ void Insert(net::SpdyFrameIR* frame);
+
+ // Remove and provide a frame from the queue and return true, or return false
+ // if the queue is empty or has been aborted. If the block argument is true,
+ // block until a frame becomes available (or the queue is aborted). The
+ // caller gains ownership of the provided frame object.
+ bool Pop(bool block, net::SpdyFrameIR** frame);
+
+ private:
+ // This is a pretty naive implementation of a thread-safe queue, but it's
+ // good enough for our purposes. We could use an apr_queue_t instead of
+ // rolling our own class, but it lacks the ownership semantics that we want.
+ mutable base::Lock lock_;
+ base::ConditionVariable condvar_;
+ std::list<net::SpdyFrameIR*> queue_;
+ bool is_aborted_;
+
+ DISALLOW_COPY_AND_ASSIGN(SpdyFrameQueue);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_SPDY_FRAME_QUEUE_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_frame_queue_test.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,112 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_frame_queue.h"
+
+#include "base/basictypes.h"
+#include "base/threading/platform_thread.h"
+#include "mod_spdy/common/testing/async_task_runner.h"
+#include "mod_spdy/common/testing/notification.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace {
+
+const int kSpdyVersion = 2;
+
+void ExpectPop(bool block, net::SpdyStreamId expected,
+ mod_spdy::SpdyFrameQueue* queue) {
+ net::SpdyFrameIR* raw_frame = NULL;
+ const bool success = queue->Pop(block, &raw_frame);
+ scoped_ptr<net::SpdyFrameIR> scoped_frame(raw_frame);
+ EXPECT_TRUE(success);
+ ASSERT_TRUE(scoped_frame != NULL);
+ EXPECT_THAT(*scoped_frame, mod_spdy::testing::IsPing(expected));
+}
+
+void ExpectEmpty(mod_spdy::SpdyFrameQueue* queue) {
+ net::SpdyFrameIR* frame = NULL;
+ EXPECT_FALSE(queue->Pop(false, &frame));
+ EXPECT_TRUE(frame == NULL);
+}
+
+TEST(SpdyFrameQueueTest, Simple) {
+ mod_spdy::SpdyFrameQueue queue;
+ ExpectEmpty(&queue);
+
+ queue.Insert(new net::SpdyPingIR(4));
+ queue.Insert(new net::SpdyPingIR(1));
+ queue.Insert(new net::SpdyPingIR(3));
+
+ ExpectPop(false, 4, &queue);
+ ExpectPop(false, 1, &queue);
+
+ queue.Insert(new net::SpdyPingIR(2));
+ queue.Insert(new net::SpdyPingIR(5));
+
+ ExpectPop(false, 3, &queue);
+ ExpectPop(false, 2, &queue);
+ ExpectPop(false, 5, &queue);
+ ExpectEmpty(&queue);
+}
+
+TEST(SpdyFrameQueueTest, AbortEmptiesQueue) {
+ mod_spdy::SpdyFrameQueue queue;
+ ASSERT_FALSE(queue.is_aborted());
+ ExpectEmpty(&queue);
+
+ queue.Insert(new net::SpdyPingIR(4));
+ queue.Insert(new net::SpdyPingIR(1));
+ queue.Insert(new net::SpdyPingIR(3));
+
+ ExpectPop(false, 4, &queue);
+
+ queue.Abort();
+
+ ExpectEmpty(&queue);
+ ASSERT_TRUE(queue.is_aborted());
+}
+
+class BlockingPopTask : public mod_spdy::testing::AsyncTaskRunner::Task {
+ public:
+ explicit BlockingPopTask(mod_spdy::SpdyFrameQueue* queue) : queue_(queue) {}
+ virtual void Run() { ExpectPop(true, 7, queue_); }
+ private:
+ mod_spdy::SpdyFrameQueue* const queue_;
+ DISALLOW_COPY_AND_ASSIGN(BlockingPopTask);
+};
+
+TEST(SpdyFrameQueueTest, BlockingPop) {
+ mod_spdy::SpdyFrameQueue queue;
+
+ // Start a task that will do a blocking pop from the queue.
+ mod_spdy::testing::AsyncTaskRunner runner(new BlockingPopTask(&queue));
+ ASSERT_TRUE(runner.Start());
+
+ // Even if we wait for a little bit, the task shouldn't complete, because
+ // that thread is blocked, because the queue is still empty.
+ base::PlatformThread::Sleep(base::TimeDelta::FromMilliseconds(50));
+ runner.notification()->ExpectNotSet();
+ ExpectEmpty(&queue);
+
+ // Now, if we push something into the queue, the task should soon unblock and
+ // complete, and the queue should then be empty.
+ queue.Insert(new net::SpdyPingIR(7));
+ runner.notification()->ExpectSetWithinMillis(100);
+ ExpectEmpty(&queue);
+}
+
+} // namespace
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,75 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_server_config.h"
+
+#include "mod_spdy/common/protocol_util.h"
+
+namespace {
+
+const bool kDefaultSpdyEnabled = false;
+const int kDefaultMaxStreamsPerConnection = 100;
+const int kDefaultMinThreadsPerProcess = 2;
+const int kDefaultMaxThreadsPerProcess = 10;
+const int kDefaultMaxServerPushDepth = 1;
+const bool kDefaultSendVersionHeader = true;
+const bool kDefaultServerPushDiscoveryEnabled = false;
+const bool kDefaultServerPushDiscoverySendDebugHeaders = false;
+const mod_spdy::spdy::SpdyVersion kDefaultUseSpdyVersionWithoutSsl =
+ mod_spdy::spdy::SPDY_VERSION_NONE;
+const int kDefaultVlogLevel = 0;
+
+} // namespace
+
+namespace mod_spdy {
+
+SpdyServerConfig::SpdyServerConfig()
+ : spdy_enabled_(kDefaultSpdyEnabled),
+ max_streams_per_connection_(kDefaultMaxStreamsPerConnection),
+ min_threads_per_process_(kDefaultMinThreadsPerProcess),
+ max_threads_per_process_(kDefaultMaxThreadsPerProcess),
+ max_server_push_depth_(kDefaultMaxServerPushDepth),
+ send_version_header_(kDefaultSendVersionHeader),
+ server_push_discovery_enabled_(kDefaultServerPushDiscoveryEnabled),
+ server_push_discovery_send_debug_headers_(
+ kDefaultServerPushDiscoverySendDebugHeaders),
+ use_spdy_version_without_ssl_(kDefaultUseSpdyVersionWithoutSsl),
+ vlog_level_(kDefaultVlogLevel) {}
+
+SpdyServerConfig::~SpdyServerConfig() {}
+
+void SpdyServerConfig::MergeFrom(const SpdyServerConfig& a,
+ const SpdyServerConfig& b) {
+ spdy_enabled_.MergeFrom(a.spdy_enabled_, b.spdy_enabled_);
+ max_streams_per_connection_.MergeFrom(a.max_streams_per_connection_,
+ b.max_streams_per_connection_);
+ min_threads_per_process_.MergeFrom(a.min_threads_per_process_,
+ b.min_threads_per_process_);
+ max_threads_per_process_.MergeFrom(a.max_threads_per_process_,
+ b.max_threads_per_process_);
+ max_server_push_depth_.MergeFrom(a.max_server_push_depth_,
+ b.max_server_push_depth_);
+ send_version_header_.MergeFrom(
+ a.send_version_header_, b.send_version_header_);
+ server_push_discovery_enabled_.MergeFrom(a.server_push_discovery_enabled_,
+ b.server_push_discovery_enabled_);
+ server_push_discovery_send_debug_headers_.MergeFrom(
+ a.server_push_discovery_send_debug_headers_,
+ b.server_push_discovery_send_debug_headers_);
+ use_spdy_version_without_ssl_.MergeFrom(
+ a.use_spdy_version_without_ssl_, b.use_spdy_version_without_ssl_);
+ vlog_level_.MergeFrom(a.vlog_level_, b.vlog_level_);
+}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h Thu May 1 11:39:27 2014
@@ -0,0 +1,139 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SERVER_CONFIG_H_
+#define MOD_SPDY_COMMON_SPDY_SERVER_CONFIG_H_
+
+#include "base/basictypes.h"
+#include "mod_spdy/common/protocol_util.h"
+
+namespace mod_spdy {
+
+// Stores server configuration settings for our module.
+class SpdyServerConfig {
+ public:
+ SpdyServerConfig();
+ ~SpdyServerConfig();
+
+ // Return true if SPDY is enabled for this server, false otherwise.
+ bool spdy_enabled() const { return spdy_enabled_.get(); }
+
+ // Return the maximum number of simultaneous SPDY streams that should be
+ // permitted for a single client connection.
+ int max_streams_per_connection() const {
+ return max_streams_per_connection_.get();
+ }
+
+ // Return the minimum number of worker threads to spawn per child process.
+ int min_threads_per_process() const {
+ return min_threads_per_process_.get();
+ }
+
+ // Return the maximum number of worker threads to spawn per child process.
+ int max_threads_per_process() const {
+ return max_threads_per_process_.get();
+ }
+
+ // Return the maximum number of recursive levels to follow
+ // X-Associated-Content headers
+ int max_server_push_depth() const {
+ return max_server_push_depth_.get();
+ }
+
+ // Whether or not we should include an x-mod-spdy header with the module
+ // version number.
+ bool send_version_header() const { return send_version_header_.get(); }
+
+ // Return if SPDY server push discovery is enabled.
+ bool server_push_discovery_enabled() const {
+ return server_push_discovery_enabled_.get();
+ }
+
+ // Return if we should send server push discovery debug headers to user agent.
+ bool server_push_discovery_send_debug_headers() const {
+ return server_push_discovery_send_debug_headers_.get();
+ }
+
+ // If nonzero, assume (unencrypted) SPDY/x for non-SSL connections, where x
+ // is the version number returned here. This will most likely break normal
+ // browsers, but is useful for testing.
+ spdy::SpdyVersion use_spdy_version_without_ssl() const {
+ return use_spdy_version_without_ssl_.get();
+ }
+
+ // Return the maximum VLOG level we should use.
+ int vlog_level() const { return vlog_level_.get(); }
+
+ // Setters. Call only during the configuration phase.
+ void set_spdy_enabled(bool b) { spdy_enabled_.set(b); }
+ void set_max_streams_per_connection(int n) {
+ max_streams_per_connection_.set(n);
+ }
+ void set_min_threads_per_process(int n) { min_threads_per_process_.set(n); }
+ void set_max_threads_per_process(int n) { max_threads_per_process_.set(n); }
+ void set_max_server_push_depth(int n) { max_server_push_depth_.set(n); }
+ void set_send_version_header(bool b) { send_version_header_.set(b); }
+ void set_server_push_discovery_enabled(bool b) {
+ return server_push_discovery_enabled_.set(b);
+ }
+ void set_server_push_discovery_send_debug_headers(bool b) {
+ return server_push_discovery_send_debug_headers_.set(b);
+ }
+ void set_use_spdy_version_without_ssl(spdy::SpdyVersion v) {
+ use_spdy_version_without_ssl_.set(v);
+ }
+ void set_vlog_level(int n) { vlog_level_.set(n); }
+
+ // Set this config object to the merge of a and b. Call only during the
+ // configuration phase.
+ void MergeFrom(const SpdyServerConfig& a, const SpdyServerConfig& b);
+
+ private:
+ template <typename T>
+ class Option {
+ public:
+ explicit Option(const T& default_value)
+ : was_set_(false), value_(default_value) {}
+ const T& get() const { return value_; }
+ void set(const T& value) { was_set_ = true; value_ = value; }
+ void MergeFrom(const Option<T>& a, const Option<T>& b) {
+ was_set_ = a.was_set_ || b.was_set_;
+ value_ = a.was_set_ ? a.value_ : b.value_;
+ }
+ private:
+ bool was_set_;
+ T value_;
+ DISALLOW_COPY_AND_ASSIGN(Option);
+ };
+
+ // Configuration fields:
+ Option<bool> spdy_enabled_;
+ Option<int> max_streams_per_connection_;
+ Option<int> min_threads_per_process_;
+ Option<int> max_threads_per_process_;
+ Option<int> max_server_push_depth_;
+ Option<bool> send_version_header_;
+ Option<bool> server_push_discovery_enabled_;
+ Option<bool> server_push_discovery_send_debug_headers_;
+ Option<spdy::SpdyVersion> use_spdy_version_without_ssl_;
+ Option<int> vlog_level_;
+ // Note: Add more config options here as needed; be sure to also update the
+ // MergeFrom method in spdy_server_config.cc.
+
+ DISALLOW_COPY_AND_ASSIGN(SpdyServerConfig);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_CONTEXT_SPDY_SERVER_CONFIG_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_config.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,23 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_server_push_interface.h"
+
+namespace mod_spdy {
+
+SpdyServerPushInterface::SpdyServerPushInterface() {}
+
+SpdyServerPushInterface::~SpdyServerPushInterface() {}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h Thu May 1 11:39:27 2014
@@ -0,0 +1,66 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SERVER_PUSH_INTERFACE_H_
+#define MOD_SPDY_COMMON_SPDY_SERVER_PUSH_INTERFACE_H_
+
+#include "base/basictypes.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+class SpdyServerPushInterface {
+ public:
+ SpdyServerPushInterface();
+ virtual ~SpdyServerPushInterface();
+
+ enum PushStatus {
+ // PUSH_STARTED: The server push was started successfully.
+ PUSH_STARTED,
+ // INVALID_REQUEST_HEADERS: The given request headers were invalid for a
+ // server push (e.g. because required headers were missing).
+ INVALID_REQUEST_HEADERS,
+ // ASSOCIATED_STREAM_INACTIVE: The push could not be started because the
+ // associated stream is not currently active.
+ ASSOCIATED_STREAM_INACTIVE,
+ // CANNOT_PUSH_EVER_AGAIN: We can't do any more pushes on this session,
+ // either because the client has already sent us a GOAWAY frame, or the
+ // session has been open so long that we've run out of stream IDs.
+ CANNOT_PUSH_EVER_AGAIN,
+ // TOO_MANY_CONCURRENT_PUSHES: The push could not be started right now
+ // because there are too many currently active push streams.
+ TOO_MANY_CONCURRENT_PUSHES,
+ // PUSH_INTERNAL_ERROR: There was an internal error in the SpdySession
+ // (typically something that caused a LOG(DFATAL).
+ PUSH_INTERNAL_ERROR,
+ };
+
+ // Initiate a SPDY server push, roughly by pretending that the client sent a
+ // SYN_STREAM with the given headers. To repeat: the headers argument is
+ // _not_ the headers that the server will send to the client, but rather the
+ // headers to _pretend_ that the client sent to the server.
+ virtual PushStatus StartServerPush(
+ net::SpdyStreamId associated_stream_id,
+ int32 server_push_depth,
+ net::SpdyPriority priority,
+ const net::SpdyHeaderBlock& request_headers) = 0;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SpdyServerPushInterface);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_SPDY_SERVER_PUSH_INTERFACE_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_server_push_interface.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,914 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session.h"
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/lock.h"
+#include "base/time/time.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_server_config.h"
+#include "mod_spdy/common/spdy_session_io.h"
+#include "mod_spdy/common/spdy_stream.h"
+#include "mod_spdy/common/spdy_stream_task_factory.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// Server push stream IDs must be even, and must fit in 31 bits (SPDY draft 3
+// section 2.3.2). Thus, this is the largest stream ID we can ever use for a
+// pushed stream.
+const net::SpdyStreamId kMaxServerPushStreamId = 0x7FFFFFFEu;
+
+// Until the client informs us otherwise, we will assume a limit of 100 open
+// push streams at a time.
+const uint32 kInitMaxConcurrentPushes = 100u;
+
+} // namespace
+
+namespace mod_spdy {
+
+SpdySession::SpdySession(spdy::SpdyVersion spdy_version,
+ const SpdyServerConfig* config,
+ SpdySessionIO* session_io,
+ SpdyStreamTaskFactory* task_factory,
+ Executor* executor)
+ : spdy_version_(spdy_version),
+ config_(config),
+ session_io_(session_io),
+ task_factory_(task_factory),
+ executor_(executor),
+ framer_(SpdyVersionToFramerVersion(spdy_version), true),
+ session_stopped_(false),
+ already_sent_goaway_(false),
+ last_client_stream_id_(0u),
+ initial_window_size_(net::kSpdyStreamInitialWindowSize),
+ max_concurrent_pushes_(kInitMaxConcurrentPushes),
+ last_server_push_stream_id_(0u),
+ received_goaway_(false),
+ shared_window_(net::kSpdyStreamInitialWindowSize,
+ net::kSpdyStreamInitialWindowSize) {
+ DCHECK_NE(spdy::SPDY_VERSION_NONE, spdy_version);
+ framer_.set_visitor(this);
+}
+
+SpdySession::~SpdySession() {}
+
+int32 SpdySession::current_shared_input_window_size() const {
+ DCHECK_GE(spdy_version_, spdy::SPDY_VERSION_3_1);
+ return shared_window_.current_input_window_size();
+}
+
+int32 SpdySession::current_shared_output_window_size() const {
+ DCHECK_GE(spdy_version_, spdy::SPDY_VERSION_3_1);
+ return shared_window_.current_output_window_size();
+}
+
+void SpdySession::Run() {
+ // Send a SETTINGS frame when the connection first opens, to inform the
+ // client of our MAX_CONCURRENT_STREAMS limit.
+ SendSettingsFrame();
+
+ // Initial amount time to block when waiting for output -- we start with
+ // this, and as long as we fail to perform any input OR output, we increase
+ // exponentially to the max, resetting when we succeed again.
+ const base::TimeDelta kInitOutputBlockTime =
+ base::TimeDelta::FromMilliseconds(1);
+ // Maximum time to block when waiting for output.
+ const base::TimeDelta kMaxOutputBlockTime =
+ base::TimeDelta::FromMilliseconds(30);
+
+ base::TimeDelta output_block_time = kInitOutputBlockTime;
+
+ // Until we stop the session, or it is aborted by the client, alternate
+ // between reading input from the client and (compressing and) sending output
+ // frames that our stream threads have posted to the output queue. This
+ // basically amounts to a busy-loop, switching back and forth between input
+ // and output, so we do our best to block when we can. It would be far nicer
+ // to have separate threads for input and output and have them always block;
+ // unfortunately, we cannot do that, because in Apache the input and output
+ // filter chains for a connection must be invoked by the same thread.
+ while (!session_stopped_) {
+ if (session_io_->IsConnectionAborted()) {
+ LOG(WARNING) << "Master connection was aborted.";
+ StopSession();
+ break;
+ }
+
+ // Step 1: Read input from the client.
+ {
+ // Determine whether we should block until more input data is available.
+ // For now, our policy is to block only if there is no pending output and
+ // there are no currently-active streams (which might produce new
+ // output).
+ const bool should_block = StreamMapIsEmpty() && output_queue_.IsEmpty();
+
+ // If there's no current output, and we can't create new streams (so
+ // there will be no future output), then we should just shut down the
+ // connection.
+ if (should_block && already_sent_goaway_) {
+ StopSession();
+ break;
+ }
+
+ // Read available input data. The SpdySessionIO will grab any
+ // available data and push it into the SpdyFramer that we pass to it
+ // here; the SpdyFramer, in turn, will call our OnControl and/or
+ // OnStreamFrameData methods to report decoded frames. If no input data
+ // is currently available and should_block is true, this will block until
+ // input becomes available (or the connection is closed).
+ const SpdySessionIO::ReadStatus status =
+ session_io_->ProcessAvailableInput(should_block, &framer_);
+ if (status == SpdySessionIO::READ_SUCCESS) {
+ // We successfully did some I/O, so reset the output block timeout.
+ output_block_time = kInitOutputBlockTime;
+ } else if (status == SpdySessionIO::READ_CONNECTION_CLOSED) {
+ // The reading side of the connection has closed, so we won't be
+ // reading anything more. SPDY is transport-layer agnostic and not
+ // TCP-specific; apparently, this means that there is no expectation
+ // that we behave any differently for a half-closed connection than for
+ // a fully-closed connection. So if the reading side of the connection
+ // closes, we're just going to shut down completely.
+ //
+ // But just in case the writing side is still open, let's try to send a
+ // GOAWAY to let the client know we're shutting down gracefully.
+ SendGoAwayFrame(net::GOAWAY_OK);
+ // Now, shut everything down.
+ StopSession();
+ } else if (status == SpdySessionIO::READ_ERROR) {
+ // There was an error during reading, so the session is corrupted and
+ // we have no chance of reading anything more.
+ //
+ // We've probably already sent a GOAWAY with a PROTOCOL_ERROR by this
+ // point, but if we haven't (perhaps the error was our fault?) then
+ // send a GOAWAY now. (If we've already sent a GOAWAY, then
+ // SendGoAwayFrame is a no-op.)
+ SendGoAwayFrame(net::GOAWAY_INTERNAL_ERROR);
+ // Now, shut everything down.
+ StopSession();
+ } else {
+ // Otherwise, there's simply no data available at the moment.
+ DCHECK_EQ(SpdySessionIO::READ_NO_DATA, status);
+ }
+ }
+
+ // Step 2: Send output to the client.
+ if (!session_stopped_) {
+ // If there are no active streams, then no new output can be getting
+ // created right now, so we shouldn't block on output waiting for more.
+ const bool no_active_streams = StreamMapIsEmpty();
+
+ // Send any pending output, one frame at a time. If there are any active
+ // streams, we're willing to block briefly to wait for more frames to
+ // send, if only to prevent this loop from busy-waiting too heavily --
+ // not a great solution, but better than nothing for now.
+ net::SpdyFrameIR* frame = NULL;
+ if (no_active_streams ? output_queue_.Pop(&frame) :
+ output_queue_.BlockingPop(output_block_time, &frame)) {
+ do {
+ SendFrame(frame);
+ } while (!session_stopped_ && output_queue_.Pop(&frame));
+
+ // We successfully did some I/O, so reset the output block timeout.
+ output_block_time = kInitOutputBlockTime;
+ } else {
+ // The queue is currently empty; if no more streams can be created and
+ // no more remain, we're done.
+ if (already_sent_goaway_ && no_active_streams) {
+ StopSession();
+ } else {
+ // There were no output frames within the timeout; so do an
+ // exponential backoff by doubling output_block_time.
+ output_block_time = std::min(kMaxOutputBlockTime,
+ output_block_time * 2);
+ }
+ }
+ }
+
+ // TODO(mdsteele): What we really want to be able to do is to block until
+ // *either* more input or more output is available. Unfortunely, there's
+ // no good way to query the input side (in Apache). One possibility would
+ // be to grab the input socket object (which we can do), and then arrange
+ // to block until either the socket is ready to read OR our output queue is
+ // nonempty (obviously we would abstract that away in SpdySessionIO),
+ // but there's not even a nice way to do that (that I know of).
+ }
+}
+
+SpdyServerPushInterface::PushStatus SpdySession::StartServerPush(
+ net::SpdyStreamId associated_stream_id,
+ int32 server_push_depth,
+ net::SpdyPriority priority,
+ const net::SpdyHeaderBlock& request_headers) {
+ // Server push is pretty ill-defined in SPDY v2, so we require v3 or higher.
+ DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+
+ // Grab the headers that we are required to send with the initial SYN_STREAM.
+ const net::SpdyHeaderBlock::const_iterator host_iter =
+ request_headers.find(spdy::kSpdy3Host);
+ const net::SpdyHeaderBlock::const_iterator path_iter =
+ request_headers.find(spdy::kSpdy3Path);
+ const net::SpdyHeaderBlock::const_iterator scheme_iter =
+ request_headers.find(spdy::kSpdy3Scheme);
+ if (host_iter == request_headers.end() ||
+ path_iter == request_headers.end() ||
+ scheme_iter == request_headers.end()) {
+ return SpdyServerPushInterface::INVALID_REQUEST_HEADERS;
+ }
+ const std::string& host_header = host_iter->second;
+ const std::string& path_header = path_iter->second;
+ const std::string& scheme_header = scheme_iter->second;
+
+ StreamTaskWrapper* task_wrapper = NULL;
+ {
+ base::AutoLock autolock(stream_map_lock_);
+
+ // If we've received a GOAWAY frame the client, we shouldn't create any new
+ // streams on this session (SPDY draft 3 section 2.6.6).
+ if (received_goaway_) {
+ return SpdyServerPushInterface::CANNOT_PUSH_EVER_AGAIN;
+ }
+
+ // The associated stream must be active (SPDY draft 3 section 3.3.1).
+ if (!stream_map_.IsStreamActive(associated_stream_id)) {
+ return SpdyServerPushInterface::ASSOCIATED_STREAM_INACTIVE;
+ }
+
+ // Check if we're allowed to create new push streams right now (based on
+ // the client SETTINGS_MAX_CONCURRENT_STREAMS). Note that the number of
+ // active push streams might be (temporarily) greater than the max, if the
+ // client lowered the max after we already started a bunch of pushes.
+ if (stream_map_.NumActivePushStreams() >= max_concurrent_pushes_) {
+ return SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES;
+ }
+
+ // In the unlikely event that the session stays open so long that we run
+ // out of server push stream IDs, we may not do any more pushes on this
+ // session (SPDY draft 3 section 2.3.2).
+ DCHECK_LE(last_server_push_stream_id_, kMaxServerPushStreamId);
+ if (last_server_push_stream_id_ >= kMaxServerPushStreamId) {
+ return SpdyServerPushInterface::CANNOT_PUSH_EVER_AGAIN;
+ }
+ // Server push stream IDs must be even (SPDY draft 3 section 2.3.2). So
+ // each time we do a push, we increment last_server_push_stream_id_ by two.
+ DCHECK_EQ(last_server_push_stream_id_ % 2u, 0u);
+ last_server_push_stream_id_ += 2u;
+ const net::SpdyStreamId stream_id = last_server_push_stream_id_;
+ // Only the server can create even stream IDs, and we never use the same
+ // one twice, so our chosen stream_id should definitely not be in use.
+ if (stream_map_.IsStreamActive(stream_id)) {
+ LOG(DFATAL) << "Next server push stream ID already in use: "
+ << stream_id;
+ return SpdyServerPushInterface::PUSH_INTERNAL_ERROR;
+ }
+
+ // Create task and add it to the stream map.
+ task_wrapper = new StreamTaskWrapper(
+ this, stream_id, associated_stream_id, server_push_depth, priority);
+ stream_map_.AddStreamTask(task_wrapper);
+ net::SpdySynStreamIR* frame = new net::SpdySynStreamIR(stream_id);
+ frame->set_associated_to_stream_id(associated_stream_id);
+ frame->set_priority(priority);
+ frame->set_fin(true);
+ frame->GetMutableNameValueBlock()->insert(
+ request_headers.begin(), request_headers.end());
+ task_wrapper->stream()->PostInputFrame(frame);
+
+ // Send initial SYN_STREAM to the client. It only needs to contain the
+ // ":host", ":path", and ":scheme" headers; the rest can follow in a later
+ // HEADERS frame (SPDY draft 3 section 3.3.1).
+ net::SpdyHeaderBlock initial_response_headers;
+ initial_response_headers[spdy::kSpdy3Host] = host_header;
+ initial_response_headers[spdy::kSpdy3Path] = path_header;
+ initial_response_headers[spdy::kSpdy3Scheme] = scheme_header;
+ task_wrapper->stream()->SendOutputSynStream(
+ initial_response_headers, false);
+
+ VLOG(2) << "Starting server push; opening stream " << stream_id;
+ }
+ if (task_wrapper == NULL) {
+ LOG(DFATAL) << "Can't happen: task_wrapper is NULL";
+ return SpdyServerPushInterface::PUSH_INTERNAL_ERROR;
+ }
+ executor_->AddTask(task_wrapper, priority);
+ return SpdyServerPushInterface::PUSH_STARTED;
+}
+
+void SpdySession::OnError(net::SpdyFramer::SpdyError error_code) {
+ LOG(ERROR) << "Session error: "
+ << net::SpdyFramer::ErrorCodeToString(error_code);
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+}
+
+void SpdySession::OnStreamError(net::SpdyStreamId stream_id,
+ const std::string& description) {
+ LOG(ERROR) << "Stream " << stream_id << " error: " << description;
+ AbortStream(stream_id, net::RST_STREAM_PROTOCOL_ERROR);
+}
+
+void SpdySession::OnStreamFrameData(
+ net::SpdyStreamId stream_id, const char* data, size_t length, bool fin) {
+ // First check the shared input flow control window (for SPDY/3.1 and up).
+ if (spdy_version_ >= spdy::SPDY_VERSION_3_1) {
+ if (!shared_window_.OnReceiveInputData(length)) {
+ LOG(ERROR) << "Client violated flow control by sending too much data "
+ << "to session. Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ StopSession();
+ return;
+ }
+ }
+
+ // Look up the stream to post the data to. We need to lock when reading the
+ // stream map, because one of the stream threads could call
+ // RemoveStreamTask() at any time.
+ {
+ base::AutoLock autolock(stream_map_lock_);
+ SpdyStream* stream = stream_map_.GetStream(stream_id);
+ if (stream != NULL) {
+ VLOG(4) << "[stream " << stream_id << "] Received DATA (length="
+ << length << ")";
+ // Copy the data into an _uncompressed_ SPDY data frame and post it to
+ // the stream's input queue.
+ // Note that we must still be holding stream_map_lock_ when we call this
+ // method -- otherwise the stream may be deleted out from under us by the
+ // StreamTaskWrapper destructor. That's okay -- PostInputFrame is a
+ // quick operation and won't block (for any appreciable length of time).
+ net::SpdyDataIR* frame =
+ new net::SpdyDataIR(stream_id, base::StringPiece(data, length));
+ frame->set_fin(fin);
+ stream->PostInputFrame(frame);
+ return;
+ }
+ }
+
+ // If we reach this point, it means that the client has sent us DATA for a
+ // stream that doesn't exist (possibly because it used to exist but has
+ // already been closed by a FLAG_FIN); *unless* length=0, which is just the
+ // BufferedSpdyFramer's way of telling us that there will be no more data on
+ // this stream (i.e. because a FLAG_FIN has been received, possibly on a
+ // previous control frame).
+
+ // TODO(mdsteele): The BufferedSpdyFramer sends us OnStreamFrameData with
+ // length=0 to indicate end-of-stream, but it will do this even if we already
+ // got FLAG_FIN in a control frame (such as SYN_STREAM). For now, we fix
+ // this issue by simply ignoring length=0 data for streams that no longer
+ // exist. Once we transition to the new plain SpdyFramer, we'll be able to
+ // handle this more precisely.
+ if (length == 0) {
+ return;
+ }
+
+ // If the client sends data for a nonexistant stream, we must send a
+ // RST_STREAM frame with error code INVALID_STREAM (SPDY draft 2 section
+ // 2.4). Note that we release the mutex *before* sending the frame.
+ LOG(WARNING) << "Client sent DATA (length=" << length
+ << ") for nonexistant stream " << stream_id;
+ SendRstStreamFrame(stream_id, net::RST_STREAM_INVALID_STREAM);
+}
+
+void SpdySession::OnSynStream(
+ net::SpdyStreamId stream_id,
+ net::SpdyStreamId associated_stream_id,
+ net::SpdyPriority priority,
+ uint8 credential_slot,
+ bool fin,
+ bool unidirectional,
+ const net::SpdyHeaderBlock& headers) {
+ // The SPDY spec requires us to ignore SYN_STREAM frames after sending a
+ // GOAWAY frame (SPDY draft 3 section 2.6.6).
+ if (already_sent_goaway_) {
+ return;
+ }
+
+ // Client stream IDs must be odd-numbered.
+ if (stream_id % 2 == 0) {
+ LOG(WARNING) << "Client sent SYN_STREAM for even stream ID (" << stream_id
+ << "). Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ return;
+ }
+
+ // Client stream IDs must be strictly increasing (SPDY draft 2 section
+ // 2.5.1).
+ if (stream_id <= last_client_stream_id_) {
+ LOG(WARNING) << "Client sent SYN_STREAM for non-increasing stream ID ("
+ << stream_id << " after " << last_client_stream_id_
+ << ")."; // Aborting stream.";
+#if 0
+ // TODO(mdsteele): re-enable this code block when
+ // http://code.google.com/p/chromium/issues/detail?id=111708 is
+ // fixed.
+ AbortStream(stream_id, net::PROTOCOL_ERROR);
+ return;
+#endif
+ }
+
+ StreamTaskWrapper* task_wrapper = NULL;
+ {
+ // Lock the stream map before we start checking its size or adding a new
+ // stream to it. We need to lock when touching the stream map, because one
+ // of the stream threads could call RemoveStreamTask() at any time.
+ base::AutoLock autolock(stream_map_lock_);
+
+#if 0
+ // TODO(mdsteele): re-enable this code block when
+ // http://code.google.com/p/chromium/issues/detail?id=111708 is
+ // fixed.
+
+ // We already checked that stream_id > last_client_stream_id_, so there
+ // definitely shouldn't already be a stream with this ID in the map.
+ DCHECK(!stream_map_.IsStreamActive(stream_id));
+#else
+ if (stream_map_.IsStreamActive(stream_id)) {
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ return;
+ }
+#endif
+
+ // Limit the number of simultaneous open streams the client can create;
+ // refuse the stream if there are too many currently active (non-push)
+ // streams.
+ if (static_cast<int>(stream_map_.NumActiveClientStreams()) >=
+ config_->max_streams_per_connection()) {
+ SendRstStreamFrame(stream_id, net::RST_STREAM_REFUSED_STREAM);
+ return;
+ }
+
+ // Initiate a new stream.
+ last_client_stream_id_ = std::max(last_client_stream_id_, stream_id);
+ task_wrapper = new StreamTaskWrapper(
+ this, stream_id, associated_stream_id,
+ 0, // server_push_depth = 0
+ priority);
+ stream_map_.AddStreamTask(task_wrapper);
+ net::SpdySynStreamIR* frame = new net::SpdySynStreamIR(stream_id);
+ frame->set_associated_to_stream_id(associated_stream_id);
+ frame->set_priority(priority);
+ frame->set_slot(credential_slot);
+ frame->set_fin(fin);
+ frame->set_unidirectional(unidirectional);
+ frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ task_wrapper->stream()->PostInputFrame(frame);
+ }
+ DCHECK(task_wrapper);
+ // Release the lock before adding the task to the executor. This is mostly
+ // for the benefit of unit tests, for which calling AddTask will execute the
+ // task immediately (and we don't want to be holding the lock when that
+ // happens). Note that it's safe for us to pass task_wrapper here without
+ // holding the lock, because the task won't get deleted before it's been
+ // added to the executor.
+ VLOG(2) << "Received SYN_STREAM; opening stream " << stream_id;
+ executor_->AddTask(task_wrapper, priority);
+}
+
+void SpdySession::OnSynReply(net::SpdyStreamId stream_id,
+ bool fin,
+ const net::SpdyHeaderBlock& headers) {
+ // TODO(mdsteele)
+}
+
+void SpdySession::OnRstStream(net::SpdyStreamId stream_id,
+ net::SpdyRstStreamStatus status) {
+ switch (status) {
+ // These are totally benign reasons to abort a stream, so just abort the
+ // stream without a fuss.
+ case net::RST_STREAM_REFUSED_STREAM:
+ case net::RST_STREAM_CANCEL:
+ VLOG(2) << "Client cancelled/refused stream " << stream_id;
+ AbortStreamSilently(stream_id);
+ break;
+ // If there was an error, abort the stream, but log a warning first.
+ // TODO(mdsteele): Should we have special behavior for different kinds of
+ // errors?
+ default:
+ LOG(WARNING) << "Client sent RST_STREAM with "
+ << RstStreamStatusCodeToString(status)
+ << " for stream " << stream_id << ". Aborting stream.";
+ AbortStreamSilently(stream_id);
+ break;
+ }
+}
+
+void SpdySession::OnSettings(bool clear_persisted) {
+ // Do nothing; we never persist values, so we don't need to pay attention to
+ // this flag.
+}
+
+void SpdySession::OnSetting(net::SpdySettingsIds id,
+ uint8 flags, uint32 value) {
+ VLOG(4) << "Received SETTING (flags=" << flags << "): "
+ << SettingsIdToString(id) << "=" << value;
+ switch (id) {
+ case net::SETTINGS_MAX_CONCURRENT_STREAMS:
+ max_concurrent_pushes_ = value;
+ break;
+ case net::SETTINGS_INITIAL_WINDOW_SIZE:
+ // Flow control only exists for SPDY v3 and up.
+ if (spdy_version() < spdy::SPDY_VERSION_3) {
+ LOG(ERROR) << "Client sent INITIAL_WINDOW_SIZE setting over "
+ << "SPDY/" << SpdyVersionNumberString(spdy_version())
+ << ". Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ } else {
+ SetInitialWindowSize(value);
+ }
+ break;
+ case net::SETTINGS_UPLOAD_BANDWIDTH:
+ case net::SETTINGS_DOWNLOAD_BANDWIDTH:
+ case net::SETTINGS_ROUND_TRIP_TIME:
+ case net::SETTINGS_CURRENT_CWND:
+ case net::SETTINGS_DOWNLOAD_RETRANS_RATE:
+ // Ignore other settings for now.
+ break;
+ default:
+ LOG(ERROR) << "Client sent invalid SETTINGS id (" << id
+ << "). Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ break;
+ }
+}
+
+void SpdySession::OnPing(uint32 unique_id) {
+ VLOG(4) << "Received PING frame (id=" << unique_id << ")";
+ // The SPDY spec requires the server to ignore even-numbered PING frames that
+ // it did not initiate (SPDY draft 3 section 2.6.5), and right now, we never
+ // initiate pings.
+ if (unique_id % 2 == 0) {
+ return;
+ }
+
+ // Any odd-numbered PING frame we receive was initiated by the client, and
+ // should be echoed back _immediately_ (SPDY draft 2 section 2.7.6).
+ SendFrame(new net::SpdyPingIR(unique_id));
+}
+
+void SpdySession::OnGoAway(net::SpdyStreamId last_accepted_stream_id,
+ net::SpdyGoAwayStatus status) {
+ VLOG(4) << "Received GOAWAY frame (status="
+ << GoAwayStatusCodeToString(status) << ", last_accepted_stream_id="
+ << last_accepted_stream_id << ")";
+
+ // Take note that we have received a GOAWAY frame; we should not start any
+ // new server push streams on this session.
+ {
+ base::AutoLock autolock(stream_map_lock_);
+ received_goaway_ = true;
+ }
+
+ // If this was not a normal shutdown (GOAWAY_OK), we should probably log a
+ // warning to let the user know something's up.
+ switch (status) {
+ case net::GOAWAY_OK:
+ break;
+ case net::GOAWAY_PROTOCOL_ERROR:
+ LOG(WARNING) << "Client sent GOAWAY with PROTOCOL_ERROR. Possibly we "
+ << "did something wrong?";
+ break;
+ case net::GOAWAY_INTERNAL_ERROR:
+ LOG(WARNING) << "Client sent GOAWAY with INTERNAL_ERROR. Apparently "
+ << "they're broken?";
+ break;
+ default:
+ LOG(ERROR) << "Client sent GOAWAY with invalid status code ("
+ << status << "). Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ break;
+ }
+}
+
+void SpdySession::OnHeaders(net::SpdyStreamId stream_id,
+ bool fin,
+ const net::SpdyHeaderBlock& headers) {
+ // Look up the stream to post the data to. We need to lock when reading the
+ // stream map, because one of the stream threads could call
+ // RemoveStreamTask() at any time.
+ {
+ // TODO(mdsteele): This is pretty similar to the code in OnStreamFrameData.
+ // Maybe we can factor it out?
+ base::AutoLock autolock(stream_map_lock_);
+ SpdyStream* stream = stream_map_.GetStream(stream_id);
+ if (stream != NULL) {
+ VLOG(4) << "[stream " << stream_id << "] Received HEADERS frame";
+ net::SpdySynStreamIR* frame = new net::SpdySynStreamIR(stream_id);
+ frame->set_fin(true);
+ frame->GetMutableNameValueBlock()->insert(
+ headers.begin(), headers.end());
+ stream->PostInputFrame(frame);
+ return;
+ }
+ }
+
+ // Note that we release the mutex *before* sending the frame.
+ LOG(WARNING) << "Client sent HEADERS for nonexistant stream " << stream_id;
+ SendRstStreamFrame(stream_id, net::RST_STREAM_INVALID_STREAM);
+}
+
+void SpdySession::OnPushPromise(net::SpdyStreamId stream_id,
+ net::SpdyStreamId promised_stream_id) {
+ LOG(ERROR) << "Got a PUSH_PROMISE(" << stream_id << ", "
+ << promised_stream_id << ") frame from the client.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+}
+
+void SpdySession::OnWindowUpdate(net::SpdyStreamId stream_id,
+ uint32 delta_window_size) {
+ // Flow control only exists for SPDY/3 and up.
+ if (spdy_version() < spdy::SPDY_VERSION_3) {
+ LOG(ERROR) << "Got a WINDOW_UPDATE frame over SPDY/"
+ << SpdyVersionNumberString(spdy_version());
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ return;
+ }
+
+ // Stream zero is special; starting in SPDY/3.1, it represents the
+ // session-wide flow control window. For previous versions, it is invalid.
+ if (stream_id == 0) {
+ if (spdy_version() >= spdy::SPDY_VERSION_3_1) {
+ if (!shared_window_.IncreaseOutputWindowSize(delta_window_size)) {
+ LOG(ERROR) << "Got a WINDOW_UPDATE frame that overflows session "
+ << "window. Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ StopSession();
+ }
+ } else {
+ LOG(ERROR) << "Got a WINDOW_UPDATE frame for stream 0 over SPDY/"
+ << SpdyVersionNumberString(spdy_version());
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ StopSession();
+ }
+ return;
+ }
+
+ base::AutoLock autolock(stream_map_lock_);
+ SpdyStream* stream = stream_map_.GetStream(stream_id);
+ if (stream == NULL) {
+ // We must ignore WINDOW_UPDATE frames for closed streams (SPDY draft 3
+ // section 2.6.8).
+ return;
+ }
+
+ VLOG(4) << "[stream " << stream_id << "] Received WINDOW_UPDATE("
+ << delta_window_size << ") frame";
+ stream->AdjustOutputWindowSize(delta_window_size);
+}
+
+void SpdySession::SetInitialWindowSize(uint32 new_init_window_size) {
+ // Flow control only exists for SPDY v3 and up. We shouldn't be calling this
+ // method for SPDY v2.
+ if (spdy_version() < spdy::SPDY_VERSION_3) {
+ LOG(DFATAL) << "SetInitialWindowSize called for SPDY/"
+ << SpdyVersionNumberString(spdy_version());
+ return;
+ }
+
+ // Validate the new window size; it must be positive, but at most int32max.
+ if (new_init_window_size == 0 ||
+ new_init_window_size >
+ static_cast<uint32>(net::kSpdyMaximumWindowSize)) {
+ LOG(WARNING) << "Client sent invalid init window size ("
+ << new_init_window_size << "). Sending GOAWAY.";
+ SendGoAwayFrame(net::GOAWAY_PROTOCOL_ERROR);
+ return;
+ }
+ // Sanity check that our current init window size is positive. It's a signed
+ // int32, so we know it's no more than int32max.
+ DCHECK_GT(initial_window_size_, 0);
+ // We can now be sure that this subtraction won't overflow/underflow.
+ const int32 delta =
+ static_cast<int32>(new_init_window_size) - initial_window_size_;
+
+ // Set the initial window size for new streams.
+ initial_window_size_ = new_init_window_size;
+ // We also have to adjust the window size of all currently active streams by
+ // the delta (SPDY draft 3 section 2.6.8).
+ base::AutoLock autolock(stream_map_lock_);
+ stream_map_.AdjustAllOutputWindowSizes(delta);
+}
+
+// Compress (if necessary), send, and then delete the given frame object.
+void SpdySession::SendFrame(const net::SpdyFrameIR* frame_ptr) {
+ scoped_ptr<const net::SpdyFrameIR> frame(frame_ptr);
+ scoped_ptr<const net::SpdySerializedFrame> serialized_frame(
+ framer_.SerializeFrame(*frame));
+ if (serialized_frame == NULL) {
+ LOG(DFATAL) << "frame compression failed";
+ StopSession();
+ return;
+ }
+ SendFrameRaw(*serialized_frame);
+}
+
+void SpdySession::SendFrameRaw(const net::SpdySerializedFrame& frame) {
+ const SpdySessionIO::WriteStatus status = session_io_->SendFrameRaw(frame);
+ if (status == SpdySessionIO::WRITE_CONNECTION_CLOSED) {
+ // If the connection was closed and we can't write anything to the client
+ // anymore, then there's little point in continuing with the session.
+ StopSession();
+ } else {
+ DCHECK_EQ(SpdySessionIO::WRITE_SUCCESS, status);
+ }
+}
+
+void SpdySession::SendGoAwayFrame(net::SpdyGoAwayStatus status) {
+ if (!already_sent_goaway_) {
+ already_sent_goaway_ = true;
+ SendFrame(new net::SpdyGoAwayIR(last_client_stream_id_, status));
+ }
+}
+
+void SpdySession::SendRstStreamFrame(net::SpdyStreamId stream_id,
+ net::SpdyRstStreamStatus status) {
+ output_queue_.Insert(SpdyFramePriorityQueue::kTopPriority,
+ new net::SpdyRstStreamIR(stream_id, status));
+}
+
+void SpdySession::SendSettingsFrame() {
+ scoped_ptr<net::SpdySettingsIR> settings(new net::SpdySettingsIR);
+ settings->AddSetting(net::SETTINGS_MAX_CONCURRENT_STREAMS,
+ false, false, config_->max_streams_per_connection());
+ SendFrame(settings.release());
+}
+
+void SpdySession::StopSession() {
+ session_stopped_ = true;
+ // Abort all remaining streams. We need to lock when reading the stream
+ // map, because one of the stream threads could call RemoveStreamTask() at
+ // any time.
+ {
+ base::AutoLock autolock(stream_map_lock_);
+ stream_map_.AbortAllSilently();
+ }
+ shared_window_.Abort();
+ // Stop all stream threads and tasks for this SPDY session. This will
+ // block until all currently running stream tasks have exited, but since we
+ // just aborted all streams, that should hopefully happen fairly soon. Note
+ // that we must release the lock before calling this, because each stream
+ // will remove itself from the stream map as it shuts down.
+ executor_->Stop();
+}
+
+// Abort the stream without sending anything to the client.
+void SpdySession::AbortStreamSilently(net::SpdyStreamId stream_id) {
+ // We need to lock when reading the stream map, because one of the stream
+ // threads could call RemoveStreamTask() at any time.
+ base::AutoLock autolock(stream_map_lock_);
+ SpdyStream* stream = stream_map_.GetStream(stream_id);
+ if (stream != NULL) {
+ stream->AbortSilently();
+ }
+}
+
+// Send a RST_STREAM frame and then abort the stream.
+void SpdySession::AbortStream(net::SpdyStreamId stream_id,
+ net::SpdyRstStreamStatus status) {
+ SendRstStreamFrame(stream_id, status);
+ AbortStreamSilently(stream_id);
+}
+
+// Remove the StreamTaskWrapper from the stream map. This is the only method
+// of SpdySession that is ever called by another thread (specifically, it is
+// called by the StreamTaskWrapper destructor, which is called by the executor,
+// which presumably uses worker threads) -- it is because of this that we must
+// lock the stream_map_lock_ whenever we touch the stream map or its contents.
+void SpdySession::RemoveStreamTask(StreamTaskWrapper* task_wrapper) {
+ // We need to lock when touching the stream map, in case the main connection
+ // thread is currently in the middle of reading the stream map.
+ base::AutoLock autolock(stream_map_lock_);
+ VLOG(2) << "Closing stream " << task_wrapper->stream()->stream_id();
+ stream_map_.RemoveStreamTask(task_wrapper);
+}
+
+bool SpdySession::StreamMapIsEmpty() {
+ base::AutoLock autolock(stream_map_lock_);
+ return stream_map_.IsEmpty();
+}
+
+// This constructor is always called by the main connection thread, so we're
+// safe to call spdy_session_->task_factory_->NewStreamTask(). However,
+// the other methods of this class (Run(), Cancel(), and the destructor) are
+// liable to be called from other threads by the executor.
+SpdySession::StreamTaskWrapper::StreamTaskWrapper(
+ SpdySession* spdy_session,
+ net::SpdyStreamId stream_id,
+ net::SpdyStreamId associated_stream_id,
+ int32 server_push_depth,
+ net::SpdyPriority priority)
+ : spdy_session_(spdy_session),
+ stream_(spdy_session->spdy_version(), stream_id, associated_stream_id,
+ server_push_depth, priority, spdy_session_->initial_window_size_,
+ &spdy_session_->output_queue_, &spdy_session_->shared_window_,
+ spdy_session_),
+ subtask_(spdy_session_->task_factory_->NewStreamTask(&stream_)) {
+ CHECK(subtask_);
+}
+
+SpdySession::StreamTaskWrapper::~StreamTaskWrapper() {
+ // Remove this object from the SpdySession's stream map.
+ spdy_session_->RemoveStreamTask(this);
+}
+
+void SpdySession::StreamTaskWrapper::Run() {
+ subtask_->CallRun();
+}
+
+void SpdySession::StreamTaskWrapper::Cancel() {
+ subtask_->CallCancel();
+}
+
+SpdySession::SpdyStreamMap::SpdyStreamMap()
+ : num_active_push_streams_(0u) {}
+
+SpdySession::SpdyStreamMap::~SpdyStreamMap() {}
+
+bool SpdySession::SpdyStreamMap::IsEmpty() {
+ DCHECK_LE(num_active_push_streams_, tasks_.size());
+ return tasks_.empty();
+}
+
+size_t SpdySession::SpdyStreamMap::NumActiveClientStreams() {
+ DCHECK_LE(num_active_push_streams_, tasks_.size());
+ return tasks_.size() - num_active_push_streams_;
+}
+
+size_t SpdySession::SpdyStreamMap::NumActivePushStreams() {
+ DCHECK_LE(num_active_push_streams_, tasks_.size());
+ return num_active_push_streams_;
+}
+
+bool SpdySession::SpdyStreamMap::IsStreamActive(net::SpdyStreamId stream_id) {
+ return tasks_.count(stream_id) > 0u;
+}
+
+void SpdySession::SpdyStreamMap::AddStreamTask(
+ StreamTaskWrapper* task_wrapper) {
+ DCHECK(task_wrapper);
+ SpdyStream* stream = task_wrapper->stream();
+ DCHECK(stream);
+ net::SpdyStreamId stream_id = stream->stream_id();
+ DCHECK_EQ(0u, tasks_.count(stream_id));
+ tasks_[stream_id] = task_wrapper;
+ if (stream->is_server_push()) {
+ ++num_active_push_streams_;
+ }
+ DCHECK_LE(num_active_push_streams_, tasks_.size());
+}
+
+void SpdySession::SpdyStreamMap::RemoveStreamTask(
+ StreamTaskWrapper* task_wrapper) {
+ DCHECK(task_wrapper);
+ SpdyStream* stream = task_wrapper->stream();
+ DCHECK(stream);
+ net::SpdyStreamId stream_id = stream->stream_id();
+ DCHECK_EQ(1u, tasks_.count(stream_id));
+ DCHECK_EQ(task_wrapper, tasks_[stream_id]);
+ if (stream->is_server_push()) {
+ DCHECK_GT(num_active_push_streams_, 0u);
+ --num_active_push_streams_;
+ }
+ tasks_.erase(stream_id);
+ DCHECK_LE(num_active_push_streams_, tasks_.size());
+}
+
+SpdyStream* SpdySession::SpdyStreamMap::GetStream(
+ net::SpdyStreamId stream_id) {
+ TaskMap::const_iterator iter = tasks_.find(stream_id);
+ if (iter == tasks_.end()) {
+ return NULL;
+ }
+ StreamTaskWrapper* task_wrapper = iter->second;
+ DCHECK(task_wrapper);
+ SpdyStream* stream = task_wrapper->stream();
+ DCHECK(stream);
+ DCHECK_EQ(stream_id, stream->stream_id());
+ return stream;
+}
+
+void SpdySession::SpdyStreamMap::AdjustAllOutputWindowSizes(int32 delta) {
+ for (TaskMap::const_iterator iter = tasks_.begin();
+ iter != tasks_.end(); ++iter) {
+ iter->second->stream()->AdjustOutputWindowSize(delta);
+ }
+}
+
+void SpdySession::SpdyStreamMap::AbortAllSilently() {
+ for (TaskMap::const_iterator iter = tasks_.begin();
+ iter != tasks_.end(); ++iter) {
+ iter->second->stream()->AbortSilently();
+ }
+}
+
+} // namespace mod_spdy