You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ji...@apache.org on 2014/05/01 13:39:32 UTC

svn commit: r1591620 [8/14] - in /httpd/mod_spdy/branches/httpd-2.2.x: ./ base/ base/metrics/ build/ install/ install/common/ install/debian/ install/rpm/ mod_spdy/ mod_spdy/apache/ mod_spdy/apache/filters/ mod_spdy/apache/testing/ mod_spdy/common/ mod...

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h Thu May  1 11:39:27 2014
@@ -0,0 +1,264 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SESSION_H_
+#define MOD_SPDY_COMMON_SPDY_SESSION_H_
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/synchronization/lock.h"
+#include "mod_spdy/common/executor.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/shared_flow_control_window.h"
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+#include "mod_spdy/common/spdy_server_push_interface.h"
+#include "mod_spdy/common/spdy_stream.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+class Executor;
+class SpdySessionIO;
+class SpdyServerConfig;
+class SpdyStreamTaskFactory;
+
+// Represents a SPDY session with a client.  Given an Executor for processing
+// individual SPDY streams, and a SpdySessionIO for communicating with the
+// client (sending and receiving frames), this class takes care of implementing
+// the SPDY protocol and responding correctly to various situations.
+class SpdySession : public net::BufferedSpdyFramerVisitorInterface,
+                    public SpdyServerPushInterface {
+ public:
+  // The SpdySession does _not_ take ownership of any of these arguments.
+  SpdySession(spdy::SpdyVersion spdy_version,
+              const SpdyServerConfig* config,
+              SpdySessionIO* session_io,
+              SpdyStreamTaskFactory* task_factory,
+              Executor* executor);
+  virtual ~SpdySession();
+
+  // What SPDY version is being used for this session?
+  spdy::SpdyVersion spdy_version() const { return spdy_version_; }
+
+  // What are the current shared window sizes for this session?  These are
+  // mostly useful for debugging.  Requires that spdy_version() >=
+  // SPDY_VERSION_3_1.
+  int32 current_shared_input_window_size() const;
+  int32 current_shared_output_window_size() const;
+
+  // Process the session; don't return until the session is finished.
+  void Run();
+
+  // BufferedSpdyFramerVisitorInterface methods (frame-parsing callbacks):
+  virtual void OnError(net::SpdyFramer::SpdyError error_code);
+  virtual void OnStreamError(
+      net::SpdyStreamId stream_id, const std::string& description);
+  virtual void OnSynStream(
+      net::SpdyStreamId stream_id, net::SpdyStreamId associated_stream_id,
+      net::SpdyPriority priority, uint8 credential_slot, bool fin,
+      bool unidirectional, const net::SpdyHeaderBlock& headers);
+  virtual void OnSynReply(
+      net::SpdyStreamId stream_id, bool fin,
+      const net::SpdyHeaderBlock& headers);
+  virtual void OnHeaders(
+      net::SpdyStreamId stream_id, bool fin,
+      const net::SpdyHeaderBlock& headers);
+  virtual void OnStreamFrameData(
+      net::SpdyStreamId stream_id, const char* data, size_t length, bool fin);
+  virtual void OnSettings(bool clear_persisted);
+  virtual void OnSetting(net::SpdySettingsIds id, uint8 flags, uint32 value);
+  virtual void OnPing(uint32 unique_id);
+  virtual void OnRstStream(
+      net::SpdyStreamId stream_id, net::SpdyRstStreamStatus status);
+  virtual void OnGoAway(
+      net::SpdyStreamId last_accepted_stream_id, net::SpdyGoAwayStatus status);
+  virtual void OnWindowUpdate(
+      net::SpdyStreamId stream_id, uint32 delta_window_size);
+  virtual void OnPushPromise(
+      net::SpdyStreamId stream_id, net::SpdyStreamId promised_stream_id);
+
+  // SpdyServerPushInterface methods:
+  // Initiate a SPDY server push, roughly by pretending that the client sent a
+  // SYN_STREAM with the given headers.  To repeat: the headers argument is
+  // _not_ the headers that the server will send to the client, but rather the
+  // headers to _pretend_ that the client sent to the server.  Requires that
+  // spdy_version() >= SPDY_VERSION_3.
+  // Note that unlike most other methods of this class, StartServerPush may be
+  // called by stream threads, not just by the connection thread.
+  virtual SpdyServerPushInterface::PushStatus StartServerPush(
+      net::SpdyStreamId associated_stream_id,
+      int32 server_push_depth,
+      net::SpdyPriority priority,
+      const net::SpdyHeaderBlock& request_headers);
+
+ private:
+  // A helper class for wrapping tasks returned by
+  // SpdyStreamTaskFactory::NewStreamTask().  Running or cancelling this task
+  // simply runs/cancels the wrapped task; however, this object also keeps a
+  // SpdyStream object, and on deletion, this will remove itself from the
+  // SpdySession's list of active streams.
+  class StreamTaskWrapper : public net_instaweb::Function {
+   public:
+    // This constructor, called by the main connection thread, will call
+    // task_factory_->NewStreamTask() to produce the wrapped task.
+    StreamTaskWrapper(SpdySession* spdy_session,
+                      net::SpdyStreamId stream_id,
+                      net::SpdyStreamId associated_stream_id,
+                      int32 server_push_depth,
+                      net::SpdyPriority priority);
+    virtual ~StreamTaskWrapper();
+
+    SpdyStream* stream() { return &stream_; }  // owned by this wrapper
+
+   protected:
+    // net_instaweb::Function methods (our implementations of these simply
+    // run/cancel the wrapped subtask):
+    virtual void Run();
+    virtual void Cancel();
+
+   private:
+    SpdySession* const spdy_session_;  // session we deregister from
+    SpdyStream stream_;
+    net_instaweb::Function* const subtask_;  // the wrapped task
+
+    DISALLOW_COPY_AND_ASSIGN(StreamTaskWrapper);
+  };
+
+  // Helper class for keeping track of active stream tasks, and separately
+  // tracking the number of active client/server-initiated streams.  This class
+  // is not thread-safe without external synchronization, so it is used below
+  // along with a separate mutex.
+  class SpdyStreamMap {
+   public:
+    SpdyStreamMap();
+    ~SpdyStreamMap();
+
+    // Determine whether there are no currently active streams.
+    bool IsEmpty();
+    // Get the number of currently active streams created by the client or
+    // server, respectively.
+    size_t NumActiveClientStreams();
+    size_t NumActivePushStreams();
+    // Determine if a particular stream ID is currently active.
+    bool IsStreamActive(net::SpdyStreamId stream_id);
+    // Get the specified stream object, or NULL if the stream is inactive.
+    SpdyStream* GetStream(net::SpdyStreamId stream_id);
+    // Add a new stream.  Requires that the stream ID is currently inactive.
+    void AddStreamTask(StreamTaskWrapper* task);
+    // Remove a stream task.  Requires that the stream is currently active.
+    void RemoveStreamTask(StreamTaskWrapper* task);
+    // Adjust the output window size of all active streams by the same delta.
+    void AdjustAllOutputWindowSizes(int32 delta);
+    // Abort all streams in the map.  Note that this won't immediately empty
+    // the map (the tasks still have to shut down).
+    void AbortAllSilently();
+
+   private:
+    typedef std::map<net::SpdyStreamId, StreamTaskWrapper*> TaskMap;
+    TaskMap tasks_;  // active stream tasks, keyed by stream ID
+    size_t num_active_push_streams_;  // see NumActivePushStreams()
+
+    DISALLOW_COPY_AND_ASSIGN(SpdyStreamMap);
+  };
+
+  // Validate and set the per-stream initial flow-control window size to the
+  // new value.  Must be using SPDY v3 or later to call this method.
+  void SetInitialWindowSize(uint32 new_init_window_size);
+
+  // Send a single SPDY frame to the client, compressing it first if necessary.
+  // Stop the session if the connection turns out to be closed.  This method
+  // takes ownership of the passed frame and will delete it.
+  void SendFrame(const net::SpdyFrameIR* frame);
+  // Send the frame as-is (without taking ownership).  Stop the session if the
+  // connection turns out to be closed.
+  void SendFrameRaw(const net::SpdySerializedFrame& frame);
+
+  // Immediately send a GOAWAY frame to the client with the given status,
+  // unless we've already sent one.  This also prevents us from creating any
+  // new streams, so calling this is the best way to shut the session down
+  // gracefully; once all streams have finished normally and no new ones can be
+  // created, the session will shut itself down.
+  void SendGoAwayFrame(net::SpdyGoAwayStatus status);
+  // Enqueue a RST_STREAM frame for the given stream ID.  Note that this does
+  // not abort the stream if it exists; for that, use AbortStream().
+  void SendRstStreamFrame(net::SpdyStreamId stream_id,
+                          net::SpdyRstStreamStatus status);
+  // Immediately send our SETTINGS frame, with values based on our
+  // SpdyServerConfig object.  This should be done exactly once, at session
+  // start.
+  void SendSettingsFrame();
+
+  // Close down the whole session immediately.  Abort all active streams, and
+  // then block until all stream threads have shut down.
+  void StopSession();
+  // Abort the stream without sending anything to the client.
+  void AbortStreamSilently(net::SpdyStreamId stream_id);
+  // Send a RST_STREAM frame and then abort the stream.
+  void AbortStream(net::SpdyStreamId stream_id,
+                   net::SpdyRstStreamStatus status);
+
+  // Remove the given StreamTaskWrapper object from the stream map.  This is
+  // the only other method of this class, aside from StartServerPush, that
+  // might be called from another thread.  (Specifically, it is called by the
+  // StreamTaskWrapper destructor, which is called by the executor).
+  void RemoveStreamTask(StreamTaskWrapper* stream_data);
+
+  // Grab the stream_map_lock_ and check if stream_map_ is empty.
+  bool StreamMapIsEmpty();
+
+  // These fields are accessed only by the main connection thread, so they need
+  // not be protected by a lock:
+  const spdy::SpdyVersion spdy_version_;
+  const SpdyServerConfig* const config_;  // not owned
+  SpdySessionIO* const session_io_;  // not owned
+  SpdyStreamTaskFactory* const task_factory_;  // not owned
+  Executor* const executor_;  // not owned
+  net::BufferedSpdyFramer framer_;
+  bool session_stopped_;  // StopSession() has been called
+  bool already_sent_goaway_;  // GOAWAY frame has been sent
+  net::SpdyStreamId last_client_stream_id_;
+  int32 initial_window_size_;  // per-stream initial flow-control window size
+  uint32 max_concurrent_pushes_;  // max number of active server pushes at once
+
+  // The stream map must be protected by a lock, because each stream thread
+  // will remove itself from the map (by calling RemoveStreamTask) when the
+  // stream closes.  You MUST hold the lock to use the stream_map_ OR to use
+  // any of the StreamTaskWrapper or SpdyStream objects contained therein
+  // (e.g. to post a frame to the stream), otherwise the stream object may be
+  // deleted by another thread while you're using it.  You should NOT be
+  // holding the lock when you e.g. send a frame to the client, as that may
+  // block for a long time.
+  base::Lock stream_map_lock_;
+  SpdyStreamMap stream_map_;
+  // These fields are also protected by the stream_map_lock_; they are used for
+  // controlling server pushes, which can be initiated by stream threads as
+  // well as by the connection thread.  We could use a separate lock for these,
+  // but right now we probably don't need that much locking granularity.
+  net::SpdyStreamId last_server_push_stream_id_;
+  bool received_goaway_;  // we've received a GOAWAY frame from the client
+
+  // These objects are also shared between all stream threads, but these
+  // classes are each thread-safe, and don't need additional synchronization.
+  SpdyFramePriorityQueue output_queue_;
+  SharedFlowControlWindow shared_window_;
+
+  DISALLOW_COPY_AND_ASSIGN(SpdySession);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_SESSION_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,23 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session_io.h"
+
+namespace mod_spdy {
+
+SpdySessionIO::SpdySessionIO() {}  // nothing to initialize
+
+SpdySessionIO::~SpdySessionIO() {}  // nothing to release
+
+}  // namespace mod_spdy

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h Thu May  1 11:39:27 2014
@@ -0,0 +1,82 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SESSION_IO_H_
+#define MOD_SPDY_COMMON_SPDY_SESSION_IO_H_
+
+#include "base/basictypes.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace net {
+class BufferedSpdyFramer;
+}  // namespace net
+
+namespace mod_spdy {
+
+class SpdyStream;
+
+// SpdySessionIO is a helper interface for the SpdySession class.  The
+// SpdySessionIO takes care of implementation-specific details about how to
+// send and receive data, allowing the SpdySession to focus on the SPDY
+// protocol itself.  For example, a SpdySessionIO for Apache would hold onto a
+// conn_rec object and invoke the input and output filter chains for
+// ProcessAvailableInput and SendFrameRaw, respectively.  The SpdySessionIO
+// itself does not need to be thread-safe -- it is only ever used by the main
+// connection thread.
+class SpdySessionIO {
+ public:
+  // Status to describe whether reading succeeded.
+  enum ReadStatus {
+    READ_SUCCESS,  // we successfully pushed data into the SpdyFramer
+    READ_NO_DATA,  // no data is currently available
+    READ_CONNECTION_CLOSED,  // the connection has been closed
+    READ_ERROR  // an unrecoverable error (e.g. client sent malformed data)
+  };
+
+  // Status to describe whether writing succeeded.
+  enum WriteStatus {
+    WRITE_SUCCESS,  // we successfully wrote the frame out to the network
+    WRITE_CONNECTION_CLOSED  // the connection has been closed
+  };
+
+  SpdySessionIO();
+  virtual ~SpdySessionIO();
+
+  // Return true if the connection has been externally aborted and should
+  // stop, false otherwise.
+  virtual bool IsConnectionAborted() = 0;
+
+  // Pull any available input data from the connection and feed it into the
+  // ProcessInput() method of the given framer.  If no input data is currently
+  // available and the block argument is true, this should block until more
+  // data arrives; otherwise, this should not block.
+  virtual ReadStatus ProcessAvailableInput(
+      bool block, net::BufferedSpdyFramer* framer) = 0;
+
+  // Send a single SPDY frame to the client as-is; block until it has been
+  // sent down the wire.  Return WRITE_SUCCESS on success, or
+  // WRITE_CONNECTION_CLOSED if the connection has been closed.
+  // TODO(mdsteele): We do need to be able to flush a single frame down the
+  //   wire, but we probably don't need/want to flush every single frame
+  //   individually in places where we send multiple frames at once.  We'll
+  //   probably want to adjust this API a bit.
+  virtual WriteStatus SendFrameRaw(const net::SpdySerializedFrame& frame) = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(SpdySessionIO);
+};
+
+}  // namespace mod_spdy
+
+#endif  // MOD_SPDY_COMMON_SPDY_SESSION_IO_H_

Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,1070 @@
+// Copyright 2010 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session.h"
+
+#include <list>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_server_config.h"
+#include "mod_spdy/common/spdy_session_io.h"
+#include "mod_spdy/common/spdy_stream_task_factory.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "mod_spdy/common/thread_pool.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using mod_spdy::testing::IsDataFrame;
+using mod_spdy::testing::IsGoAway;
+using mod_spdy::testing::IsHeaders;
+using mod_spdy::testing::IsPing;
+using mod_spdy::testing::IsRstStream;
+using mod_spdy::testing::IsSettings;
+using mod_spdy::testing::IsSynReply;
+using mod_spdy::testing::IsSynStream;
+using testing::_;
+using testing::AllOf;
+using testing::AtLeast;
+using testing::DoAll;
+using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::NotNull;
+using testing::Property;
+using testing::Return;
+using testing::StrictMock;
+using testing::WithArg;
+
+namespace {
+
+void AddRequestHeaders(mod_spdy::spdy::SpdyVersion version,
+                       net::SpdyNameValueBlock *headers) {
+  const bool spdy3 = (version >= mod_spdy::spdy::SPDY_VERSION_3);
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Host :
+             mod_spdy::http::kHost] = "www.example.com";  // SPDY/2: HTTP Host
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Method :
+             mod_spdy::spdy::kSpdy2Method] = "GET";
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Scheme :
+             mod_spdy::spdy::kSpdy2Scheme] = "https";
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Path :
+             mod_spdy::spdy::kSpdy2Url] = "/foo/index.html";
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Version :
+             mod_spdy::spdy::kSpdy2Version] = "HTTP/1.1";
+}
+
+void AddResponseHeaders(mod_spdy::spdy::SpdyVersion version,
+                        net::SpdyNameValueBlock *headers) {
+  const bool spdy3 = (version >= mod_spdy::spdy::SPDY_VERSION_3);
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Status :
+             mod_spdy::spdy::kSpdy2Status] = "200";
+  (*headers)[spdy3 ? mod_spdy::spdy::kSpdy3Version :
+             mod_spdy::spdy::kSpdy2Version] = "HTTP/1.1";
+  (*headers)[mod_spdy::http::kContentType] = "text/html";  // any version
+}
+
+void AddInitialServerPushHeaders(const std::string& path,
+                                 net::SpdyNameValueBlock *headers) {
+  (*headers)[mod_spdy::spdy::kSpdy3Host] = "www.example.com";  // SPDY/3 names
+  (*headers)[mod_spdy::spdy::kSpdy3Path] = path;  // the resource to push
+  (*headers)[mod_spdy::spdy::kSpdy3Scheme] = "https";
+}
+
+class MockSpdySessionIO : public mod_spdy::SpdySessionIO {
+ public:  // gMock stubs for the three pure-virtual SpdySessionIO methods.
+  MOCK_METHOD0(IsConnectionAborted, bool());
+  MOCK_METHOD2(ProcessAvailableInput,
+               ReadStatus(bool, net::BufferedSpdyFramer*));
+  MOCK_METHOD1(SendFrameRaw, WriteStatus(const net::SpdySerializedFrame&));
+};
+
+class MockSpdyStreamTaskFactory : public mod_spdy::SpdyStreamTaskFactory {
+ public:  // ReturnMockTask (below) is the action meant for NewStreamTask.
+  MOCK_METHOD1(NewStreamTask, net_instaweb::Function*(mod_spdy::SpdyStream*));
+};
+
+class MockStreamTask : public net_instaweb::Function {
+ public:
+  MockStreamTask() : stream(NULL) {}
+  MOCK_METHOD0(Run, void());
+  MOCK_METHOD0(Cancel, void());
+  mod_spdy::SpdyStream* stream;  // NULL until ReturnMockTask sets it
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MockStreamTask);
+};
+
+// gMock action for NewStreamTask: saves arg0 in task->stream, returns task.
+ACTION_P(ReturnMockTask, task) {
+  task->stream = arg0;
+  return task;
+}
+
+// gMock action for MockStreamTask::Run: starts a push and checks the result.
+ACTION_P4(StartServerPush, task, priority, path, expected_status) {
+  net::SpdyHeaderBlock push_headers;
+  AddInitialServerPushHeaders(path, &push_headers);
+  EXPECT_EQ(expected_status,
+            task->stream->StartServerPush(priority, push_headers));
+}
+
+// gMock action for MockStreamTask::Run: send SYN_REPLY (HEADERS if a push).
+ACTION_P(SendResponseHeaders, task) {
+  net::SpdyHeaderBlock response;
+  AddResponseHeaders(task->stream->spdy_version(), &response);
+  if (!task->stream->is_server_push()) {
+    task->stream->SendOutputSynReply(response, false);
+  } else {
+    task->stream->SendOutputHeaders(response, false);
+  }
+}
+
+// gMock action for MockStreamTask::Run: sends a DATA frame on the stream.
+ACTION_P3(SendDataFrame, task, data, fin) {
+  task->stream->SendOutputDataFrame(data, fin);
+}
+
+// gMock action for MockStreamTask::Run: drops input frames until aborted.
+ACTION_P(ConsumeInputUntilAborted, task) {
+  while (!task->stream->is_aborted()) {
+    net::SpdyFrameIR* frame = NULL;
+    const bool got_frame = task->stream->GetInputFrame(true, &frame);
+    // Only a frame that was actually handed to us is ours to destroy.
+    if (got_frame) delete frame;
+  }
+}
+
+// An executor that runs all tasks in the calling thread, either as soon as
+// they are added (see set_run_on_add) or later on request (RunOne/RunAll).
+class InlineExecutor : public mod_spdy::Executor {
+ public:
+  InlineExecutor() : run_on_add_(false), stopped_(false) {}
+  virtual ~InlineExecutor() { Stop(); }
+
+  virtual void AddTask(net_instaweb::Function* task,
+                       net::SpdyPriority priority) {  // priority is ignored
+    if (stopped_) {
+      task->CallCancel();
+    } else if (run_on_add_) {
+      task->CallRun();
+    } else {
+      tasks_.push_back(task);
+    }
+  }
+  // Cancel every queued task; tasks added afterwards are cancelled at once.
+  virtual void Stop() {
+    stopped_ = true;
+    while (!tasks_.empty()) {
+      net_instaweb::Function* const task = tasks_.front();
+      tasks_.pop_front();  // dequeue first, as in RunOne()
+      task->CallCancel();
+    }
+  }
+  void RunOne() {
+    if (tasks_.empty()) return;
+    net_instaweb::Function* const task = tasks_.front();
+    tasks_.pop_front();  // dequeue first: CallRun() may reenter the executor
+    task->CallRun();
+  }
+  void RunAll() {
+    while (!tasks_.empty()) RunOne();
+  }
+  void set_run_on_add(bool run) { run_on_add_ = run; }
+  bool stopped() const { return stopped_; }
+
+ private:
+  std::list<net_instaweb::Function*> tasks_;
+  bool run_on_add_;
+  bool stopped_;
+
+  DISALLOW_COPY_AND_ASSIGN(InlineExecutor);
+};
+
+// A BufferedSpdyFramer visitor that constructs IR objects for the frames it
+// parses, keeping only the most recent one until ReleaseLastFrame() is called.
+class ClientVisitor : public net::BufferedSpdyFramerVisitorInterface {
+ public:
+  ClientVisitor() : last_data_(NULL), last_settings_(NULL) {}
+  virtual ~ClientVisitor() {}
+
+  virtual void OnError(net::SpdyFramer::SpdyError error_code) {}
+  virtual void OnStreamError(net::SpdyStreamId stream_id,
+                             const std::string& description) {}
+  virtual void OnSynStream(net::SpdyStreamId id, net::SpdyStreamId assoc_id,
+                           net::SpdyPriority priority, uint8 slot,
+                           bool fin, bool unidirectional,
+                           const net::SpdyHeaderBlock& headers) {
+    scoped_ptr<net::SpdySynStreamIR> frame(new net::SpdySynStreamIR(id));
+    frame->set_associated_to_stream_id(assoc_id);
+    frame->set_priority(priority);
+    frame->set_slot(slot);
+    frame->set_fin(fin);
+    frame->set_unidirectional(unidirectional);
+    frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+    SetLastFrame(frame.release());
+  }
+  virtual void OnSynReply(net::SpdyStreamId id, bool fin,
+                          const net::SpdyHeaderBlock& headers) {
+    scoped_ptr<net::SpdySynReplyIR> frame(new net::SpdySynReplyIR(id));
+    frame->set_fin(fin);
+    frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+    SetLastFrame(frame.release());
+  }
+  virtual void OnHeaders(net::SpdyStreamId id, bool fin,
+                         const net::SpdyHeaderBlock& headers) {
+    scoped_ptr<net::SpdyHeadersIR> frame(new net::SpdyHeadersIR(id));
+    frame->set_fin(fin);
+    frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+    SetLastFrame(frame.release());
+  }
+  virtual void OnStreamFrameData(net::SpdyStreamId id, const char* data,
+                                 size_t len, bool fin) {
+    if (len != 0 || last_data_ == NULL) {  // else: FIN for the previous frame
+      net::SpdyDataIR* const frame =
+          new net::SpdyDataIR(id, base::StringPiece(data, len));
+      SetLastFrame(frame);
+      last_data_ = frame;
+    }
+    last_data_->set_fin(fin);
+  }
+  virtual void OnSettings(bool clear_persisted) {
+    net::SpdySettingsIR* const frame = new net::SpdySettingsIR;
+    SetLastFrame(frame);
+    last_settings_ = frame;
+    frame->set_clear_settings(clear_persisted);
+  }
+  virtual void OnSetting(net::SpdySettingsIds id, uint8 flags, uint32 value) {
+    CHECK(last_settings_ != NULL && last_settings_ == last_frame_.get());
+    last_settings_->AddSetting(
+        id, (flags & net::SETTINGS_FLAG_PLEASE_PERSIST),
+        (flags & net::SETTINGS_FLAG_PERSISTED), value);
+  }
+  virtual void OnPing(uint32 id) { SetLastFrame(new net::SpdyPingIR(id)); }
+  virtual void OnRstStream(net::SpdyStreamId id,
+                           net::SpdyRstStreamStatus status) {
+    SetLastFrame(new net::SpdyRstStreamIR(id, status));
+  }
+  virtual void OnGoAway(net::SpdyStreamId id, net::SpdyGoAwayStatus status) {
+    SetLastFrame(new net::SpdyGoAwayIR(id, status));
+  }
+  virtual void OnWindowUpdate(net::SpdyStreamId id, uint32 delta) {
+    SetLastFrame(new net::SpdyWindowUpdateIR(id, delta));
+  }
+  virtual void OnPushPromise(net::SpdyStreamId id, net::SpdyStreamId promise) {
+    SetLastFrame(new net::SpdyPushPromiseIR(id, promise));
+  }
+
+  net::SpdyFrameIR* ReleaseLastFrame() {
+    ForgetAliases();  // the caller is free to delete the frame it receives
+    return last_frame_.release();
+  }
+
+ private:
+  // Own |frame| as the latest frame; reset the aliases so they never dangle.
+  void SetLastFrame(net::SpdyFrameIR* frame) {
+    ForgetAliases();
+    last_frame_.reset(frame);
+  }
+  void ForgetAliases() { last_data_ = NULL; last_settings_ = NULL; }
+  net::SpdyDataIR* last_data_;  // last_frame_ viewed as data, else NULL
+  net::SpdySettingsIR* last_settings_;  // last_frame_ as settings, else NULL
+  scoped_ptr<net::SpdyFrameIR> last_frame_;
+
+  DISALLOW_COPY_AND_ASSIGN(ClientVisitor);
+};
+
+ACTION_P2(ClientDecodeFrame, test, matcher) {  // arg0: the serialized frame
+  scoped_ptr<net::SpdyFrameIR> frame(test->DecodeFrameOnClient(arg0));
+  ASSERT_TRUE(frame != NULL);
+  EXPECT_THAT(*frame, matcher);
+}
+
+ACTION_P3(SendBackWindowUpdate, test, stream_id, delta) {  // queues input
+  test->ReceiveWindowUpdateFrameFromClient(stream_id, delta);
+}
+
+ACTION_P3(SendBackSettings, test, key, value) {  // queues a SETTINGS frame
+  test->ReceiveSettingsFrameFromClient(key, value);
+}
+
+// Base class for SpdySession tests.
+class SpdySessionTestBase :
+      public testing::TestWithParam<mod_spdy::spdy::SpdyVersion> {
+ public:
+  SpdySessionTestBase()
+      : spdy_version_(GetParam()),
+        client_framer_(mod_spdy::SpdyVersionToFramerVersion(spdy_version_),
+                       true) {
+    client_framer_.set_visitor(&client_visitor_);
+    ON_CALL(session_io_, IsConnectionAborted()).WillByDefault(Return(false));
+    ON_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+        .WillByDefault(Invoke(this, &SpdySessionTestBase::ReadNextInputChunk));
+    ON_CALL(session_io_, SendFrameRaw(_))
+        .WillByDefault(Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS));
+  }
+
+  // Use as gMock action for ProcessAvailableInput:
+  //   Invoke(this, &SpdySessionTestBase::ReadNextInputChunk)
+  mod_spdy::SpdySessionIO::ReadStatus ReadNextInputChunk(
+      bool block, net::BufferedSpdyFramer* framer) {
+    if (input_queue_.empty()) {
+      return mod_spdy::SpdySessionIO::READ_NO_DATA;
+    }
+    const std::string chunk = input_queue_.front();
+    input_queue_.pop_front();
+    framer->ProcessInput(chunk.data(), chunk.size());
+    return (framer->HasError() ? mod_spdy::SpdySessionIO::READ_ERROR :
+            mod_spdy::SpdySessionIO::READ_SUCCESS);
+  }
+
+  // This is called by the ClientDecodeFrame gMock action defined above.
+  net::SpdyFrameIR* DecodeFrameOnClient(
+      const net::SpdySerializedFrame& frame) {
+    client_framer_.ProcessInput(frame.data(), frame.size());
+    return client_visitor_.ReleaseLastFrame();
+  }
+
+  // Push a frame into the input queue.
+  void ReceiveFrameFromClient(const net::SpdySerializedFrame& frame) {
+    input_queue_.push_back(std::string(frame.data(), frame.size()));
+  }
+
+  // Push a PING frame into the input queue.
+  void ReceivePingFromClient(uint32 id) {
+    scoped_ptr<net::SpdySerializedFrame> frame(
+        client_framer_.CreatePingFrame(id));
+    ReceiveFrameFromClient(*frame);
+  }
+
+  // Build a SYN_STREAM frame for |stream_id| (with the given |priority| and
+  // |flags|) whose header block is filled in by AddRequestHeaders, and queue
+  // it as client input.
+  void ReceiveSynStreamFromClient(net::SpdyStreamId stream_id,
+                                  net::SpdyPriority priority,
+                                  net::SpdyControlFlags flags) {
+    net::SpdyHeaderBlock request_headers;
+    AddRequestHeaders(spdy_version_, &request_headers);
+    const bool use_compression = true;
+    scoped_ptr<net::SpdySerializedFrame> syn_stream(
+        client_framer_.CreateSynStream(stream_id, 0, priority, 0, flags,
+                                       use_compression, &request_headers));
+    ReceiveFrameFromClient(*syn_stream);
+  }
+
+  // Build a DATA frame on |stream_id| whose payload is |data|, and queue it as
+  // client input.
+  void ReceiveDataFromClient(net::SpdyStreamId stream_id,
+                             base::StringPiece data,
+                             net::SpdyDataFlags flags) {
+    scoped_ptr<net::SpdySerializedFrame> data_frame(
+        client_framer_.CreateDataFrame(stream_id, data.data(), data.size(),
+                                       flags));
+    ReceiveFrameFromClient(*data_frame);
+  }
+
+  // Build a SETTINGS frame containing the single entry |setting| = |value|
+  // (with SETTINGS_FLAG_NONE) and queue it as client input.
+  void ReceiveSettingsFrameFromClient(
+      net::SpdySettingsIds setting, uint32 value) {
+    net::SettingsMap single_setting;
+    single_setting[setting] = std::make_pair(net::SETTINGS_FLAG_NONE, value);
+    scoped_ptr<net::SpdySerializedFrame> settings_frame(
+        client_framer_.CreateSettings(single_setting));
+    ReceiveFrameFromClient(*settings_frame);
+  }
+
+  // Build a WINDOW_UPDATE frame for |stream_id| with the given |delta| and
+  // queue it as client input.
+  void ReceiveWindowUpdateFrameFromClient(
+      net::SpdyStreamId stream_id, uint32 delta) {
+    scoped_ptr<net::SpdySerializedFrame> window_update(
+        client_framer_.CreateWindowUpdate(stream_id, delta));
+    ReceiveFrameFromClient(*window_update);
+  }
+
+ protected:
+  // Expect exactly one SendFrameRaw call.  On that call the ClientDecodeFrame
+  // action runs with |matcher|, and the write is reported as WRITE_SUCCESS.
+  void ExpectSendFrame(::testing::Matcher<const net::SpdyFrameIR&> matcher) {
+    EXPECT_CALL(session_io_, SendFrameRaw(_)).WillOnce(DoAll(
+        ClientDecodeFrame(this, matcher),
+        Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS)));
+  }
+
+  // Expect the SYN_STREAM frame that begins a server push of |path|: it must
+  // match IsSynStream for |stream_id|, |assoc_stream_id| and |priority|, with
+  // the headers produced by AddInitialServerPushHeaders.
+  void ExpectBeginServerPush(
+      net::SpdyStreamId stream_id, net::SpdyStreamId assoc_stream_id,
+      net::SpdyPriority priority, const std::string& path) {
+    net::SpdyNameValueBlock push_headers;
+    AddInitialServerPushHeaders(path, &push_headers);
+    ExpectSendFrame(IsSynStream(stream_id, assoc_stream_id, priority,
+                                false, true, push_headers));
+  }
+
+  // Expect a SYN_REPLY on |stream_id| (with the given |fin| flag) carrying the
+  // headers produced by AddResponseHeaders for this SPDY version.
+  void ExpectSendSynReply(net::SpdyStreamId stream_id, bool fin) {
+    net::SpdyNameValueBlock response_headers;
+    AddResponseHeaders(spdy_version_, &response_headers);
+    ExpectSendFrame(IsSynReply(stream_id, fin, response_headers));
+  }
+
+  // Expect a HEADERS frame on |stream_id| (with the given |fin| flag) carrying
+  // the headers produced by AddResponseHeaders for this SPDY version.
+  void ExpectSendHeaders(net::SpdyStreamId stream_id, bool fin) {
+    net::SpdyNameValueBlock response_headers;
+    AddResponseHeaders(spdy_version_, &response_headers);
+    ExpectSendFrame(IsHeaders(stream_id, fin, response_headers));
+  }
+
+  // Expect a GOAWAY frame with |last_stream_id| and, where the protocol
+  // version can express it, the given |status|.
+  void ExpectSendGoAway(net::SpdyStreamId last_stream_id,
+                        net::SpdyGoAwayStatus status) {
+    // SPDY/2 GOAWAY frames carry no status code, so the client framer always
+    // decodes them as GOAWAY_OK, whatever status the session meant to send.
+    const bool is_spdy2 = (spdy_version_ == mod_spdy::spdy::SPDY_VERSION_2);
+    ExpectSendFrame(IsGoAway(last_stream_id,
+                             is_spdy2 ? net::GOAWAY_OK : status));
+  }
+
+  const mod_spdy::spdy::SpdyVersion spdy_version_;
+  ClientVisitor client_visitor_;
+  net::BufferedSpdyFramer client_framer_;
+  mod_spdy::SpdyServerConfig config_;
+  StrictMock<MockSpdySessionIO> session_io_;
+  StrictMock<MockSpdyStreamTaskFactory> task_factory_;
+  std::list<std::string> input_queue_;
+};
+
+// Fixture for most SpdySession tests; the session is given an InlineExecutor,
+// so that test behavior is very predictable.
+class SpdySessionTest : public SpdySessionTestBase {
+ public:
+  SpdySessionTest()
+      : session_(spdy_version_, &config_, &session_io_, &task_factory_,
+                 &executor_) {}
+
+ protected:
+  InlineExecutor executor_;  // Declared before session_, which points at it.
+  mod_spdy::SpdySession session_;  // The session under test.
+};
+
+// Test that if our first write finds the connection closed, we stop at once.
+TEST_P(SpdySessionTest, ConnectionAlreadyClosed) {
+  testing::InSequence seq;
+  EXPECT_CALL(session_io_, SendFrameRaw(_))  // the only I/O call we allow
+      .WillOnce(Return(mod_spdy::SpdySessionIO::WRITE_CONNECTION_CLOSED));
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());  // Run() must leave the executor stopped
+}
+
+// Test that when the connection is aborted right after SETTINGS, we stop.
+TEST_P(SpdySessionTest, ImmediateConnectionAbort) {
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+
+  session_.Run();  // no ProcessAvailableInput call is expected at all
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test responding to a PING frame from the client (followed by the connection
+// closing, so that we can exit the Run loop).
+TEST_P(SpdySessionTest, SinglePing) {
+  ReceivePingFromClient(47);  // queued now; read once Run() asks for input
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendFrame(IsPing(47));  // the reply carries the same PING ID
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+  ExpectSendGoAway(0, net::GOAWAY_OK);  // last_stream_id is 0 here
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test handling a single stream request, from SYN_STREAM through GOAWAY.
+TEST_P(SpdySessionTest, SingleStream) {
+  MockStreamTask* task = new MockStreamTask;  // returned by the task factory
+  executor_.set_run_on_add(false);  // the task runs only when RunAll is called
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task));
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+                      Return(false)));
+  EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+      SendDataFrame(task, "quux", true)));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+  ExpectSendSynReply(stream_id, false);  // fin = false: data frames follow
+  ExpectSendFrame(IsDataFrame(stream_id, false, "foobar"));
+  ExpectSendFrame(IsDataFrame(stream_id, true, "quux"));  // fin = true
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+  ExpectSendGoAway(1, net::GOAWAY_OK);  // last_stream_id is our one stream
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that if SendFrameRaw fails, we immediately stop trying to send data and
+// shut down the session.
+TEST_P(SpdySessionTest, ShutDownSessionIfSendFrameRawFails) {
+  MockStreamTask* task = new MockStreamTask;
+  executor_.set_run_on_add(false);  // the task waits for RunAll (see below)
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  // We start out the same way as in the SingleStream test above.
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, _));
+  EXPECT_CALL(task_factory_, NewStreamTask(_))
+      .WillOnce(ReturnMockTask(task));
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+                      Return(false)));
+  EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+      SendDataFrame(task, "quux", true)));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, _));
+  ExpectSendSynReply(stream_id, false);  // this write still succeeds
+  // At this point, the connection is closed by the client.
+  EXPECT_CALL(session_io_, SendFrameRaw(_))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::WRITE_CONNECTION_CLOSED));
+  // Even though we have another frame to send at this point (already in the
+  // output queue), we immediately stop sending data and exit the session.
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the client sends us garbage data, we send a GOAWAY frame
+// (with GOAWAY_PROTOCOL_ERROR where the version has status codes) and quit.
+TEST_P(SpdySessionTest, SendGoawayInResponseToGarbage) {
+  input_queue_.push_back("\x88\x5f\x92\x02\xf8\x92\x12\xd1"
+                         "\x82\xdc\x1a\x40\xbb\xb2\x9d\x13");  // 16 junk bytes
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+  session_.Run();  // nothing further is expected after the GOAWAY
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the client sends us a SYN_STREAM with a corrupted header
+// block, we send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForBadSynStreamCompression) {
+  net::SpdyHeaderBlock headers;
+  headers["foobar"] = "Foo is to bar as bar is to baz.";
+  net::SpdyFramer framer(mod_spdy::SpdyVersionToFramerVersion(spdy_version_));
+  framer.set_enable_compression(false);  // the headers go out uncompressed
+  scoped_ptr<net::SpdySerializedFrame> frame(framer.CreateSynStream(
+      1, 0, framer.GetHighestPriority(), 0, net::CONTROL_FLAG_FIN,
+      false,  // false = no compression
+      &headers));
+  ReceiveFrameFromClient(*frame);
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);  // no task is ever created
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// TODO(mdsteele): At the moment, SpdyFramer DCHECKs that the stream ID is
+// nonzero when decoding, so this test would crash in debug builds.  Once this
+// has been corrected in the Chromium code, we can remove this #ifdef.
+#ifdef NDEBUG
+// Test that when the client sends us a SYN_STREAM with a stream ID of 0, we
+// send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForSynStreamIdZero) {
+  net::SpdyHeaderBlock headers;
+  AddRequestHeaders(spdy_version_, &headers);
+  scoped_ptr<net::SpdySerializedFrame> frame(client_framer_.CreateSynStream(
+      0, 0, client_framer_.GetHighestPriority(), 0, net::CONTROL_FLAG_FIN,
+      true,  // true = use compression
+      &headers));
+  ReceiveFrameFromClient(*frame);  // first argument above is the stream ID, 0
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+#endif  // NDEBUG
+
+// Test that when the client sends us two SYN_STREAMs with the same ID, we send
+// a GOAWAY frame (but still finish out the good stream before quitting).
+TEST_P(SpdySessionTest, SendGoawayForDuplicateStreamId) {
+  MockStreamTask* task = new MockStreamTask;  // task for the first SYN_STREAM
+  executor_.set_run_on_add(false);
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  // Get the first SYN_STREAM; it looks good, so create a new task (but because
+  // we set executor_.set_run_on_add(false) above, it doesn't execute yet).
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  // There's an active stream out, so ProcessAvailableInput should have false
+  // for the first argument (false = nonblocking read).  Here we get the second
+  // SYN_STREAM with the same stream ID, so we should send GOAWAY.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+  ExpectSendGoAway(1, net::GOAWAY_PROTOCOL_ERROR);
+  // At this point, tell the executor to run the task.
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+                      Return(false)));
+  EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+      SendDataFrame(task, "quux", true)));
+  // The stream is no longer active, but there are pending frames to send, so
+  // we shouldn't block on input.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+  // Now we should send the output.
+  ExpectSendSynReply(stream_id, false);
+  ExpectSendFrame(IsDataFrame(stream_id, false, "foobar"));
+  ExpectSendFrame(IsDataFrame(stream_id, true, "quux"));
+  // Finally, there is no more output to send, and no chance of creating new
+  // streams (since we GOAWAY'd), so we quit.
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+
+  session_.Run();  // only one GOAWAY is expected over the whole run
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Run each SpdySessionTest over SPDY v2, SPDY v3, and SPDY v3.1.
+INSTANTIATE_TEST_CASE_P(Spdy2And3, SpdySessionTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_2, mod_spdy::spdy::SPDY_VERSION_3,
+    mod_spdy::spdy::SPDY_VERSION_3_1));
+
+// Create an alias of the SpdySessionTest fixture under a new test-case name,
+// so that the tests written against it can get their own set of parameters.
+typedef SpdySessionTest SpdySessionNoFlowControlTest;
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE over SPDY v2.
+TEST_P(SpdySessionNoFlowControlTest, SendGoawayForInitialWindowSize) {
+  // Queue a SETTINGS frame whose only entry is SETTINGS_INITIAL_WINDOW_SIZE,
+  // using the same fixture helper that the flow-control tests use rather than
+  // building the frame by hand.
+  ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 4000);
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  // The blocking read delivers the client's SETTINGS frame, which the session
+  // must answer with a GOAWAY.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Only run the no-flow-control tests for SPDY v2.
+INSTANTIATE_TEST_CASE_P(Spdy2, SpdySessionNoFlowControlTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_2));
+
+// Test class for flow-control tests.  This uses a ThreadPool-backed Executor,
+// so that we can test concurrency behavior.
+class SpdySessionFlowControlTest : public SpdySessionTestBase {
+ public:
+  SpdySessionFlowControlTest() : thread_pool_(1, 1) {}
+
+  void SetUp() {  // start the pool, then build executor_ and session_
+    ASSERT_TRUE(thread_pool_.Start());
+    executor_.reset(thread_pool_.NewExecutor());
+    session_.reset(new mod_spdy::SpdySession(
+        spdy_version_, &config_, &session_io_, &task_factory_,
+        executor_.get()));
+  }
+
+  void ExpectSendDataGetWindowUpdateBack(  // one DATA frame, then a reply
+      net::SpdyStreamId stream_id, bool fin, base::StringPiece payload) {
+    EXPECT_CALL(session_io_, SendFrameRaw(_)).WillOnce(DoAll(
+        ClientDecodeFrame(this, IsDataFrame(stream_id, fin, payload)),
+        SendBackWindowUpdate(this, stream_id, payload.size()),
+        Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS)));
+  }
+
+ protected:
+  mod_spdy::ThreadPool thread_pool_;
+  scoped_ptr<mod_spdy::Executor> executor_;  // from thread_pool_ (see SetUp)
+  scoped_ptr<mod_spdy::SpdySession> session_;  // the session under test
+};
+
+TEST_P(SpdySessionFlowControlTest, SingleStreamWithFlowControl) {
+  MockStreamTask* task = new MockStreamTask;
+  // Start by setting the initial window size to very small (three bytes).
+  ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 3);
+  // Then send a SYN_STREAM.
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  // We'll have to go through the loop at least five times -- once for each of
+  // five frames that we _must_ receive (SETTINGS, SYN_STREAM, and three
+  // WINDOW_UPDATEs).
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(5));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .Times(AtLeast(5));
+
+  // The rest of these will have to happen in a fixed order.
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task));
+  EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+      SendDataFrame(task, "quux", true)));
+  // Since the window size is just three bytes, we can only send three bytes at
+  // a time.
+  ExpectSendSynReply(stream_id, false);
+  ExpectSendDataGetWindowUpdateBack(stream_id, false, "foo");
+  ExpectSendDataGetWindowUpdateBack(stream_id, false, "bar");
+  ExpectSendDataGetWindowUpdateBack(stream_id, false, "quu");
+  ExpectSendDataGetWindowUpdateBack(stream_id, true, "x");  // fin = true
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+  ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+  session_->Run();
+}
+
+// Suppose the input side of the connection closes while we're blocked on flow
+// control; we should abort the blocked streams.
+TEST_P(SpdySessionFlowControlTest, CeaseInputWithFlowControl) {
+  MockStreamTask* task = new MockStreamTask;
+  // Start by setting the initial window size to very small (three bytes).
+  ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 3);
+  // Then send a SYN_STREAM.
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(1));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .Times(AtLeast(1));
+
+  // The rest of these will have to happen in a fixed order.
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task));
+  EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+      SendDataFrame(task, "quux", true)));
+  ExpectSendSynReply(stream_id, false);
+  // Since the window size is just three bytes, we can only send three bytes at
+  // a time.  The stream thread will then be blocked.
+  ExpectSendFrame(IsDataFrame(stream_id, false, "foo"));  // no window update
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, _))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+  // At this point, we're blocked on flow control, and the test will close the
+  // input side of the connection.  Since the stream can never complete, the
+  // session should abort the stream and shut down, rather than staying blocked
+  // forever.
+  ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+  session_->Run();  // must return even though "bar" and "quux" are unsent
+}
+
+// Test that we send GOAWAY (protocol error) if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE with a value of 0.
+TEST_P(SpdySessionFlowControlTest, SendGoawayForTooSmallInitialWindowSize) {
+  ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 0);
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+  session_->Run();  // nothing further is expected after the GOAWAY
+}
+
+// Test that we send GOAWAY (protocol error) if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE with a value of 0x80000000.
+TEST_P(SpdySessionFlowControlTest, SendGoawayForTooLargeInitialWindowSize) {
+  ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE,
+                                 0x80000000);  // 2^31
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+  session_->Run();  // nothing further is expected after the GOAWAY
+}
+
+TEST_P(SpdySessionFlowControlTest, SharedOutputFlowControlWindow) {
+  ReceiveWindowUpdateFrameFromClient(0, 10000);  // stream ID 0, delta 10000
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+    EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+  } else {  // before SPDY/3.1 the frame is answered with a GOAWAY
+    ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+  }
+
+  if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+    EXPECT_EQ(65536, session_->current_shared_output_window_size());
+  }
+  session_->Run();
+  if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+    EXPECT_EQ(75536, session_->current_shared_output_window_size());  // +10000
+  }
+}
+
+TEST_P(SpdySessionFlowControlTest, SharedInputFlowControlWindow) {
+  MockStreamTask* task = new MockStreamTask;
+  const net::SpdyStreamId stream_id = 1;
+  const net::SpdyPriority priority = 2;
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_NONE);
+  const std::string data1(32000, 'x');
+  const std::string data2(2000, 'y');  // 32000 * 2 + 2000 = 66000 bytes total
+  ReceiveDataFromClient(stream_id, data1, net::DATA_FLAG_NONE);
+  ReceiveDataFromClient(stream_id, data1, net::DATA_FLAG_NONE);
+  ReceiveDataFromClient(stream_id, data2, net::DATA_FLAG_FIN);
+
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(4));
+
+  // The rest of these will have to happen in a fixed order.
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  // Receive the SYN_STREAM from the client.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task));
+  EXPECT_CALL(*task, Run()).WillOnce(ConsumeInputUntilAborted(task));
+  // Receive the first two blocks of data from the client with no problems.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+  // The third block of data is too much; it's a flow control error.  For
+  // SPDY/3.1 and up it's a session flow control error; for SPDY/3 it's a
+  // stream flow control error.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+  if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+    ExpectSendGoAway(stream_id, net::GOAWAY_PROTOCOL_ERROR);
+  } else {  // SPDY/3: reset the stream, then end the run via an abort
+    ExpectSendFrame(IsRstStream(1, net::RST_STREAM_FLOW_CONTROL_ERROR));
+    EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+  }
+
+  session_->Run();
+}
+
+// Only run flow control tests for SPDY v3 and up (here: v3 and v3.1).
+INSTANTIATE_TEST_CASE_P(Spdy3, SpdySessionFlowControlTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_3, mod_spdy::spdy::SPDY_VERSION_3_1));
+
+// Create an alias of the SpdySessionTest fixture under a new test-case name,
+// so that the server push tests can get their own set of parameters.
+typedef SpdySessionTest SpdySessionServerPushTest;
+
+TEST_P(SpdySessionServerPushTest, SimpleServerPush) {
+  MockStreamTask* task1 = new MockStreamTask;  // the client's stream (3)
+  MockStreamTask* task2 = new MockStreamTask;  // the pushed stream (2)
+  executor_.set_run_on_add(true);
+  const net::SpdyStreamId stream_id = 3;
+  const net::SpdyPriority priority = 2;
+  const net::SpdyPriority push_priority = 3;
+  const std::string push_path = "/script.js";
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task1));
+  EXPECT_CALL(*task1, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task1),
+      StartServerPush(task1, push_priority, push_path,
+                      mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+      SendDataFrame(task1, "foobar", false),
+      SendDataFrame(task1, "quux", true)));
+  // We should right away create the server push task, and get the SYN_STREAM
+  // before any other frames from the original stream.
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(2u)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id,
+                     Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(push_priority)))))
+      .WillOnce(ReturnMockTask(task2));
+  EXPECT_CALL(*task2, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task2),
+      SendDataFrame(task2, "hello", false),
+      SendDataFrame(task2, "world", true)));
+  ExpectBeginServerPush(2u, stream_id, push_priority, push_path);
+  // The pushed stream has a low priority, so the rest of the first stream
+  // should get sent before the rest of the pushed stream.
+  ExpectSendSynReply(stream_id, false);
+  ExpectSendFrame(IsDataFrame(stream_id, false, "foobar"));
+  ExpectSendFrame(IsDataFrame(stream_id, true, "quux"));
+  // Now we should get the rest of the pushed stream.
+  ExpectSendHeaders(2u, false);  // a HEADERS frame, not a SYN_REPLY
+  ExpectSendFrame(IsDataFrame(2u, false, "hello"));
+  ExpectSendFrame(IsDataFrame(2u, true, "world"));
+  // And, we're done.
+  EXPECT_CALL(session_io_, IsConnectionAborted());
+  EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+  ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+TEST_P(SpdySessionServerPushTest, TooManyConcurrentPushes) {
+  MockStreamTask* task1 = new MockStreamTask;  // the client's stream (9)
+  MockStreamTask* task2 = new MockStreamTask;  // push stream 2 (/foo.css)
+  MockStreamTask* task3 = new MockStreamTask;  // push stream 4 (/bar.css)
+  executor_.set_run_on_add(false);
+  const net::SpdyStreamId stream_id = 9;
+  const net::SpdyPriority priority = 0;
+  ReceiveSettingsFrameFromClient(net::SETTINGS_MAX_CONCURRENT_STREAMS, 2);
+  ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+  EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(3));
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .Times(AtLeast(3));
+
+  testing::InSequence seq;
+  ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+      .WillOnce(ReturnMockTask(task1));
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunOne),
+                      Return(false)));
+  EXPECT_CALL(*task1, Run()).WillOnce(DoAll(
+      StartServerPush(task1, 3u, "/foo.css",
+          mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+      StartServerPush(task1, 2u, "/bar.css",
+          mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+      StartServerPush(task1, 1u, "/baz.css",
+          mod_spdy::SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES),
+      SendResponseHeaders(task1), SendDataFrame(task1, "html", true)));
+  // Start the first two pushes.  The third push should fail due to too many
+  // concurrent pushes.
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(2u)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id,
+                     Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(3u)))))
+      .WillOnce(ReturnMockTask(task2));
+  EXPECT_CALL(task_factory_, NewStreamTask(
+      AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(4u)),
+            Property(&mod_spdy::SpdyStream::associated_stream_id,
+                     Eq(stream_id)),
+            Property(&mod_spdy::SpdyStream::priority, Eq(2u)))))
+      .WillOnce(ReturnMockTask(task3));
+  // Now we get the SYN_STREAMs for the pushed streams before anything else.
+  ExpectBeginServerPush(2u, stream_id, 3u, "/foo.css");
+  ExpectBeginServerPush(4u, stream_id, 2u, "/bar.css");
+  // We now send the frames from the original stream.
+  ExpectSendSynReply(stream_id, false);
+  ExpectSendFrame(IsDataFrame(stream_id, true, "html"));
+  // At this point, the client will change MAX_CONCURRENT_STREAMS to zero.  We
+  // shouldn't barf, even though we have more active push streams than the new
+  // maximum.
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(
+          SendBackSettings(this, net::SETTINGS_MAX_CONCURRENT_STREAMS, 0u),
+          Return(false)));
+  // Now let's run the rest of the tasks.  One of them will try to start yet
+  // another server push, but that should fail because MAX_CONCURRENT_STREAMS
+  // is now zero.
+  EXPECT_CALL(session_io_, IsConnectionAborted())
+      .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+                      Return(false)));
+  EXPECT_CALL(*task2, Run()).WillOnce(DoAll(
+      SendResponseHeaders(task2), SendDataFrame(task2, "foo", true)));
+  EXPECT_CALL(*task3, Run()).WillOnce(DoAll(
+      StartServerPush(task3, 3u, "/stuff.png",
+          mod_spdy::SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES),
+      SendResponseHeaders(task3), SendDataFrame(task3, "bar", true)));
+  // And now we get all those frames.  The "bar" stream's frames should come
+  // first, because that's a higher-priority stream.
+  ExpectSendHeaders(4u, false);
+  ExpectSendFrame(IsDataFrame(4u, true, "bar"));
+  ExpectSendHeaders(2u, false);
+  ExpectSendFrame(IsDataFrame(2u, true, "foo"));
+  // And, we're done.
+  EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+      .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+  ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+  session_.Run();
+  EXPECT_TRUE(executor_.stopped());
+}
+
+// Only run server push tests for SPDY v3 and up (here: v3 and v3.1).
+INSTANTIATE_TEST_CASE_P(Spdy3, SpdySessionServerPushTest, testing::Values(
+    mod_spdy::spdy::SPDY_VERSION_3, mod_spdy::spdy::SPDY_VERSION_3_1));
+
+}  // namespace

Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc Thu May  1 11:39:27 2014
@@ -0,0 +1,456 @@
+// Copyright 2010 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_stream.h"
+
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/shared_flow_control_window.h"
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+#include "mod_spdy/common/spdy_frame_queue.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// The smallest WINDOW_UPDATE delta we're willing to send for a stream: one
+// eighth of the initial stream window size.  If the client has sent us less
+// than this much data, we wait for more data before sending a WINDOW_UPDATE
+// frame (so that we don't end up sending lots of little ones).
+const size_t kMinWindowUpdateSize =
+    static_cast<size_t>(net::kSpdyStreamInitialWindowSize) / 8;
+
+// Frame visitor that records the payload size of a DATA frame.  Every other
+// frame type is ignored, so length() stays at zero unless VisitData() has been
+// called.
+class DataLengthVisitor : public net::SpdyFrameVisitor {
+ public:
+  DataLengthVisitor() : length_(0) {}
+  virtual ~DataLengthVisitor() {}
+
+  // DATA frames are the only ones whose payload size we record.
+  virtual void VisitData(const net::SpdyDataIR& data_frame) {
+    length_ = data_frame.data().size();
+  }
+
+  // Control frames leave the recorded length untouched.
+  virtual void VisitSynStream(const net::SpdySynStreamIR&) {}
+  virtual void VisitSynReply(const net::SpdySynReplyIR&) {}
+  virtual void VisitRstStream(const net::SpdyRstStreamIR&) {}
+  virtual void VisitSettings(const net::SpdySettingsIR&) {}
+  virtual void VisitPing(const net::SpdyPingIR&) {}
+  virtual void VisitGoAway(const net::SpdyGoAwayIR&) {}
+  virtual void VisitHeaders(const net::SpdyHeadersIR&) {}
+  virtual void VisitWindowUpdate(const net::SpdyWindowUpdateIR&) {}
+  virtual void VisitCredential(const net::SpdyCredentialIR&) {}
+  virtual void VisitBlocked(const net::SpdyBlockedIR&) {}
+  virtual void VisitPushPromise(const net::SpdyPushPromiseIR&) {}
+
+  // Payload size of the most recently visited DATA frame, or zero if no DATA
+  // frame has been visited.
+  size_t length() const { return length_; }
+
+ private:
+  size_t length_;
+
+  DISALLOW_COPY_AND_ASSIGN(DataLengthVisitor);
+};
+
+// Returns the number of payload bytes in |frame| if it is a data frame, and
+// zero if it is any kind of control frame.
+size_t DataFrameLength(const net::SpdyFrameIR& frame) {
+  DataLengthVisitor length_finder;
+  frame.Visit(&length_finder);
+  const size_t payload_size = length_finder.length();
+  return payload_size;
+}
+
+}  // namespace
+
+namespace mod_spdy {
+
+// Constructs a stream for the given SPDY version.  The input window starts at
+// net::kSpdyStreamInitialWindowSize and the output window starts at
+// |initial_output_window_size| (which must be positive).  |output_queue| and
+// |pusher| must be non-null; |shared_window| may be null only for SPDY
+// versions before 3.1.  |priority| must be valid for |spdy_version|.
+SpdyStream::SpdyStream(spdy::SpdyVersion spdy_version,
+                       net::SpdyStreamId stream_id,
+                       net::SpdyStreamId associated_stream_id,
+                       int32 server_push_depth,
+                       net::SpdyPriority priority,
+                       int32 initial_output_window_size,
+                       SpdyFramePriorityQueue* output_queue,
+                       SharedFlowControlWindow* shared_window,
+                       SpdyServerPushInterface* pusher)
+    : spdy_version_(spdy_version),
+      stream_id_(stream_id),
+      associated_stream_id_(associated_stream_id),
+      server_push_depth_(server_push_depth),
+      priority_(priority),
+      output_queue_(output_queue),
+      shared_window_(shared_window),
+      pusher_(pusher),
+      condvar_(&lock_),
+      aborted_(false),
+      output_window_size_(initial_output_window_size),
+      // TODO(mdsteele): Make our initial input window size configurable (we
+      //   would send the chosen value to the client with a SETTINGS frame).
+      input_window_size_(net::kSpdyStreamInitialWindowSize),
+      input_bytes_consumed_(0) {
+  DCHECK_NE(spdy::SPDY_VERSION_NONE, spdy_version);
+  DCHECK(output_queue_);
+  // The shared (session) window is only required from SPDY/3.1 onwards.
+  DCHECK(shared_window_ || spdy_version < spdy::SPDY_VERSION_3_1);
+  DCHECK(pusher_);
+  DCHECK_GT(output_window_size_, 0);
+  // In SPDY v2, priorities are in the range 0-3; in SPDY v3, they are 0-7.
+  DCHECK_GE(priority, 0u);
+  DCHECK_LE(priority, LowestSpdyPriorityForVersion(spdy_version));
+}
+
+// The destructor body is empty: the output queue, shared window and pusher
+// pointers handed to the constructor are not deleted here.
+SpdyStream::~SpdyStream() {}
+
+// Returns true if this stream was initiated by the server (a server push).
+bool SpdyStream::is_server_push() const {
+  // The SPDY spec assigns even stream IDs to exactly those streams that the
+  // server initiated.
+  const bool has_even_id = (stream_id_ % 2) == 0;
+  return has_even_id;
+}
+
+// Returns whether the stream has been aborted, reading the flag under lock_.
+bool SpdyStream::is_aborted() const {
+  base::AutoLock guard(lock_);
+  const bool aborted_now = aborted_;
+  return aborted_now;
+}
+
+// Aborts the stream without queueing a RST_STREAM frame.  Takes lock_ and
+// defers to InternalAbortSilently().
+void SpdyStream::AbortSilently() {
+  base::AutoLock guard(lock_);
+  InternalAbortSilently();
+}
+
+// Aborts the stream and queues a RST_STREAM frame carrying |status|.  Takes
+// lock_ and defers to InternalAbortWithRstStream().
+void SpdyStream::AbortWithRstStream(net::SpdyRstStreamStatus status) {
+  base::AutoLock guard(lock_);
+  InternalAbortWithRstStream(status);
+}
+
+// Returns a snapshot of the stream's input flow control window, read under
+// lock_.  Only meaningful for SPDY v3 and up.
+int32 SpdyStream::current_input_window_size() const {
+  base::AutoLock guard(lock_);
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+  const int32 window_snapshot = input_window_size_;
+  return window_snapshot;
+}
+
+// Returns a snapshot of the stream's output flow control window, read under
+// lock_.  Only meaningful for SPDY v3 and up.
+int32 SpdyStream::current_output_window_size() const {
+  base::AutoLock guard(lock_);
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+  const int32 window_snapshot = output_window_size_;
+  return window_snapshot;
+}
+
+// Records that |size| bytes of this stream's input data have been consumed
+// and, once enough has accumulated, queues a WINDOW_UPDATE frame for the
+// stream so the client may send more.  For SPDY/3.1 and up it also reports
+// the consumption to the shared session window, which may call for a
+// WINDOW_UPDATE on stream 0.  This is a no-op for SPDY v2, for size == 0, and
+// for aborted streams.  Must only be called on client-initiated streams.
+void SpdyStream::OnInputDataConsumed(size_t size) {
+  // Sanity check: there is no input data to absorb for a server push stream,
+  // so we should only be getting called for client-initiated streams.
+  DCHECK(!is_server_push());
+
+  // Flow control only exists for SPDY v3 and up, so for SPDY v2 we don't need
+  // to bother tracking this.
+  if (spdy_version() < spdy::SPDY_VERSION_3) {
+    return;
+  }
+
+  // If the size arg is zero, this method should be a no-op, so just quit now.
+  if (size == 0) {
+    return;
+  }
+
+  base::AutoLock autolock(lock_);
+
+  // Don't bother with any of this if the stream has been aborted.
+  if (aborted_) {
+    return;
+  }
+
+  // First, if we're using SPDY/3.1 or later, we need to deal with the shared
+  // session window.  If after consuming this input data the shared window
+  // thinks it's time to send a WINDOW_UPDATE for the session input window
+  // (stream 0), send one, at top priority.
+  if (spdy_version_ >= spdy::SPDY_VERSION_3_1) {
+    const int32 shared_window_update =
+        shared_window_->OnInputDataConsumed(size);
+    if (shared_window_update > 0) {
+      output_queue_->Insert(
+          SpdyFramePriorityQueue::kTopPriority,
+          new net::SpdyWindowUpdateIR(0, shared_window_update));
+    }
+  }
+
+  // Make sure the current input window size is sane.  Although there are
+  // provisions in the SPDY spec that allow the window size to be temporarily
+  // negative, or to go above its default initial size, with our current
+  // implementation that should never happen.  Once we make the initial input
+  // window size configurable, we may need to adjust or remove these checks.
+  DCHECK_GE(input_window_size_, 0);
+  DCHECK_LE(input_window_size_, net::kSpdyStreamInitialWindowSize);
+
+  // Add the newly consumed data to the total.  Assuming our caller is behaving
+  // well (even if the client isn't) -- that is, they are only consuming as
+  // much data as we have put into the input queue -- there should be no
+  // overflow here, and the new value should be at most the amount of
+  // un-WINDOW_UPDATE-ed data we've received.  The reason we can be sure of
+  // this is that PostInputFrame() refuses to put more data into the queue than
+  // the window size allows, and aborts the stream if the client tries.
+  input_bytes_consumed_ += size;
+  DCHECK_GE(input_bytes_consumed_, size);
+  DCHECK_LE(input_bytes_consumed_,
+            static_cast<size_t>(net::kSpdyStreamInitialWindowSize -
+                                input_window_size_));
+
+  // We don't want to send lots of little WINDOW_UPDATE frames (as that would
+  // waste bandwidth), so only bother sending one once it would have a
+  // reasonably large value.
+  // TODO(mdsteele): Consider also tracking whether we have received a FLAG_FIN
+  //   on this stream; once we've gotten FLAG_FIN, there will be no more data,
+  //   so we don't need to send any more WINDOW_UPDATE frames.
+  if (input_bytes_consumed_ < kMinWindowUpdateSize) {
+    return;
+  }
+
+  // The SPDY spec forbids sending WINDOW_UPDATE frames with a non-positive
+  // delta-window-size (SPDY draft 3 section 2.6.8).  But since we already
+  // checked above that size was positive, input_bytes_consumed_ should now be
+  // positive as well.
+  DCHECK_GT(input_bytes_consumed_, 0u);
+  // Make sure there won't be any overflow shenanigans.
+  COMPILE_ASSERT(sizeof(size_t) >= sizeof(net::kSpdyMaximumWindowSize),
+                 size_t_is_at_least_32_bits);
+  DCHECK_LE(input_bytes_consumed_,
+            static_cast<size_t>(net::kSpdyMaximumWindowSize));
+
+  // Send a WINDOW_UPDATE frame to the client and update our window size.
+  // The pending total is reset so the next update starts counting from zero.
+  SendOutputFrame(new net::SpdyWindowUpdateIR(
+      stream_id_, input_bytes_consumed_));
+  input_window_size_ += input_bytes_consumed_;
+  DCHECK_LE(input_window_size_, net::kSpdyStreamInitialWindowSize);
+  input_bytes_consumed_ = 0;
+}
+
+// Applies |delta| (which may be negative) to the stream's output window.  If
+// the result would fall outside +/- net::kSpdyMaximumWindowSize, the stream is
+// aborted with a flow control error; otherwise, threads blocked on the window
+// are woken when it goes from non-positive to positive.
+void SpdyStream::AdjustOutputWindowSize(int32 delta) {
+  base::AutoLock guard(lock_);
+
+  // Flow control only exists for SPDY v3 and up.
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+
+  // There is nothing to adjust on a stream that has already been aborted.
+  if (aborted_) {
+    return;
+  }
+
+  // Do the arithmetic in 64 bits so that the sum itself cannot wrap, then
+  // verify that it lies within the permitted range.  Aborting the stream also
+  // wakes up any blocked threads.
+  const int64 window_limit = static_cast<int64>(net::kSpdyMaximumWindowSize);
+  const int64 adjusted =
+      static_cast<int64>(output_window_size_) + static_cast<int64>(delta);
+  const bool within_limits =
+      adjusted >= -window_limit && adjusted <= window_limit;
+  if (!within_limits) {
+    LOG(WARNING) << "Flow control overflow/underflow on stream "
+                 << stream_id_ << ".  Aborting stream.";
+    InternalAbortWithRstStream(net::RST_STREAM_FLOW_CONTROL_ERROR);
+    return;
+  }
+
+  // Commit the new window size, remembering whether senders could have been
+  // blocked on the old one.
+  const bool was_non_positive = output_window_size_ <= 0;
+  output_window_size_ = static_cast<int32>(adjusted);
+
+  // A window that has just become positive unblocks any waiting senders.
+  if (was_non_positive && output_window_size_ > 0) {
+    condvar_.Broadcast();
+  }
+}
+
+// Takes ownership of |frame_ptr| and places it on the stream's input queue.
+// The frame is dropped if the stream has been aborted.  For SPDY v3 and up, a
+// data frame larger than the current input window aborts the stream with a
+// flow control error instead of being queued.
+void SpdyStream::PostInputFrame(net::SpdyFrameIR* frame_ptr) {
+  base::AutoLock guard(lock_);
+
+  // Own the frame from here on, so every early return below deletes it.
+  scoped_ptr<net::SpdyFrameIR> owned_frame(frame_ptr);
+
+  // Once a stream has been aborted, nothing more goes into the queue.
+  if (aborted_) {
+    return;
+  }
+
+  // Flow control applies to nonempty data frames when using SPDY v3 or above.
+  if (spdy_version() >= spdy::SPDY_VERSION_3) {
+    DCHECK_GE(input_window_size_, 0);
+    // DataFrameLength() returns zero for control frames.
+    const int payload_bytes = DataFrameLength(*owned_frame);
+    if (payload_bytes > 0) {
+      // A payload that does not fit in the window is a protocol violation:
+      // abort the stream with a flow control error and do not queue the frame.
+      if (payload_bytes > input_window_size_) {
+        LOG(WARNING) << "Client violated flow control by sending too much data "
+                     << "to stream " << stream_id_ << ".  Aborting stream.";
+        InternalAbortWithRstStream(net::RST_STREAM_FLOW_CONTROL_ERROR);
+        return;
+      }
+      // The payload fits, so shrink the window.  It grows again once the data
+      // has been consumed (see OnInputDataConsumed()).
+      input_window_size_ -= payload_bytes;
+    }
+  }
+
+  // With the window accounted for, hand the frame over to the stream thread.
+  input_queue_.Insert(owned_frame.release());
+}
+
+// Pops the next frame from the input queue into |*frame|, forwarding |block|
+// to the queue, and returns whatever the queue's Pop() returns.
+bool SpdyStream::GetInputFrame(bool block, net::SpdyFrameIR** frame) {
+  const bool popped = input_queue_.Pop(block, frame);
+  return popped;
+}
+
+// Queues a unidirectional SYN_STREAM frame for this server push stream,
+// carrying |headers| and the given FLAG_FIN setting.  Does nothing if the
+// stream has been aborted.  Must only be called on server push streams.
+void SpdyStream::SendOutputSynStream(const net::SpdyHeaderBlock& headers,
+                                     bool flag_fin) {
+  DCHECK(is_server_push());
+  base::AutoLock guard(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  scoped_ptr<net::SpdySynStreamIR> syn_stream(
+      new net::SpdySynStreamIR(stream_id_));
+  syn_stream->set_associated_to_stream_id(associated_stream_id_);
+  syn_stream->set_priority(priority_);
+  syn_stream->set_fin(flag_fin);
+  syn_stream->set_unidirectional(true);
+  syn_stream->GetMutableNameValueBlock()->insert(headers.begin(),
+                                                 headers.end());
+
+  // Unlike the other frames of this stream, the SYN_STREAM is queued at top
+  // priority rather than at the stream's own priority.
+  output_queue_->Insert(SpdyFramePriorityQueue::kTopPriority,
+                        syn_stream.release());
+}
+
+// Queues a SYN_REPLY frame carrying |headers| and the given FLAG_FIN setting.
+// Does nothing if the stream has been aborted.  Must only be called on
+// client-initiated streams.
+void SpdyStream::SendOutputSynReply(const net::SpdyHeaderBlock& headers,
+                                    bool flag_fin) {
+  DCHECK(!is_server_push());
+  base::AutoLock guard(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  scoped_ptr<net::SpdySynReplyIR> syn_reply(
+      new net::SpdySynReplyIR(stream_id_));
+  syn_reply->set_fin(flag_fin);
+  syn_reply->GetMutableNameValueBlock()->insert(headers.begin(),
+                                                headers.end());
+  SendOutputFrame(syn_reply.release());
+}
+
+// Queues a HEADERS frame carrying |headers| and the given FLAG_FIN setting.
+// Does nothing if the stream has been aborted.
+void SpdyStream::SendOutputHeaders(const net::SpdyHeaderBlock& headers,
+                                   bool flag_fin) {
+  base::AutoLock guard(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  scoped_ptr<net::SpdyHeadersIR> headers_frame(
+      new net::SpdyHeadersIR(stream_id_));
+  headers_frame->set_fin(flag_fin);
+  headers_frame->GetMutableNameValueBlock()->insert(headers.begin(),
+                                                    headers.end());
+  SendOutputFrame(headers_frame.release());
+}
+
+// Sends |data| on this stream as one or more DATA frames; FLAG_FIN is set
+// (when |flag_fin| is true) only on the frame that carries the last of the
+// data.  For SPDY v3 and up this blocks while the stream's output window is
+// non-positive, and for SPDY/3.1 and up it may block further while requesting
+// quota from the shared session window.  Returns early, possibly after
+// sending only part of |data|, if the stream is (or becomes) aborted.
+void SpdyStream::SendOutputDataFrame(base::StringPiece data, bool flag_fin) {
+  base::AutoLock autolock(lock_);
+  if (aborted_) {
+    return;
+  }
+
+  // Flow control only exists for SPDY v3 and up; for SPDY v2, we can just send
+  // the data without regard to the window size.  Even with flow control, we
+  // can of course send empty DATA frames at will.
+  if (spdy_version() < spdy::SPDY_VERSION_3 || data.empty()) {
+    // Suppress empty DATA frames (unless we're setting FLAG_FIN).
+    if (!data.empty() || flag_fin) {
+      scoped_ptr<net::SpdyDataIR> frame(new net::SpdyDataIR(stream_id_, data));
+      frame->set_fin(flag_fin);
+      SendOutputFrame(frame.release());
+    }
+    return;
+  }
+
+  while (!data.empty()) {
+    // If the current window size is non-positive, we must wait to send data
+    // until the client increases it (or we abort).  Note that the window size
+    // can be negative if the client decreased the maximum window size (with a
+    // SETTINGS frame) after we already sent data (SPDY draft 3 section 2.6.8).
+    while (!aborted_ && output_window_size_ <= 0) {
+      condvar_.Wait();
+    }
+    if (aborted_) {
+      return;
+    }
+    // If the current window size is less than the amount of data we'd like to
+    // send, send a smaller data frame with the first part of the data, and
+    // then we'll sleep until the window size is increased before sending the
+    // rest.
+    DCHECK_LE(data.size(), static_cast<size_t>(kint32max));
+    const int32 full_length = data.size();
+    DCHECK_GT(output_window_size_, 0);
+    const int32 length_desired = std::min(full_length, output_window_size_);
+    // Reserve the bytes from the stream window up front; any part we cannot
+    // use is handed back below.
+    output_window_size_ -= length_desired;
+    DCHECK_GE(output_window_size_, 0);
+    // Now we need to request quota from the session-shared flow control
+    // window.  Since the call to RequestOutputQuota may block, we need to
+    // unlock first.
+    int32 length_acquired;
+    if (spdy_version() >= spdy::SPDY_VERSION_3_1) {
+      base::AutoUnlock autounlock(lock_);
+      DCHECK(shared_window_);
+      length_acquired = shared_window_->RequestOutputQuota(length_desired);
+    } else {
+      // For SPDY versions that don't have a session window, just act like we
+      // got the quota we wanted.
+      length_acquired = length_desired;
+    }
+    // NOTE(review): lock_ was released while RequestOutputQuota() ran, and
+    // aborted_ is not re-checked here after it is reacquired.  If the stream
+    // can be aborted during that window, SendOutputFrame()'s DCHECK(!aborted_)
+    // below would fire -- confirm whether callers rule that out.
+    // RequestOutputQuota will return zero if the shared window has been
+    // aborted (i.e. if the session has been aborted).  So in that case let's
+    // just abort too.
+    if (length_acquired <= 0) {
+      InternalAbortSilently();
+      return;
+    }
+    // If we didn't acquire as much as we wanted from the shared window, put
+    // the amount we're not actually using back into output_window_size_.
+    else if (length_acquired < length_desired) {
+      output_window_size_ += length_desired - length_acquired;
+    }
+    // Actually send the frame.  FLAG_FIN is only set if this frame carries
+    // everything that was left to send.
+    scoped_ptr<net::SpdyDataIR> frame(
+        new net::SpdyDataIR(stream_id_, data.substr(0, length_acquired)));
+    frame->set_fin(flag_fin && length_acquired == full_length);
+    SendOutputFrame(frame.release());
+    data = data.substr(length_acquired);
+  }
+}
+
+// Asks the pusher to start a server push associated with this stream, at one
+// push depth deeper than this stream's own, and returns the pusher's status.
+// Only valid for SPDY v3 and up.
+SpdyServerPushInterface::PushStatus SpdyStream::StartServerPush(
+    net::SpdyPriority priority,
+    const net::SpdyHeaderBlock& request_headers) {
+  DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+  const int32 pushed_stream_depth = server_push_depth_ + 1;
+  return pusher_->StartServerPush(stream_id_, pushed_stream_depth, priority,
+                                  request_headers);
+}
+
+// Queues |frame| on the output queue at this stream's priority.  The caller
+// must hold lock_, and the stream must not have been aborted.
+void SpdyStream::SendOutputFrame(net::SpdyFrameIR* frame) {
+  lock_.AssertAcquired();
+  DCHECK(!aborted_);
+  const int queue_priority = static_cast<int>(priority_);
+  output_queue_->Insert(queue_priority, frame);
+}
+
+// Marks the stream as aborted without queueing any frame for the client:
+// aborts the input queue, sets aborted_, and wakes every thread waiting on
+// condvar_ (such as a sender blocked in SendOutputDataFrame()).  The caller
+// must already hold lock_.
+void SpdyStream::InternalAbortSilently() {
+  lock_.AssertAcquired();
+  input_queue_.Abort();
+  aborted_ = true;
+  // Wake blocked senders so they can observe aborted_ and bail out.
+  condvar_.Broadcast();
+}
+
+// Queues a top-priority RST_STREAM frame carrying |status| and then aborts
+// the stream.  The caller must already hold lock_.
+void SpdyStream::InternalAbortWithRstStream(net::SpdyRstStreamStatus status) {
+  lock_.AssertAcquired();
+  net::SpdyRstStreamIR* const rst_stream =
+      new net::SpdyRstStreamIR(stream_id_, status);
+  output_queue_->Insert(SpdyFramePriorityQueue::kTopPriority, rst_stream);
+  // The RST_STREAM has to be queued before aborted_ is set: once
+  // InternalAbortSilently() sets it, the stream thread will not send any more
+  // frames on this stream.
+  InternalAbortSilently();
+}
+
+}  // namespace mod_spdy