You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@httpd.apache.org by ji...@apache.org on 2014/05/01 13:39:32 UTC
svn commit: r1591620 [8/14] - in /httpd/mod_spdy/branches/httpd-2.2.x: ./
base/ base/metrics/ build/ install/ install/common/ install/debian/
install/rpm/ mod_spdy/ mod_spdy/apache/ mod_spdy/apache/filters/
mod_spdy/apache/testing/ mod_spdy/common/ mod...
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h Thu May 1 11:39:27 2014
@@ -0,0 +1,264 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SESSION_H_
+#define MOD_SPDY_COMMON_SPDY_SESSION_H_
+
+#include <map>
+
+#include "base/basictypes.h"
+#include "base/synchronization/lock.h"
+#include "mod_spdy/common/executor.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/shared_flow_control_window.h"
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+#include "mod_spdy/common/spdy_server_push_interface.h"
+#include "mod_spdy/common/spdy_stream.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace mod_spdy {
+
+class Executor;
+class SpdySessionIO;
+class SpdyServerConfig;
+class SpdyStreamTaskFactory;
+
+// Represents a SPDY session with a client. Given an Executor for processing
+// individual SPDY streams, and a SpdySessionIO for communicating with the
+// client (sending and receiving frames), this class takes care of implementing
+// the SPDY protocol and responding correctly to various situations.
+class SpdySession : public net::BufferedSpdyFramerVisitorInterface,
+ public SpdyServerPushInterface {
+ public:
+ // The SpdySession does _not_ take ownership of any of these arguments.
+ SpdySession(spdy::SpdyVersion spdy_version,
+ const SpdyServerConfig* config,
+ SpdySessionIO* session_io,
+ SpdyStreamTaskFactory* task_factory,
+ Executor* executor);
+ virtual ~SpdySession();
+
+ // What SPDY version is being used for this session?
+ spdy::SpdyVersion spdy_version() const { return spdy_version_; }
+
+ // What are the current shared window sizes for this session? These are
+ // mostly useful for debugging. Requires that spdy_version() >=
+ // SPDY_VERSION_3_1.
+ int32 current_shared_input_window_size() const;
+ int32 current_shared_output_window_size() const;
+
+ // Process the session; don't return until the session is finished.
+ void Run();
+
+ // BufferedSpdyFramerVisitorInterface methods:
+ virtual void OnError(net::SpdyFramer::SpdyError error_code);
+ virtual void OnStreamError(
+ net::SpdyStreamId stream_id, const std::string& description);
+ virtual void OnSynStream(
+ net::SpdyStreamId stream_id, net::SpdyStreamId associated_stream_id,
+ net::SpdyPriority priority, uint8 credential_slot, bool fin,
+ bool unidirectional, const net::SpdyHeaderBlock& headers);
+ virtual void OnSynReply(
+ net::SpdyStreamId stream_id, bool fin,
+ const net::SpdyHeaderBlock& headers);
+ virtual void OnHeaders(
+ net::SpdyStreamId stream_id, bool fin,
+ const net::SpdyHeaderBlock& headers);
+ virtual void OnStreamFrameData(
+ net::SpdyStreamId stream_id, const char* data, size_t length, bool fin);
+ virtual void OnSettings(bool clear_persisted);
+ virtual void OnSetting(net::SpdySettingsIds id, uint8 flags, uint32 value);
+ virtual void OnPing(uint32 unique_id);
+ virtual void OnRstStream(
+ net::SpdyStreamId stream_id, net::SpdyRstStreamStatus status);
+ virtual void OnGoAway(
+ net::SpdyStreamId last_accepted_stream_id, net::SpdyGoAwayStatus status);
+ virtual void OnWindowUpdate(
+ net::SpdyStreamId stream_id, uint32 delta_window_size);
+ virtual void OnPushPromise(
+ net::SpdyStreamId stream_id, net::SpdyStreamId promised_stream_id);
+
+ // SpdyServerPushInterface methods:
+ // Initiate a SPDY server push, roughly by pretending that the client sent a
+ // SYN_STREAM with the given headers. To repeat: the headers argument is
+ // _not_ the headers that the server will send to the client, but rather the
+ // headers to _pretend_ that the client sent to the server. Requires that
+ // spdy_version() >= SPDY/3.
+ // Note that unlike most other methods of this class, StartServerPush may be
+ // called by stream threads, not just by the connection thread.
+ virtual SpdyServerPushInterface::PushStatus StartServerPush(
+ net::SpdyStreamId associated_stream_id,
+ int32 server_push_depth,
+ net::SpdyPriority priority,
+ const net::SpdyHeaderBlock& request_headers);
+
+ private:
+ // A helper class for wrapping tasks returned by
+ // SpdyStreamTaskFactory::NewStreamTask(). Running or cancelling this task
+ // simply runs/cancels the wrapped task; however, this object also keeps a
+ // SpdyStream object, and on deletion, this will remove itself from the
+ // SpdySession's list of active streams.
+ class StreamTaskWrapper : public net_instaweb::Function {
+ public:
+ // This constructor, called by the main connection thread, will call
+ // task_factory_->NewStreamTask() to produce the wrapped task.
+ StreamTaskWrapper(SpdySession* spdy_session,
+ net::SpdyStreamId stream_id,
+ net::SpdyStreamId associated_stream_id,
+ int32 server_push_depth,
+ net::SpdyPriority priority);
+ virtual ~StreamTaskWrapper();
+
+ SpdyStream* stream() { return &stream_; }
+
+ protected:
+ // net_instaweb::Function methods (our implementations of these simply
+ // run/cancel the wrapped subtask):
+ virtual void Run();
+ virtual void Cancel();
+
+ private:
+ SpdySession* const spdy_session_;
+ SpdyStream stream_;
+ net_instaweb::Function* const subtask_;
+
+ DISALLOW_COPY_AND_ASSIGN(StreamTaskWrapper);
+ };
+
+ // Helper class for keeping track of active stream tasks, and separately
+ // tracking the number of active client/server-initiated streams. This class
+ // is not thread-safe without external synchronization, so it is used below
+ // along with a separate mutex.
+ class SpdyStreamMap {
+ public:
+ SpdyStreamMap();
+ ~SpdyStreamMap();
+
+ // Determine whether there are no currently active streams.
+ bool IsEmpty();
+ // Get the number of currently active streams created by the client or
+ // server, respectively.
+ size_t NumActiveClientStreams();
+ size_t NumActivePushStreams();
+ // Determine if a particular stream ID is currently active.
+ bool IsStreamActive(net::SpdyStreamId stream_id);
+ // Get the specified stream object, or NULL if the stream is inactive.
+ SpdyStream* GetStream(net::SpdyStreamId stream_id);
+ // Add a new stream. Requires that the stream ID is currently inactive.
+ void AddStreamTask(StreamTaskWrapper* task);
+ // Remove a stream task. Requires that the stream is currently active.
+ void RemoveStreamTask(StreamTaskWrapper* task);
+ // Adjust the output window size of all active streams by the same delta.
+ void AdjustAllOutputWindowSizes(int32 delta);
+ // Abort all streams in the map. Note that this won't immediately empty
+ // the map (the tasks still have to shut down).
+ void AbortAllSilently();
+
+ private:
+ typedef std::map<net::SpdyStreamId, StreamTaskWrapper*> TaskMap;
+ TaskMap tasks_;
+ size_t num_active_push_streams_;
+
+ DISALLOW_COPY_AND_ASSIGN(SpdyStreamMap);
+ };
+
+ // Validate and set the per-stream initial flow-control window size to the
+ // new value. Must be using SPDY v3 or later to call this method.
+ void SetInitialWindowSize(uint32 new_init_window_size);
+
+ // Send a single SPDY frame to the client, compressing it first if necessary.
+ // Stop the session if the connection turns out to be closed. This method
+ // takes ownership of the passed frame and will delete it.
+ void SendFrame(const net::SpdyFrameIR* frame);
+ // Send the frame as-is (without taking ownership). Stop the session if the
+ // connection turns out to be closed.
+ void SendFrameRaw(const net::SpdySerializedFrame& frame);
+
+ // Immediately send a GOAWAY frame to the client with the given status,
+ // unless we've already sent one. This also prevents us from creating any
+ // new streams, so calling this is the best way to shut the session down
+ // gracefully; once all streams have finished normally and no new ones can be
+ // created, the session will shut itself down.
+ void SendGoAwayFrame(net::SpdyGoAwayStatus status);
+ // Enqueue a RST_STREAM frame for the given stream ID. Note that this does
+ // not abort the stream if it exists; for that, use AbortStream().
+ void SendRstStreamFrame(net::SpdyStreamId stream_id,
+ net::SpdyRstStreamStatus status);
+ // Immediately send our SETTINGS frame, with values based on our
+ // SpdyServerConfig object. This should be done exactly once, at session
+ // start.
+ void SendSettingsFrame();
+
+ // Close down the whole session immediately. Abort all active streams, and
+ // then block until all stream threads have shut down.
+ void StopSession();
+ // Abort the stream without sending anything to the client.
+ void AbortStreamSilently(net::SpdyStreamId stream_id);
+ // Send a RST_STREAM frame and then abort the stream.
+ void AbortStream(net::SpdyStreamId stream_id,
+ net::SpdyRstStreamStatus status);
+
+ // Remove the given StreamTaskWrapper object from the stream map. This is
+ // the only other method of this class, aside from StartServerPush, that
+ // might be called from another thread. (Specifically, it is called by the
+ // StreamTaskWrapper destructor, which is called by the executor).
+ void RemoveStreamTask(StreamTaskWrapper* stream_data);
+
+ // Grab the stream_map_lock_ and check if stream_map_ is empty.
+ bool StreamMapIsEmpty();
+
+ // These fields are accessed only by the main connection thread, so they need
+ // not be protected by a lock:
+ const spdy::SpdyVersion spdy_version_;
+ const SpdyServerConfig* const config_;
+ SpdySessionIO* const session_io_;
+ SpdyStreamTaskFactory* const task_factory_;
+ Executor* const executor_;
+ net::BufferedSpdyFramer framer_;
+ bool session_stopped_; // StopSession() has been called
+ bool already_sent_goaway_; // GOAWAY frame has been sent
+ net::SpdyStreamId last_client_stream_id_;
+ int32 initial_window_size_; // per-stream initial flow-control window size
+ uint32 max_concurrent_pushes_; // max number of active server pushes at once
+
+ // The stream map must be protected by a lock, because each stream thread
+ // will remove itself from the map (by calling RemoveStreamTask) when the
+ // stream closes. You MUST hold the lock to use the stream_map_ OR to use
+ // any of the StreamTaskWrapper or SpdyStream objects contained therein
+ // (e.g. to post a frame to the stream), otherwise the stream object may be
+ // deleted by another thread while you're using it. You should NOT be
+ // holding the lock when you e.g. send a frame to the client, as that may
+ // block for a long time.
+ base::Lock stream_map_lock_;
+ SpdyStreamMap stream_map_;
+ // These fields are also protected by the stream_map_lock_; they are used for
+ // controlling server pushes, which can be initiated by stream threads as
+ // well as by the connection thread. We could use a separate lock for these,
+ // but right now we probably don't need that much locking granularity.
+ net::SpdyStreamId last_server_push_stream_id_;
+ bool received_goaway_; // we've received a GOAWAY frame from the client
+
+ // These objects are also shared between all stream threads, but these
+ // classes are each thread-safe, and don't need additional synchronization.
+ SpdyFramePriorityQueue output_queue_;
+ SharedFlowControlWindow shared_window_;
+
+ DISALLOW_COPY_AND_ASSIGN(SpdySession);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_SPDY_SESSION_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,23 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session_io.h"
+
+namespace mod_spdy {
+
+SpdySessionIO::SpdySessionIO() {}
+
+SpdySessionIO::~SpdySessionIO() {}
+
+} // namespace mod_spdy
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h Thu May 1 11:39:27 2014
@@ -0,0 +1,82 @@
+// Copyright 2011 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef MOD_SPDY_COMMON_SPDY_SESSION_IO_H_
+#define MOD_SPDY_COMMON_SPDY_SESSION_IO_H_
+
+#include "base/basictypes.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace net {
+class BufferedSpdyFramer;
+} // namespace net
+
+namespace mod_spdy {
+
+class SpdyStream;
+
+// SpdySessionIO is a helper interface for the SpdySession class. The
+// SpdySessionIO takes care of implementation-specific details about how to
+// send and receive data, allowing the SpdySession to focus on the SPDY
+// protocol itself. For example, a SpdySessionIO for Apache would hold onto a
+// conn_rec object and invoke the input and output filter chains for
+// ProcessAvailableInput and SendFrameRaw, respectively. The SpdySessionIO
+// itself does not need to be thread-safe -- it is only ever used by the main
+// connection thread.
+class SpdySessionIO {
+ public:
+ // Status to describe whether reading succeeded.
+ enum ReadStatus {
+ READ_SUCCESS, // we successfully pushed data into the SpdyFramer
+ READ_NO_DATA, // no data is currently available
+ READ_CONNECTION_CLOSED, // the connection has been closed
+ READ_ERROR // an unrecoverable error (e.g. client sent malformed data)
+ };
+
+ // Status to describe whether writing succeeded.
+ enum WriteStatus {
+ WRITE_SUCCESS, // we successfully wrote the frame out to the network
+ WRITE_CONNECTION_CLOSED, // the connection has been closed
+ };
+
+ SpdySessionIO();
+ virtual ~SpdySessionIO();
+
+ // Return true if the connection has been externally aborted and should
+ // stop, false otherwise.
+ virtual bool IsConnectionAborted() = 0;
+
+ // Pull any available input data from the connection and feed it into the
+ // ProcessInput() method of the given SpdyFramer. If no input data is
+ // currently available and the block argument is true, this should block
+ // until more data arrives; otherwise, this should not block.
+ virtual ReadStatus ProcessAvailableInput(
+ bool block, net::BufferedSpdyFramer* framer) = 0;
+
+ // Send a single SPDY frame to the client as-is; block until it has been
+ // sent down the wire. Return true on success.
+ //
+ // TODO(mdsteele): We do need to be able to flush a single frame down the
+ // wire, but we probably don't need/want to flush every single frame
+ // individually in places where we send multiple frames at once. We'll
+ // probably want to adjust this API a bit.
+ virtual WriteStatus SendFrameRaw(const net::SpdySerializedFrame& frame) = 0;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(SpdySessionIO);
+};
+
+} // namespace mod_spdy
+
+#endif // MOD_SPDY_COMMON_SPDY_SESSION_IO_H_
Propchange: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_io.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_session_test.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,1070 @@
+// Copyright 2010 Google Inc. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_session.h"
+
+#include <list>
+#include <string>
+
+#include "base/basictypes.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/spdy_server_config.h"
+#include "mod_spdy/common/spdy_session_io.h"
+#include "mod_spdy/common/spdy_stream_task_factory.h"
+#include "mod_spdy/common/testing/spdy_frame_matchers.h"
+#include "mod_spdy/common/thread_pool.h"
+#include "net/instaweb/util/public/function.h"
+#include "net/spdy/buffered_spdy_framer.h"
+#include "net/spdy/spdy_framer.h"
+#include "net/spdy/spdy_protocol.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+using mod_spdy::testing::IsDataFrame;
+using mod_spdy::testing::IsGoAway;
+using mod_spdy::testing::IsHeaders;
+using mod_spdy::testing::IsPing;
+using mod_spdy::testing::IsRstStream;
+using mod_spdy::testing::IsSettings;
+using mod_spdy::testing::IsSynReply;
+using mod_spdy::testing::IsSynStream;
+using testing::_;
+using testing::AllOf;
+using testing::AtLeast;
+using testing::DoAll;
+using testing::Eq;
+using testing::Invoke;
+using testing::InvokeWithoutArgs;
+using testing::NotNull;
+using testing::Property;
+using testing::Return;
+using testing::StrictMock;
+using testing::WithArg;
+
+namespace {
+
+void AddRequestHeaders(mod_spdy::spdy::SpdyVersion version,
+ net::SpdyNameValueBlock *headers) {
+ const bool spdy2 = version < mod_spdy::spdy::SPDY_VERSION_3;
+ (*headers)[spdy2 ? mod_spdy::http::kHost :
+ mod_spdy::spdy::kSpdy3Host] = "www.example.com";
+ (*headers)[spdy2 ? mod_spdy::spdy::kSpdy2Method :
+ mod_spdy::spdy::kSpdy3Method] = "GET";
+ (*headers)[spdy2 ? mod_spdy::spdy::kSpdy2Scheme :
+ mod_spdy::spdy::kSpdy3Scheme] = "https";
+ (*headers)[spdy2 ? mod_spdy::spdy::kSpdy2Url :
+ mod_spdy::spdy::kSpdy3Path] = "/foo/index.html";
+ (*headers)[spdy2 ? mod_spdy::spdy::kSpdy2Version :
+ mod_spdy::spdy::kSpdy3Version] = "HTTP/1.1";
+}
+
+void AddResponseHeaders(mod_spdy::spdy::SpdyVersion version,
+ net::SpdyNameValueBlock *headers) {
+ const bool spdy2 = version < mod_spdy::spdy::SPDY_VERSION_3;
+ (*headers)[spdy2 ? mod_spdy::spdy::kSpdy2Status :
+ mod_spdy::spdy::kSpdy3Status] = "200";
+ (*headers)[spdy2 ? mod_spdy::spdy::kSpdy2Version :
+ mod_spdy::spdy::kSpdy3Version] = "HTTP/1.1";
+ (*headers)[mod_spdy::http::kContentType] = "text/html";
+}
+
+void AddInitialServerPushHeaders(const std::string& path,
+ net::SpdyNameValueBlock *headers) {
+ (*headers)[mod_spdy::spdy::kSpdy3Host] = "www.example.com";
+ (*headers)[mod_spdy::spdy::kSpdy3Path] = path;
+ (*headers)[mod_spdy::spdy::kSpdy3Scheme] = "https";
+}
+
+class MockSpdySessionIO : public mod_spdy::SpdySessionIO {
+ public:
+ MOCK_METHOD0(IsConnectionAborted, bool());
+ MOCK_METHOD2(ProcessAvailableInput,
+ ReadStatus(bool, net::BufferedSpdyFramer*));
+ MOCK_METHOD1(SendFrameRaw, WriteStatus(const net::SpdySerializedFrame&));
+};
+
+class MockSpdyStreamTaskFactory : public mod_spdy::SpdyStreamTaskFactory {
+ public:
+ MOCK_METHOD1(NewStreamTask, net_instaweb::Function*(mod_spdy::SpdyStream*));
+};
+
+class MockStreamTask : public net_instaweb::Function {
+ public:
+ MockStreamTask() : stream(NULL) {}
+ MOCK_METHOD0(Run, void());
+ MOCK_METHOD0(Cancel, void());
+ mod_spdy::SpdyStream* stream;
+ private:
+ DISALLOW_COPY_AND_ASSIGN(MockStreamTask);
+};
+
+// gMock action to be used with NewStreamTask.
+ACTION_P(ReturnMockTask, task) {
+ task->stream = arg0;
+ return task;
+}
+
+// gMock action to be used with MockStreamTask::Run.
+ACTION_P4(StartServerPush, task, priority, path, expected_status) {
+ net::SpdyHeaderBlock push_headers;
+ AddInitialServerPushHeaders(path, &push_headers);
+ EXPECT_EQ(expected_status,
+ task->stream->StartServerPush(priority, push_headers));
+}
+
+// gMock action to be used with MockStreamTask::Run.
+ACTION_P(SendResponseHeaders, task) {
+ net::SpdyHeaderBlock headers;
+ AddResponseHeaders(task->stream->spdy_version(), &headers);
+ if (task->stream->is_server_push()) {
+ task->stream->SendOutputHeaders(headers, false);
+ } else {
+ task->stream->SendOutputSynReply(headers, false);
+ }
+}
+
+// gMock action to be used with MockStreamTask::Run.
+ACTION_P3(SendDataFrame, task, data, fin) {
+ task->stream->SendOutputDataFrame(data, fin);
+}
+
+// gMock action to be used with MockStreamTask::Run.
+ACTION_P(ConsumeInputUntilAborted, task) {
+ while (!task->stream->is_aborted()) {
+ net::SpdyFrameIR* raw_frame = NULL;
+ if (task->stream->GetInputFrame(true, &raw_frame)) {
+ delete raw_frame;
+ }
+ }
+}
+
+// An executor that runs all tasks in the same thread, either immediately when
+// they are added or when it is told to run them.
+class InlineExecutor : public mod_spdy::Executor {
+ public:
+ InlineExecutor() : run_on_add_(false), stopped_(false) {}
+ virtual ~InlineExecutor() { Stop(); }
+
+ virtual void AddTask(net_instaweb::Function* task,
+ net::SpdyPriority priority) {
+ if (stopped_) {
+ task->CallCancel();
+ } else if (run_on_add_) {
+ task->CallRun();
+ } else {
+ tasks_.push_back(task);
+ }
+ }
+ virtual void Stop() {
+ stopped_ = true;
+ while (!tasks_.empty()) {
+ tasks_.front()->CallCancel();
+ tasks_.pop_front();
+ }
+ }
+ void RunOne() {
+ if (!tasks_.empty()) {
+ tasks_.front()->CallRun();
+ tasks_.pop_front();
+ }
+ }
+ void RunAll() {
+ while (!tasks_.empty()) {
+ RunOne();
+ }
+ }
+ void set_run_on_add(bool run) { run_on_add_ = run; }
+ bool stopped() const { return stopped_; }
+
+ private:
+ std::list<net_instaweb::Function*> tasks_;
+ bool run_on_add_;
+ bool stopped_;
+
+ DISALLOW_COPY_AND_ASSIGN(InlineExecutor);
+};
+
+// A BufferedSpdyFramer visitor that constructs IR objects for the frames it
+// parses.
+class ClientVisitor : public net::BufferedSpdyFramerVisitorInterface {
+ public:
+ ClientVisitor() : last_data_(NULL), last_settings_(NULL) {}
+ virtual ~ClientVisitor() {}
+
+ virtual void OnError(net::SpdyFramer::SpdyError error_code) {}
+ virtual void OnStreamError(net::SpdyStreamId stream_id,
+ const std::string& description) {}
+ virtual void OnSynStream(net::SpdyStreamId id, net::SpdyStreamId assoc_id,
+ net::SpdyPriority priority, uint8 slot,
+ bool fin, bool unidirectional,
+ const net::SpdyHeaderBlock& headers) {
+ scoped_ptr<net::SpdySynStreamIR> frame(new net::SpdySynStreamIR(id));
+ frame->set_associated_to_stream_id(assoc_id);
+ frame->set_priority(priority);
+ frame->set_slot(slot);
+ frame->set_fin(fin);
+ frame->set_unidirectional(unidirectional);
+ frame->GetMutableNameValueBlock()->insert(
+ headers.begin(), headers.end());
+ last_frame_.reset(frame.release());
+ }
+ virtual void OnSynReply(net::SpdyStreamId id, bool fin,
+ const net::SpdyHeaderBlock& headers) {
+ scoped_ptr<net::SpdySynReplyIR> frame(new net::SpdySynReplyIR(id));
+ frame->set_fin(fin);
+ frame->GetMutableNameValueBlock()->insert(
+ headers.begin(), headers.end());
+ last_frame_.reset(frame.release());
+ }
+ virtual void OnHeaders(net::SpdyStreamId id, bool fin,
+ const net::SpdyHeaderBlock& headers) {
+ scoped_ptr<net::SpdyHeadersIR> frame(new net::SpdyHeadersIR(id));
+ frame->set_fin(fin);
+ frame->GetMutableNameValueBlock()->insert(
+ headers.begin(), headers.end());
+ last_frame_.reset(frame.release());
+ }
+ virtual void OnStreamFrameData(net::SpdyStreamId id, const char* data,
+ size_t len, bool fin) {
+ if (len == 0 && last_data_ != NULL && last_data_ == last_frame_.get()) {
+ last_data_->set_fin(fin);
+ } else {
+ scoped_ptr<net::SpdyDataIR> frame(new net::SpdyDataIR(
+ id, base::StringPiece(data, len)));
+ frame->set_fin(fin);
+ last_data_ = frame.get();
+ last_frame_.reset(frame.release());
+ }
+ }
+ virtual void OnSettings(bool clear_persisted) {
+ scoped_ptr<net::SpdySettingsIR> frame(new net::SpdySettingsIR);
+ frame->set_clear_settings(clear_persisted);
+ last_settings_ = frame.get();
+ last_frame_.reset(frame.release());
+ }
+ virtual void OnSetting(net::SpdySettingsIds id, uint8 flags, uint32 value) {
+ CHECK(last_settings_ != NULL && last_settings_ == last_frame_.get());
+ last_settings_->AddSetting(
+ id, (flags & net::SETTINGS_FLAG_PLEASE_PERSIST),
+ (flags & net::SETTINGS_FLAG_PERSISTED), value);
+ }
+ virtual void OnPing(uint32 id) {
+ last_frame_.reset(new net::SpdyPingIR(id));
+ }
+ virtual void OnRstStream(net::SpdyStreamId id,
+ net::SpdyRstStreamStatus status) {
+ last_frame_.reset(new net::SpdyRstStreamIR(id, status));
+ }
+ virtual void OnGoAway(net::SpdyStreamId id, net::SpdyGoAwayStatus status) {
+ last_frame_.reset(new net::SpdyGoAwayIR(id, status));
+ }
+ virtual void OnWindowUpdate(net::SpdyStreamId id, uint32 delta) {
+ last_frame_.reset(new net::SpdyWindowUpdateIR(id, delta));
+ }
+ virtual void OnPushPromise(net::SpdyStreamId id, net::SpdyStreamId promise) {
+ last_frame_.reset(new net::SpdyPushPromiseIR(id, promise));
+ }
+
+ net::SpdyFrameIR* ReleaseLastFrame() {
+ return last_frame_.release();
+ }
+
+ private:
+ net::SpdyDataIR* last_data_;
+ net::SpdySettingsIR* last_settings_;
+ scoped_ptr<net::SpdyFrameIR> last_frame_;
+
+ DISALLOW_COPY_AND_ASSIGN(ClientVisitor);
+};
+
+ACTION_P2(ClientDecodeFrame, test, matcher) {
+ scoped_ptr<net::SpdyFrameIR> frame(test->DecodeFrameOnClient(arg0));
+ ASSERT_TRUE(frame != NULL);
+ EXPECT_THAT(*frame, matcher);
+}
+
+ACTION_P3(SendBackWindowUpdate, test, stream_id, delta) {
+ test->ReceiveWindowUpdateFrameFromClient(stream_id, delta);
+}
+
+ACTION_P3(SendBackSettings, test, key, value) {
+ test->ReceiveSettingsFrameFromClient(key, value);
+}
+
+// Base class for SpdySession tests.
+class SpdySessionTestBase :
+ public testing::TestWithParam<mod_spdy::spdy::SpdyVersion> {
+ public:
+ SpdySessionTestBase()
+ : spdy_version_(GetParam()),
+ client_framer_(mod_spdy::SpdyVersionToFramerVersion(spdy_version_),
+ true) {
+ client_framer_.set_visitor(&client_visitor_);
+ ON_CALL(session_io_, IsConnectionAborted()).WillByDefault(Return(false));
+ ON_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+ .WillByDefault(Invoke(this, &SpdySessionTestBase::ReadNextInputChunk));
+ ON_CALL(session_io_, SendFrameRaw(_))
+ .WillByDefault(Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS));
+ }
+
+ // Use as gMock action for ProcessAvailableInput:
+ // Invoke(this, &SpdySessionTest::ReadNextInputChunk)
+ mod_spdy::SpdySessionIO::ReadStatus ReadNextInputChunk(
+ bool block, net::BufferedSpdyFramer* framer) {
+ if (input_queue_.empty()) {
+ return mod_spdy::SpdySessionIO::READ_NO_DATA;
+ }
+ const std::string chunk = input_queue_.front();
+ input_queue_.pop_front();
+ framer->ProcessInput(chunk.data(), chunk.size());
+ return (framer->HasError() ? mod_spdy::SpdySessionIO::READ_ERROR :
+ mod_spdy::SpdySessionIO::READ_SUCCESS);
+ }
+
+ // This is called by the ClientDecodeFrame gMock action defined above.
+ net::SpdyFrameIR* DecodeFrameOnClient(
+ const net::SpdySerializedFrame& frame) {
+ client_framer_.ProcessInput(frame.data(), frame.size());
+ return client_visitor_.ReleaseLastFrame();
+ }
+
+ // Push a frame into the input queue.
+ void ReceiveFrameFromClient(const net::SpdySerializedFrame& frame) {
+ input_queue_.push_back(std::string(frame.data(), frame.size()));
+ }
+
+ // Push a PING frame into the input queue.
+ void ReceivePingFromClient(uint32 id) {
+ scoped_ptr<net::SpdySerializedFrame> frame(
+ client_framer_.CreatePingFrame(id));
+ ReceiveFrameFromClient(*frame);
+ }
+
+ // Push a valid SYN_STREAM frame into the input queue.
+ void ReceiveSynStreamFromClient(net::SpdyStreamId stream_id,
+ net::SpdyPriority priority,
+ net::SpdyControlFlags flags) {
+ net::SpdyHeaderBlock headers;
+ AddRequestHeaders(spdy_version_, &headers);
+ scoped_ptr<net::SpdySerializedFrame> frame(client_framer_.CreateSynStream(
+ stream_id, 0, priority, 0, flags,
+ true, // true = use compression
+ &headers));
+ ReceiveFrameFromClient(*frame);
+ }
+
+ // Push a valid DATA frame into the input queue.
+ void ReceiveDataFromClient(net::SpdyStreamId stream_id,
+ base::StringPiece data,
+ net::SpdyDataFlags flags) {
+ scoped_ptr<net::SpdySerializedFrame> frame(client_framer_.CreateDataFrame(
+ stream_id, data.data(), data.size(), flags));
+ ReceiveFrameFromClient(*frame);
+ }
+
+ // Push a SETTINGS frame into the input queue.
+ void ReceiveSettingsFrameFromClient(
+ net::SpdySettingsIds setting, uint32 value) {
+ net::SettingsMap settings;
+ settings[setting] = std::make_pair(net::SETTINGS_FLAG_NONE, value);
+ scoped_ptr<net::SpdySerializedFrame> frame(
+ client_framer_.CreateSettings(settings));
+ ReceiveFrameFromClient(*frame);
+ }
+
+ // Push a WINDOW_UPDATE frame into the input queue.
+ void ReceiveWindowUpdateFrameFromClient(
+ net::SpdyStreamId stream_id, uint32 delta) {
+ scoped_ptr<net::SpdySerializedFrame> frame(
+ client_framer_.CreateWindowUpdate(stream_id, delta));
+ ReceiveFrameFromClient(*frame);
+ }
+
+ protected:
+ void ExpectSendFrame(::testing::Matcher<const net::SpdyFrameIR&> matcher) {
+ EXPECT_CALL(session_io_, SendFrameRaw(_))
+ .WillOnce(DoAll(ClientDecodeFrame(this, matcher),
+ Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS)));
+ }
+
+ void ExpectBeginServerPush(
+ net::SpdyStreamId stream_id, net::SpdyStreamId assoc_stream_id,
+ net::SpdyPriority priority, const std::string& path) {
+ net::SpdyNameValueBlock headers;
+ AddInitialServerPushHeaders(path, &headers);
+ ExpectSendFrame(IsSynStream(stream_id, assoc_stream_id, priority, false,
+ true, headers));
+ }
+
+ void ExpectSendSynReply(net::SpdyStreamId stream_id, bool fin) {
+ net::SpdyNameValueBlock headers;
+ AddResponseHeaders(spdy_version_, &headers);
+ ExpectSendFrame(IsSynReply(stream_id, fin, headers));
+ }
+
+ void ExpectSendHeaders(net::SpdyStreamId stream_id, bool fin) {
+ net::SpdyNameValueBlock headers;
+ AddResponseHeaders(spdy_version_, &headers);
+ ExpectSendFrame(IsHeaders(stream_id, fin, headers));
+ }
+
+ void ExpectSendGoAway(net::SpdyStreamId last_stream_id,
+ net::SpdyGoAwayStatus status) {
+ // SPDY/2 doesn't have status codes on GOAWAY frames, so for SPDY/2 the
+ // client framer decodes it as GOAWAY_OK regardless of what we sent.
+ if (spdy_version_ == mod_spdy::spdy::SPDY_VERSION_2) {
+ ExpectSendFrame(IsGoAway(last_stream_id, net::GOAWAY_OK));
+ } else {
+ ExpectSendFrame(IsGoAway(last_stream_id, status));
+ }
+ }
+
+ const mod_spdy::spdy::SpdyVersion spdy_version_;
+ ClientVisitor client_visitor_;
+ net::BufferedSpdyFramer client_framer_;
+ mod_spdy::SpdyServerConfig config_;
+ StrictMock<MockSpdySessionIO> session_io_;
+ StrictMock<MockSpdyStreamTaskFactory> task_factory_;
+ std::list<std::string> input_queue_;
+};
+
+// Class for most SpdySession tests; this uses an InlineExecutor, so that test
+// behavior is very predictable.
+class SpdySessionTest : public SpdySessionTestBase {
+ public:
+ SpdySessionTest()
+ : session_(spdy_version_, &config_, &session_io_, &task_factory_,
+ &executor_) {}
+
+ protected:
+ InlineExecutor executor_;
+ mod_spdy::SpdySession session_;
+};
+
+// Test that if the connection is already closed, we stop immediately.
+TEST_P(SpdySessionTest, ConnectionAlreadyClosed) {
+ testing::InSequence seq;
+ EXPECT_CALL(session_io_, SendFrameRaw(_))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::WRITE_CONNECTION_CLOSED));
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the connection is aborted, we stop.
+TEST_P(SpdySessionTest, ImmediateConnectionAbort) {
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Test responding to a PING frame from the client (followed by the connection
+// closing, so that we can exit the Run loop).
+TEST_P(SpdySessionTest, SinglePing) {
+ ReceivePingFromClient(47);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendFrame(IsPing(47));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+ ExpectSendGoAway(0, net::GOAWAY_OK);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Test handling a single stream request.
+TEST_P(SpdySessionTest, SingleStream) {
+ MockStreamTask* task = new MockStreamTask;
+ executor_.set_run_on_add(false);
+ const net::SpdyStreamId stream_id = 1;
+ const net::SpdyPriority priority = 2;
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task));
+ EXPECT_CALL(session_io_, IsConnectionAborted())
+ .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+ Return(false)));
+ EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+ SendDataFrame(task, "quux", true)));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+ ExpectSendSynReply(stream_id, false);
+ ExpectSendFrame(IsDataFrame(stream_id, false, "foobar"));
+ ExpectSendFrame(IsDataFrame(stream_id, true, "quux"));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+ ExpectSendGoAway(1, net::GOAWAY_OK);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that if SendFrameRaw fails, we immediately stop trying to send data and
+// shut down the session.
+TEST_P(SpdySessionTest, ShutDownSessionIfSendFrameRawFails) {
+ MockStreamTask* task = new MockStreamTask;
+ executor_.set_run_on_add(false);
+ const net::SpdyStreamId stream_id = 1;
+ const net::SpdyPriority priority = 2;
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ testing::InSequence seq;
+ // We start out the same way as in the SingleStream test above.
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, _));
+ EXPECT_CALL(task_factory_, NewStreamTask(_))
+ .WillOnce(ReturnMockTask(task));
+ EXPECT_CALL(session_io_, IsConnectionAborted())
+ .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+ Return(false)));
+ EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+ SendDataFrame(task, "quux", true)));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, _));
+ ExpectSendSynReply(stream_id, false);
+ // At this point, the connection is closed by the client.
+ EXPECT_CALL(session_io_, SendFrameRaw(_))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::WRITE_CONNECTION_CLOSED));
+ // Even though we have another frame to send at this point (already in the
+ // output queue), we immediately stop sending data and exit the session.
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the client sends us garbage data, we send a GOAWAY frame and
+// then quit.
+TEST_P(SpdySessionTest, SendGoawayInResponseToGarbage) {
+ input_queue_.push_back("\x88\x5f\x92\x02\xf8\x92\x12\xd1"
+ "\x82\xdc\x1a\x40\xbb\xb2\x9d\x13");
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Test that when the client sends us a SYN_STREAM with a corrupted header
+// block, we send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForBadSynStreamCompression) {
+ net::SpdyHeaderBlock headers;
+ headers["foobar"] = "Foo is to bar as bar is to baz.";
+ net::SpdyFramer framer(mod_spdy::SpdyVersionToFramerVersion(spdy_version_));
+ framer.set_enable_compression(false);
+ scoped_ptr<net::SpdySerializedFrame> frame(framer.CreateSynStream(
+ 1, 0, framer.GetHighestPriority(), 0, net::CONTROL_FLAG_FIN,
+ false, // false = no compression
+ &headers));
+ ReceiveFrameFromClient(*frame);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// TODO(mdsteele): At the moment, SpdyFramer DCHECKs that the stream ID is
+// nonzero when decoding, so this test would crash in debug builds. Once this
+// has been corrected in the Chromium code, we can remove this #ifdef.
+#ifdef NDEBUG
+// Test that when the client sends us a SYN_STREAM with a stream ID of 0, we
+// send a GOAWAY frame and then quit.
+TEST_P(SpdySessionTest, SendGoawayForSynStreamIdZero) {
+ net::SpdyHeaderBlock headers;
+ AddRequestHeaders(spdy_version_, &headers);
+ scoped_ptr<net::SpdySerializedFrame> frame(client_framer_.CreateSynStream(
+ 0, 0, client_framer_.GetHighestPriority(), 0, net::CONTROL_FLAG_FIN,
+ true, // true = use compression
+ &headers));
+ ReceiveFrameFromClient(*frame);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+#endif
+
+// Test that when the client sends us two SYN_STREAMs with the same ID, we send
+// a GOAWAY frame (but still finish out the good stream before quitting).
+TEST_P(SpdySessionTest, SendGoawayForDuplicateStreamId) {
+ MockStreamTask* task = new MockStreamTask;
+ executor_.set_run_on_add(false);
+ const net::SpdyStreamId stream_id = 1;
+ const net::SpdyPriority priority = 2;
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ // Get the first SYN_STREAM; it looks good, so create a new task (but because
+ // we set executor_.set_run_on_add(false) above, it doesn't execute yet).
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ // There's an active stream out, so ProcessAvailableInput should have false
+ // for the first argument (false = nonblocking read). Here we get the second
+ // SYN_STREAM with the same stream ID, so we should send GOAWAY.
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+ ExpectSendGoAway(1, net::GOAWAY_PROTOCOL_ERROR);
+ // At this point, tell the executor to run the task.
+ EXPECT_CALL(session_io_, IsConnectionAborted())
+ .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+ Return(false)));
+ EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+ SendDataFrame(task, "quux", true)));
+ // The stream is no longer active, but there are pending frames to send, so
+ // we shouldn't block on input.
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(false), NotNull()));
+ // Now we should send the output.
+ ExpectSendSynReply(stream_id, false);
+ ExpectSendFrame(IsDataFrame(stream_id, false, "foobar"));
+ ExpectSendFrame(IsDataFrame(stream_id, true, "quux"));
+ // Finally, there is no more output to send, and no chance of creating new
+ // streams (since we GOAWAY'd), so we quit.
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Run each test over both SPDY v2 and SPDY v3.
+INSTANTIATE_TEST_CASE_P(Spdy2And3, SpdySessionTest, testing::Values(
+ mod_spdy::spdy::SPDY_VERSION_2, mod_spdy::spdy::SPDY_VERSION_3,
+ mod_spdy::spdy::SPDY_VERSION_3_1));
+
+// Create a type alias so that we can instantiate some of our
+// SpdySessionTest-based tests using a different set of parameters.
+typedef SpdySessionTest SpdySessionNoFlowControlTest;
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE over SPDY v2.
+TEST_P(SpdySessionNoFlowControlTest, SendGoawayForInitialWindowSize) {
+ net::SettingsMap settings;
+ settings[net::SETTINGS_INITIAL_WINDOW_SIZE] =
+ std::make_pair(net::SETTINGS_FLAG_NONE, 4000);
+ scoped_ptr<net::SpdySerializedFrame> frame(
+ client_framer_.CreateSettings(settings));
+ ReceiveFrameFromClient(*frame);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Only run no-flow-control tests for SPDY v2.
+INSTANTIATE_TEST_CASE_P(Spdy2, SpdySessionNoFlowControlTest, testing::Values(
+ mod_spdy::spdy::SPDY_VERSION_2));
+
+// Test class for flow-control tests. This uses a ThreadPool Executor, so that
+// we can test concurrency behavior.
+class SpdySessionFlowControlTest : public SpdySessionTestBase {
+ public:
+ SpdySessionFlowControlTest() : thread_pool_(1, 1) {}
+
+ void SetUp() {
+ ASSERT_TRUE(thread_pool_.Start());
+ executor_.reset(thread_pool_.NewExecutor());
+ session_.reset(new mod_spdy::SpdySession(
+ spdy_version_, &config_, &session_io_, &task_factory_,
+ executor_.get()));
+ }
+
+ void ExpectSendDataGetWindowUpdateBack(
+ net::SpdyStreamId stream_id, bool fin, base::StringPiece payload) {
+ EXPECT_CALL(session_io_, SendFrameRaw(_)).WillOnce(DoAll(
+ ClientDecodeFrame(this, IsDataFrame(stream_id, fin, payload)),
+ SendBackWindowUpdate(this, stream_id, payload.size()),
+ Return(mod_spdy::SpdySessionIO::WRITE_SUCCESS)));
+ }
+
+ protected:
+ mod_spdy::ThreadPool thread_pool_;
+ scoped_ptr<mod_spdy::Executor> executor_;
+ scoped_ptr<mod_spdy::SpdySession> session_;
+};
+
+TEST_P(SpdySessionFlowControlTest, SingleStreamWithFlowControl) {
+ MockStreamTask* task = new MockStreamTask;
+ // Start by setting the initial window size to very small (three bytes).
+ ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 3);
+ // Then send a SYN_STREAM.
+ const net::SpdyStreamId stream_id = 1;
+ const net::SpdyPriority priority = 2;
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ // We'll have to go through the loop at least five times -- once for each of
+ // five frames that we _must_ receive (SETTINGS, SYN_STREAM, and three
+ // WINDOW_UDPATEs.
+ EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(5));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+ .Times(AtLeast(5));
+
+ // The rest of these will have to happen in a fixed order.
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task));
+ EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+ SendDataFrame(task, "quux", true)));
+ // Since the window size is just three bytes, we can only send three bytes at
+ // a time.
+ ExpectSendSynReply(stream_id, false);
+ ExpectSendDataGetWindowUpdateBack(stream_id, false, "foo");
+ ExpectSendDataGetWindowUpdateBack(stream_id, false, "bar");
+ ExpectSendDataGetWindowUpdateBack(stream_id, false, "quu");
+ ExpectSendDataGetWindowUpdateBack(stream_id, true, "x");
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+ ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+ session_->Run();
+}
+
+// Suppose the input side of the connection closes while we're blocked on flow
+// control; we should abort the blocked streams.
+TEST_P(SpdySessionFlowControlTest, CeaseInputWithFlowControl) {
+ MockStreamTask* task = new MockStreamTask;
+ // Start by setting the initial window size to very small (three bytes).
+ ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 3);
+ // Then send a SYN_STREAM.
+ const net::SpdyStreamId stream_id = 1;
+ const net::SpdyPriority priority = 2;
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(1));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+ .Times(AtLeast(1));
+
+ // The rest of these will have to happen in a fixed order.
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task));
+ EXPECT_CALL(*task, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task), SendDataFrame(task, "foobar", false),
+ SendDataFrame(task, "quux", true)));
+ ExpectSendSynReply(stream_id, false);
+ // Since the window size is just three bytes, we can only send three bytes at
+ // a time. The stream thread will then be blocked.
+ ExpectSendFrame(IsDataFrame(stream_id, false, "foo"));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, _))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+ // At this point, we're blocked on flow control, and the test will close the
+ // input side of the connection. Since the stream can never complete, the
+ // session should abort the stream and shut down, rather than staying blocked
+ // forever.
+ ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+ session_->Run();
+}
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE with a value of 0.
+TEST_P(SpdySessionFlowControlTest, SendGoawayForTooSmallInitialWindowSize) {
+ ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE, 0);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+ session_->Run();
+}
+
+// Test that we send GOAWAY if the client tries to send
+// SETTINGS_INITIAL_WINDOW_SIZE with a value of 0x80000000.
+TEST_P(SpdySessionFlowControlTest, SendGoawayForTooLargeInitialWindowSize) {
+ ReceiveSettingsFrameFromClient(net::SETTINGS_INITIAL_WINDOW_SIZE,
+ 0x80000000);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+
+ session_->Run();
+}
+
+TEST_P(SpdySessionFlowControlTest, SharedOutputFlowControlWindow) {
+ ReceiveWindowUpdateFrameFromClient(0, 10000);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+ EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+ } else {
+ ExpectSendGoAway(0, net::GOAWAY_PROTOCOL_ERROR);
+ }
+
+ if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+ EXPECT_EQ(65536, session_->current_shared_output_window_size());
+ }
+ session_->Run();
+ if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+ EXPECT_EQ(75536, session_->current_shared_output_window_size());
+ }
+}
+
+TEST_P(SpdySessionFlowControlTest, SharedInputFlowControlWindow) {
+ MockStreamTask* task = new MockStreamTask;
+ const net::SpdyStreamId stream_id = 1;
+ const net::SpdyPriority priority = 2;
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_NONE);
+ const std::string data1(32000, 'x');
+ const std::string data2(2000, 'y');
+ ReceiveDataFromClient(stream_id, data1, net::DATA_FLAG_NONE);
+ ReceiveDataFromClient(stream_id, data1, net::DATA_FLAG_NONE);
+ ReceiveDataFromClient(stream_id, data2, net::DATA_FLAG_FIN);
+
+ EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(4));
+
+ // The rest of these will have to happen in a fixed order.
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ // Receive the SYN_STREAM from the client.
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task));
+ EXPECT_CALL(*task, Run()).WillOnce(ConsumeInputUntilAborted(task));
+ // Receive the first two blocks of data from the client with no problems.
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+ // The third block of data is too much; it's a flow control error. For
+ // SPDY/3.1 and up it's a session flow control error; for SPDY/3 it's a
+ // stream flow control error.
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()));
+ if (session_->spdy_version() >= mod_spdy::spdy::SPDY_VERSION_3_1) {
+ ExpectSendGoAway(stream_id, net::GOAWAY_PROTOCOL_ERROR);
+ } else {
+ ExpectSendFrame(IsRstStream(1, net::RST_STREAM_FLOW_CONTROL_ERROR));
+ EXPECT_CALL(session_io_, IsConnectionAborted()).WillOnce(Return(true));
+ }
+
+ session_->Run();
+}
+
+// Only run flow control tests for SPDY v3 and up.
+INSTANTIATE_TEST_CASE_P(Spdy3, SpdySessionFlowControlTest, testing::Values(
+ mod_spdy::spdy::SPDY_VERSION_3, mod_spdy::spdy::SPDY_VERSION_3_1));
+
+// Create a type alias so that we can instantiate some of our
+// SpdySessionTest-based tests using a different set of parameters.
+typedef SpdySessionTest SpdySessionServerPushTest;
+
+TEST_P(SpdySessionServerPushTest, SimpleServerPush) {
+ MockStreamTask* task1 = new MockStreamTask;
+ MockStreamTask* task2 = new MockStreamTask;
+ executor_.set_run_on_add(true);
+ const net::SpdyStreamId stream_id = 3;
+ const net::SpdyPriority priority = 2;
+ const net::SpdyPriority push_priority = 3;
+ const std::string push_path = "/script.js";
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task1));
+ EXPECT_CALL(*task1, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task1),
+ StartServerPush(task1, push_priority, push_path,
+ mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+ SendDataFrame(task1, "foobar", false),
+ SendDataFrame(task1, "quux", true)));
+ // We should right away create the server push task, and get the SYN_STREAM
+ // before any other frames from the original stream.
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(2u)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id,
+ Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(push_priority)))))
+ .WillOnce(ReturnMockTask(task2));
+ EXPECT_CALL(*task2, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task2),
+ SendDataFrame(task2, "hello", false),
+ SendDataFrame(task2, "world", true)));
+ ExpectBeginServerPush(2u, stream_id, push_priority, push_path);
+ // The pushed stream has a low priority, so the rest of the first stream
+ // should get sent before the rest of the pushed stream.
+ ExpectSendSynReply(stream_id, false);
+ ExpectSendFrame(IsDataFrame(stream_id, false, "foobar"));
+ ExpectSendFrame(IsDataFrame(stream_id, true, "quux"));
+ // Now we should get the rest of the pushed stream.
+ ExpectSendHeaders(2u, false);
+ ExpectSendFrame(IsDataFrame(2u, false, "hello"));
+ ExpectSendFrame(IsDataFrame(2u, true, "world"));
+ // And, we're done.
+ EXPECT_CALL(session_io_, IsConnectionAborted());
+ EXPECT_CALL(session_io_, ProcessAvailableInput(Eq(true), NotNull()))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+ ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+TEST_P(SpdySessionServerPushTest, TooManyConcurrentPushes) {
+ MockStreamTask* task1 = new MockStreamTask;
+ MockStreamTask* task2 = new MockStreamTask;
+ MockStreamTask* task3 = new MockStreamTask;
+ executor_.set_run_on_add(false);
+ const net::SpdyStreamId stream_id = 9;
+ const net::SpdyPriority priority = 0;
+ ReceiveSettingsFrameFromClient(net::SETTINGS_MAX_CONCURRENT_STREAMS, 2);
+ ReceiveSynStreamFromClient(stream_id, priority, net::CONTROL_FLAG_FIN);
+
+ EXPECT_CALL(session_io_, IsConnectionAborted()).Times(AtLeast(3));
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+ .Times(AtLeast(3));
+
+ testing::InSequence seq;
+ ExpectSendFrame(IsSettings(net::SETTINGS_MAX_CONCURRENT_STREAMS, 100));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id, Eq(0u)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(priority)))))
+ .WillOnce(ReturnMockTask(task1));
+ EXPECT_CALL(session_io_, IsConnectionAborted())
+ .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunOne),
+ Return(false)));
+ EXPECT_CALL(*task1, Run()).WillOnce(DoAll(
+ StartServerPush(task1, 3u, "/foo.css",
+ mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+ StartServerPush(task1, 2u, "/bar.css",
+ mod_spdy::SpdyServerPushInterface::PUSH_STARTED),
+ StartServerPush(task1, 1u, "/baz.css",
+ mod_spdy::SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES),
+ SendResponseHeaders(task1), SendDataFrame(task1, "html", true)));
+ // Start the first two pushes. The third push should fail due to too many
+ // concurrent pushes.
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(2u)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id,
+ Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(3u)))))
+ .WillOnce(ReturnMockTask(task2));
+ EXPECT_CALL(task_factory_, NewStreamTask(
+ AllOf(Property(&mod_spdy::SpdyStream::stream_id, Eq(4u)),
+ Property(&mod_spdy::SpdyStream::associated_stream_id,
+ Eq(stream_id)),
+ Property(&mod_spdy::SpdyStream::priority, Eq(2u)))))
+ .WillOnce(ReturnMockTask(task3));
+ // Now we get the SYN_STREAMs for the pushed streams before anything else.
+ ExpectBeginServerPush(2u, stream_id, 3u, "/foo.css");
+ ExpectBeginServerPush(4u, stream_id, 2u, "/bar.css");
+ // We now send the frames from the original stream.
+ ExpectSendSynReply(stream_id, false);
+ ExpectSendFrame(IsDataFrame(stream_id, true, "html"));
+ // At this point, the client will change MAX_CONCURRENT_STREAMS to zero. We
+ // shouldn't barf, even though we have more active push streams than the new
+ // maximum.
+ EXPECT_CALL(session_io_, IsConnectionAborted())
+ .WillOnce(DoAll(
+ SendBackSettings(this, net::SETTINGS_MAX_CONCURRENT_STREAMS, 0u),
+ Return(false)));
+ // Now let's run the rest of the tasks. One of them will try to start yet
+ // another server push, but that should fail because MAX_CONCURRENT_STREAMS
+ // is now zero.
+ EXPECT_CALL(session_io_, IsConnectionAborted())
+ .WillOnce(DoAll(InvokeWithoutArgs(&executor_, &InlineExecutor::RunAll),
+ Return(false)));
+ EXPECT_CALL(*task2, Run()).WillOnce(DoAll(
+ SendResponseHeaders(task2), SendDataFrame(task2, "foo", true)));
+ EXPECT_CALL(*task3, Run()).WillOnce(DoAll(
+ StartServerPush(task3, 3u, "/stuff.png",
+ mod_spdy::SpdyServerPushInterface::TOO_MANY_CONCURRENT_PUSHES),
+ SendResponseHeaders(task3), SendDataFrame(task3, "bar", true)));
+ // And now we get all those frames. The "bar" stream's frames should come
+ // first, because that's a higher-priority stream.
+ ExpectSendHeaders(4u, false);
+ ExpectSendFrame(IsDataFrame(4u, true, "bar"));
+ ExpectSendHeaders(2u, false);
+ ExpectSendFrame(IsDataFrame(2u, true, "foo"));
+ // And, we're done.
+ EXPECT_CALL(session_io_, ProcessAvailableInput(_, NotNull()))
+ .WillOnce(Return(mod_spdy::SpdySessionIO::READ_CONNECTION_CLOSED));
+ ExpectSendGoAway(stream_id, net::GOAWAY_OK);
+
+ session_.Run();
+ EXPECT_TRUE(executor_.stopped());
+}
+
+// Only run server push tests for SPDY v3 and up.
+INSTANTIATE_TEST_CASE_P(Spdy3, SpdySessionServerPushTest, testing::Values(
+ mod_spdy::spdy::SPDY_VERSION_3, mod_spdy::spdy::SPDY_VERSION_3_1));
+
+} // namespace
Added: httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc
URL: http://svn.apache.org/viewvc/httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc?rev=1591620&view=auto
==============================================================================
--- httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc (added)
+++ httpd/mod_spdy/branches/httpd-2.2.x/mod_spdy/common/spdy_stream.cc Thu May 1 11:39:27 2014
@@ -0,0 +1,456 @@
+// Copyright 2010 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "mod_spdy/common/spdy_stream.h"
+
+
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/synchronization/condition_variable.h"
+#include "base/synchronization/lock.h"
+#include "mod_spdy/common/protocol_util.h"
+#include "mod_spdy/common/shared_flow_control_window.h"
+#include "mod_spdy/common/spdy_frame_priority_queue.h"
+#include "mod_spdy/common/spdy_frame_queue.h"
+#include "net/spdy/spdy_protocol.h"
+
+namespace {
+
+// The smallest WINDOW_UPDATE delta we're willing to send. If the client sends
+// us less than this much data, we wait for more data before sending a
+// WINDOW_UPDATE frame (so that we don't end up sending lots of little ones).
+const size_t kMinWindowUpdateSize =
+ static_cast<size_t>(net::kSpdyStreamInitialWindowSize) / 8;
+
+class DataLengthVisitor : public net::SpdyFrameVisitor {
+ public:
+ DataLengthVisitor() : length_(0) {}
+ virtual ~DataLengthVisitor() {}
+
+ size_t length() const { return length_; }
+
+ virtual void VisitSynStream(const net::SpdySynStreamIR& frame) {}
+ virtual void VisitSynReply(const net::SpdySynReplyIR& frame) {}
+ virtual void VisitRstStream(const net::SpdyRstStreamIR& frame) {}
+ virtual void VisitSettings(const net::SpdySettingsIR& frame) {}
+ virtual void VisitPing(const net::SpdyPingIR& frame) {}
+ virtual void VisitGoAway(const net::SpdyGoAwayIR& frame) {}
+ virtual void VisitHeaders(const net::SpdyHeadersIR& frame) {}
+ virtual void VisitWindowUpdate(const net::SpdyWindowUpdateIR& frame) {}
+ virtual void VisitCredential(const net::SpdyCredentialIR& frame) {}
+ virtual void VisitBlocked(const net::SpdyBlockedIR& frame) {}
+ virtual void VisitPushPromise(const net::SpdyPushPromiseIR& frame) {}
+ virtual void VisitData(const net::SpdyDataIR& frame) {
+ length_ = frame.data().size();
+ }
+
+ private:
+ size_t length_;
+
+ DISALLOW_COPY_AND_ASSIGN(DataLengthVisitor);
+};
+
+// For data frames, return the size of the data payload; for control frames,
+// return zero.
+size_t DataFrameLength(const net::SpdyFrameIR& frame) {
+ DataLengthVisitor visitor;
+ frame.Visit(&visitor);
+ return visitor.length();
+}
+
+} // namespace
+
+namespace mod_spdy {
+
+SpdyStream::SpdyStream(spdy::SpdyVersion spdy_version,
+ net::SpdyStreamId stream_id,
+ net::SpdyStreamId associated_stream_id,
+ int32 server_push_depth,
+ net::SpdyPriority priority,
+ int32 initial_output_window_size,
+ SpdyFramePriorityQueue* output_queue,
+ SharedFlowControlWindow* shared_window,
+ SpdyServerPushInterface* pusher)
+ : spdy_version_(spdy_version),
+ stream_id_(stream_id),
+ associated_stream_id_(associated_stream_id),
+ server_push_depth_(server_push_depth),
+ priority_(priority),
+ output_queue_(output_queue),
+ shared_window_(shared_window),
+ pusher_(pusher),
+ condvar_(&lock_),
+ aborted_(false),
+ output_window_size_(initial_output_window_size),
+ // TODO(mdsteele): Make our initial input window size configurable (we
+ // would send the chosen value to the client with a SETTINGS frame).
+ input_window_size_(net::kSpdyStreamInitialWindowSize),
+ input_bytes_consumed_(0) {
+ DCHECK_NE(spdy::SPDY_VERSION_NONE, spdy_version);
+ DCHECK(output_queue_);
+ DCHECK(shared_window_ || spdy_version < spdy::SPDY_VERSION_3_1);
+ DCHECK(pusher_);
+ DCHECK_GT(output_window_size_, 0);
+ // In SPDY v2, priorities are in the range 0-3; in SPDY v3, they are 0-7.
+ DCHECK_GE(priority, 0u);
+ DCHECK_LE(priority, LowestSpdyPriorityForVersion(spdy_version));
+}
+
+SpdyStream::~SpdyStream() {}
+
+bool SpdyStream::is_server_push() const {
+ // By the SPDY spec, a stream has an even stream ID if and only if it was
+ // initiated by the server.
+ return stream_id_ % 2 == 0;
+}
+
+bool SpdyStream::is_aborted() const {
+ base::AutoLock autolock(lock_);
+ return aborted_;
+}
+
+void SpdyStream::AbortSilently() {
+ base::AutoLock autolock(lock_);
+ InternalAbortSilently();
+}
+
+void SpdyStream::AbortWithRstStream(net::SpdyRstStreamStatus status) {
+ base::AutoLock autolock(lock_);
+ InternalAbortWithRstStream(status);
+}
+
+int32 SpdyStream::current_input_window_size() const {
+ base::AutoLock autolock(lock_);
+ DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+ return input_window_size_;
+}
+
+int32 SpdyStream::current_output_window_size() const {
+ base::AutoLock autolock(lock_);
+ DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+ return output_window_size_;
+}
+
+void SpdyStream::OnInputDataConsumed(size_t size) {
+ // Sanity check: there is no input data to absorb for a server push stream,
+ // so we should only be getting called for client-initiated streams.
+ DCHECK(!is_server_push());
+
+ // Flow control only exists for SPDY v3 and up, so for SPDY v2 we don't need
+ // to bother tracking this.
+ if (spdy_version() < spdy::SPDY_VERSION_3) {
+ return;
+ }
+
+ // If the size arg is zero, this method should be a no-op, so just quit now.
+ if (size == 0) {
+ return;
+ }
+
+ base::AutoLock autolock(lock_);
+
+ // Don't bother with any of this if the stream has been aborted.
+ if (aborted_) {
+ return;
+ }
+
+ // First, if we're using SPDY/3.1 or later, we need to deal with the shared
+ // session window. If after consuming this input data the shared window
+ // thinks it's time to send a WINDOW_UPDATE for the session input window
+ // (stream 0), send one, at top priority.
+ if (spdy_version_ >= spdy::SPDY_VERSION_3_1) {
+ const int32 shared_window_update =
+ shared_window_->OnInputDataConsumed(size);
+ if (shared_window_update > 0) {
+ output_queue_->Insert(
+ SpdyFramePriorityQueue::kTopPriority,
+ new net::SpdyWindowUpdateIR(0, shared_window_update));
+ }
+ }
+
+ // Make sure the current input window size is sane. Although there are
+ // provisions in the SPDY spec that allow the window size to be temporarily
+ // negative, or to go above its default initial size, with our current
+ // implementation that should never happen. Once we make the initial input
+ // window size configurable, we may need to adjust or remove these checks.
+ DCHECK_GE(input_window_size_, 0);
+ DCHECK_LE(input_window_size_, net::kSpdyStreamInitialWindowSize);
+
+ // Add the newly consumed data to the total. Assuming our caller is behaving
+ // well (even if the client isn't) -- that is, they are only consuming as
+ // much data as we have put into the input queue -- there should be no
+ // overflow here, and the new value should be at most the amount of
+ // un-WINDOW_UPDATE-ed data we've received. The reason we can be sure of
+ // this is that PostInputFrame() refuses to put more data into the queue than
+ // the window size allows, and aborts the stream if the client tries.
+ input_bytes_consumed_ += size;
+ DCHECK_GE(input_bytes_consumed_, size);
+ DCHECK_LE(input_bytes_consumed_,
+ static_cast<size_t>(net::kSpdyStreamInitialWindowSize -
+ input_window_size_));
+
+ // We don't want to send lots of little WINDOW_UPDATE frames (as that would
+ // waste bandwidth), so only bother sending one once it would have a
+ // reasonably large value.
+ // TODO(mdsteele): Consider also tracking whether we have received a FLAG_FIN
+ // on this stream; once we've gotten FLAG_FIN, there will be no more data,
+ // so we don't need to send any more WINDOW_UPDATE frames.
+ if (input_bytes_consumed_ < kMinWindowUpdateSize) {
+ return;
+ }
+
+ // The SPDY spec forbids sending WINDOW_UPDATE frames with a non-positive
+ // delta-window-size (SPDY draft 3 section 2.6.8). But since we already
+ // checked above that size was positive, input_bytes_consumed_ should now be
+ // positive as well.
+ DCHECK_GT(input_bytes_consumed_, 0u);
+ // Make sure there won't be any overflow shenanigans.
+ COMPILE_ASSERT(sizeof(size_t) >= sizeof(net::kSpdyMaximumWindowSize),
+ size_t_is_at_least_32_bits);
+ DCHECK_LE(input_bytes_consumed_,
+ static_cast<size_t>(net::kSpdyMaximumWindowSize));
+
+ // Send a WINDOW_UPDATE frame to the client and update our window size.
+ SendOutputFrame(new net::SpdyWindowUpdateIR(
+ stream_id_, input_bytes_consumed_));
+ input_window_size_ += input_bytes_consumed_;
+ DCHECK_LE(input_window_size_, net::kSpdyStreamInitialWindowSize);
+ input_bytes_consumed_ = 0;
+}
+
+void SpdyStream::AdjustOutputWindowSize(int32 delta) {
+ base::AutoLock autolock(lock_);
+
+ // Flow control only exists for SPDY v3 and up.
+ DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+
+ if (aborted_) {
+ return;
+ }
+
+ // Check for overflow; if it happens, abort the stream (which will wake up
+ // any blocked threads). Note that although delta is usually positive, it
+ // can also be negative, so we check for both overflow and underflow.
+ const int64 new_size =
+ static_cast<int64>(output_window_size_) + static_cast<int64>(delta);
+ if (new_size > static_cast<int64>(net::kSpdyMaximumWindowSize) ||
+ new_size < -static_cast<int64>(net::kSpdyMaximumWindowSize)) {
+ LOG(WARNING) << "Flow control overflow/underflow on stream "
+ << stream_id_ << ". Aborting stream.";
+ InternalAbortWithRstStream(net::RST_STREAM_FLOW_CONTROL_ERROR);
+ return;
+ }
+
+ // Update the window size.
+ const int32 old_size = output_window_size_;
+ output_window_size_ = static_cast<int32>(new_size);
+
+ // If the window size is newly positive, wake up any blocked threads.
+ if (old_size <= 0 && output_window_size_ > 0) {
+ condvar_.Broadcast();
+ }
+}
+
+void SpdyStream::PostInputFrame(net::SpdyFrameIR* frame_ptr) {
+ base::AutoLock autolock(lock_);
+
+ // Take ownership of the frame, so it will get deleted if we return early.
+ scoped_ptr<net::SpdyFrameIR> frame(frame_ptr);
+
+ // Once a stream has been aborted, nothing more goes into the queue.
+ if (aborted_) {
+ return;
+ }
+
+ // If this is a nonempty data frame (and we're using SPDY v3 or above) we
+ // need to track flow control.
+ if (spdy_version() >= spdy::SPDY_VERSION_3) {
+ DCHECK_GE(input_window_size_, 0);
+ const int size = DataFrameLength(*frame); // returns zero for ctrl frames
+ if (size > 0) {
+ // If receiving this much data would overflow the window size, then abort
+ // the stream with a flow control error.
+ if (size > input_window_size_) {
+ LOG(WARNING) << "Client violated flow control by sending too much data "
+ << "to stream " << stream_id_ << ". Aborting stream.";
+ InternalAbortWithRstStream(net::RST_STREAM_FLOW_CONTROL_ERROR);
+ return; // Quit without posting the frame to the queue.
+ }
+ // Otherwise, decrease the window size. It will be increased again once
+ // the data has been comsumed (by OnInputDataConsumed()).
+ else {
+ input_window_size_ -= size;
+ }
+ }
+ }
+
+ // Now that we've decreased the window size as necessary, we can make the
+ // frame available for consumption by the stream thread.
+ input_queue_.Insert(frame.release());
+}
+
+bool SpdyStream::GetInputFrame(bool block, net::SpdyFrameIR** frame) {
+ return input_queue_.Pop(block, frame);
+}
+
+void SpdyStream::SendOutputSynStream(const net::SpdyHeaderBlock& headers,
+ bool flag_fin) {
+ DCHECK(is_server_push());
+ base::AutoLock autolock(lock_);
+ if (aborted_) {
+ return;
+ }
+
+ scoped_ptr<net::SpdySynStreamIR> frame(new net::SpdySynStreamIR(stream_id_));
+ frame->set_associated_to_stream_id(associated_stream_id_);
+ frame->set_priority(priority_);
+ frame->set_fin(flag_fin);
+ frame->set_unidirectional(true);
+ frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ output_queue_->Insert(SpdyFramePriorityQueue::kTopPriority, frame.release());
+}
+
+void SpdyStream::SendOutputSynReply(const net::SpdyHeaderBlock& headers,
+ bool flag_fin) {
+ DCHECK(!is_server_push());
+ base::AutoLock autolock(lock_);
+ if (aborted_) {
+ return;
+ }
+
+ scoped_ptr<net::SpdySynReplyIR> frame(new net::SpdySynReplyIR(stream_id_));
+ frame->set_fin(flag_fin);
+ frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ SendOutputFrame(frame.release());
+}
+
+void SpdyStream::SendOutputHeaders(const net::SpdyHeaderBlock& headers,
+ bool flag_fin) {
+ base::AutoLock autolock(lock_);
+ if (aborted_) {
+ return;
+ }
+
+ scoped_ptr<net::SpdyHeadersIR> frame(new net::SpdyHeadersIR(stream_id_));
+ frame->set_fin(flag_fin);
+ frame->GetMutableNameValueBlock()->insert(headers.begin(), headers.end());
+ SendOutputFrame(frame.release());
+}
+
+void SpdyStream::SendOutputDataFrame(base::StringPiece data, bool flag_fin) {
+ base::AutoLock autolock(lock_);
+ if (aborted_) {
+ return;
+ }
+
+ // Flow control only exists for SPDY v3 and up; for SPDY v2, we can just send
+ // the data without regard to the window size. Even with flow control, we
+ // can of course send empty DATA frames at will.
+ if (spdy_version() < spdy::SPDY_VERSION_3 || data.empty()) {
+ // Suppress empty DATA frames (unless we're setting FLAG_FIN).
+ if (!data.empty() || flag_fin) {
+ scoped_ptr<net::SpdyDataIR> frame(new net::SpdyDataIR(stream_id_, data));
+ frame->set_fin(flag_fin);
+ SendOutputFrame(frame.release());
+ }
+ return;
+ }
+
+ while (!data.empty()) {
+ // If the current window size is non-positive, we must wait to send data
+ // until the client increases it (or we abort). Note that the window size
+ // can be negative if the client decreased the maximum window size (with a
+ // SETTINGS frame) after we already sent data (SPDY draft 3 section 2.6.8).
+ while (!aborted_ && output_window_size_ <= 0) {
+ condvar_.Wait();
+ }
+ if (aborted_) {
+ return;
+ }
+ // If the current window size is less than the amount of data we'd like to
+ // send, send a smaller data frame with the first part of the data, and
+ // then we'll sleep until the window size is increased before sending the
+ // rest.
+ DCHECK_LE(data.size(), static_cast<size_t>(kint32max));
+ const int32 full_length = data.size();
+ DCHECK_GT(output_window_size_, 0);
+ const int32 length_desired = std::min(full_length, output_window_size_);
+ output_window_size_ -= length_desired;
+ DCHECK_GE(output_window_size_, 0);
+ // Now we need to request quota from the session-shared flow control
+ // window. Since the call to RequestQuota may block, we need to unlock
+ // first.
+ int32 length_acquired;
+ if (spdy_version() >= spdy::SPDY_VERSION_3_1) {
+ base::AutoUnlock autounlock(lock_);
+ DCHECK(shared_window_);
+ length_acquired = shared_window_->RequestOutputQuota(length_desired);
+ } else {
+ // For SPDY versions that don't have a session window, just act like we
+ // got the quota we wanted.
+ length_acquired = length_desired;
+ }
+ // RequestQuota will return zero if the shared window has been aborted
+ // (i.e. if the session has been aborted). So in that case let's just
+ // abort too.
+ if (length_acquired <= 0) {
+ InternalAbortSilently();
+ return;
+ }
+ // If we didn't acquire as much as we wanted from the shared window, put
+ // the amount we're not actually using back into output_window_size_.
+ else if (length_acquired < length_desired) {
+ output_window_size_ += length_desired - length_acquired;
+ }
+ // Actually send the frame.
+ scoped_ptr<net::SpdyDataIR> frame(
+ new net::SpdyDataIR(stream_id_, data.substr(0, length_acquired)));
+ frame->set_fin(flag_fin && length_acquired == full_length);
+ SendOutputFrame(frame.release());
+ data = data.substr(length_acquired);
+ }
+}
+
+SpdyServerPushInterface::PushStatus SpdyStream::StartServerPush(
+ net::SpdyPriority priority,
+ const net::SpdyHeaderBlock& request_headers) {
+ DCHECK_GE(spdy_version(), spdy::SPDY_VERSION_3);
+ return pusher_->StartServerPush(stream_id_, server_push_depth_ + 1, priority,
+ request_headers);
+}
+
+void SpdyStream::SendOutputFrame(net::SpdyFrameIR* frame) {
+ lock_.AssertAcquired();
+ DCHECK(!aborted_);
+ output_queue_->Insert(static_cast<int>(priority_), frame);
+}
+
+void SpdyStream::InternalAbortSilently() {
+ lock_.AssertAcquired();
+ input_queue_.Abort();
+ aborted_ = true;
+ condvar_.Broadcast();
+}
+
+void SpdyStream::InternalAbortWithRstStream(net::SpdyRstStreamStatus status) {
+ lock_.AssertAcquired();
+ output_queue_->Insert(SpdyFramePriorityQueue::kTopPriority,
+ new net::SpdyRstStreamIR(stream_id_, status));
+ // InternalAbortSilently will set aborted_ to true, which will prevent the
+ // stream thread from sending any more frames on this stream after the
+ // RST_STREAM.
+ InternalAbortSilently();
+}
+
+} // namespace mod_spdy