You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/12/18 22:34:39 UTC
[mesos] 10/11: SSL Socket: Implemented sendfile.
This is an automated email from the ASF dual-hosted git repository.
josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit f8f6720591931ee6a71e4aeffd50192f6fc96df1
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Tue Nov 19 15:01:44 2019 -0800
SSL Socket: Implemented sendfile.
This implements the SSL socket's sendfile method, which must read
the file (unlike the zero-copy os::sendfile).
This also moves a test exercising sendfile from process_tests.cpp
into http_tests.cpp and parameterizes it for SSL and non-SSL.
Review: https://reviews.apache.org/r/71790
---
3rdparty/libprocess/src/ssl/openssl_socket.cpp | 50 ++++++++++++++++-
3rdparty/libprocess/src/tests/http_tests.cpp | 71 +++++++++++++++++++++++++
3rdparty/libprocess/src/tests/process_tests.cpp | 45 ----------------
3 files changed, 120 insertions(+), 46 deletions(-)
diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
index 42a1918..74f9fe2 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
@@ -20,9 +20,12 @@
#include <openssl/ssl.h>
#include <openssl/err.h>
+#include <algorithm>
#include <atomic>
#include <queue>
+#include <boost/shared_array.hpp>
+
#include <process/io.hpp>
#include <process/loop.hpp>
#include <process/owned.hpp>
@@ -35,6 +38,8 @@
#include <stout/unimplemented.hpp>
#include <stout/unreachable.hpp>
+#include <stout/os/lseek.hpp>
+
#include "openssl.hpp"
#include "ssl/openssl_socket.hpp"
@@ -490,7 +495,50 @@ Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size)
Future<size_t> OpenSSLSocketImpl::sendfile(
int_fd fd, off_t offset, size_t size)
{
- UNIMPLEMENTED;
+  // Sends `size` bytes of `fd`, starting at `offset`, over this socket.
+  // The file is read in chunks of at most `io::BUFFERED_READ_SIZE` bytes
+  // and every chunk is handed to `send()`. The returned future completes
+  // with `size` once the whole range has been passed on. It fails if the
+  // socket is shut down or destroyed, if the descriptor cannot be
+  // positioned or made asynchronous, or if the file ends before `size`
+  // bytes could be read.
+  if (dirty_shutdown) {
+    return Failure("Socket is shutdown");
+  }
+
+  // Hold a weak pointer since both read and write are not guaranteed to finish.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  Try<off_t> seek = os::lseek(fd, offset, SEEK_SET);
+  if (seek.isError()) {
+    return Failure("Failed to seek: " + seek.error());
+  }
+
+  Try<Nothing> async = io::prepare_async(fd);
+  if (async.isError()) {
+    return Failure("Failed to make FD asynchronous: " + async.error());
+  }
+
+  // Nothing to transfer. Handling this up front lets the loop below treat
+  // a zero-byte read as end-of-file without ambiguity.
+  if (size == 0) {
+    return size;
+  }
+
+  // The loop continues to run after this function has returned, so the
+  // byte counter must not live on this stack frame. It is heap-allocated
+  // and captured by value in both lambdas so that the read side and the
+  // bookkeeping side always observe the same, up-to-date count.
+  std::shared_ptr<size_t> remaining_size = std::make_shared<size_t>(size);
+  boost::shared_array<char> data(new char[io::BUFFERED_READ_SIZE]);
+
+  return process::loop(
+      compute_thread,
+      [weak_self, fd, remaining_size, data]() -> Future<size_t> {
+        // Never request more than what is left of the requested range.
+        return io::read(
+            fd, data.get(), std::min(io::BUFFERED_READ_SIZE, *remaining_size))
+          .then([weak_self, data](size_t read_bytes) -> Future<size_t> {
+            // A zero-byte read means the file is shorter than the
+            // requested range. Continuing would never make progress,
+            // so fail instead of spinning forever.
+            if (read_bytes == 0) {
+              return Failure("Unexpected end of file while sending file");
+            }
+
+            std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+            if (self == nullptr) {
+              return Failure("Socket destroyed while sending file");
+            }
+
+            return self->send(data.get(), read_bytes);
+          });
+      },
+      [size, remaining_size](size_t written)
+          -> Future<ControlFlow<size_t>> {
+        *remaining_size -= written;
+
+        if (*remaining_size > 0) {
+          return Continue();
+        }
+
+        return Break(size);
+      });
}
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 1433f3d..b906b3c 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -51,6 +51,8 @@
#include <stout/os.hpp>
#include <stout/stringify.hpp>
+#include <stout/os/write.hpp>
+
#include <stout/tests/utils.hpp>
#include "encoder.hpp"
@@ -887,6 +889,75 @@ TEST_P(HTTPTest, StreamingGetFailure)
}
+// Test process that exposes the file at `path` through `provide()` so
+// that it can be fetched with an HTTP GET against this process.
+class FileServerProcess : public Process<FileServerProcess>
+{
+public:
+  explicit FileServerProcess(const string& _path) : path(_path) {}
+
+protected:
+  // Registers `path` under the empty route name during initialization.
+  void initialize() override { provide("", path); }
+
+  // Location of the file handed to `provide()`.
+  const string path;
+};
+
+
+// Scoped wrapper around a `FileServerProcess`: the process is spawned on
+// construction and terminated and waited for on destruction.
+class FileServer
+{
+public:
+  // `explicit` prevents a `string` from silently converting into a
+  // running server, matching the constructor of `FileServerProcess`.
+  explicit FileServer(const string& path)
+    : process(new FileServerProcess(path))
+  {
+    spawn(process.get());
+  }
+
+  ~FileServer()
+  {
+    terminate(process.get());
+    wait(process.get());
+  }
+
+  // The spawned process; tests use `process->self()` as the request target.
+  Owned<FileServerProcess> process;
+};
+
+
+// Checks that a file registered via `provide()` is returned unchanged in
+// the response body, both for a short file and for one much larger than
+// the buffered read size. Runs once per scheme supplied by `GetParam()`.
+TEST_P(HTTPTest, ProvideSendfile)
+{
+  // Payload smaller than the buffered read size.
+  const string small =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
+    "minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip "
+    "ex ea commodo consequat. Duis aute irure dolor in reprehenderit in "
+    "voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur "
+    "sint occaecat cupidatat non proident, sunt in culpa qui officia "
+    "deserunt mollit anim id est laborum.";
+
+  const string file = path::join(sandbox.get(), "lorem.txt");
+  ASSERT_SOME(os::write(file, small));
+
+  FileServer server(file);
+
+  {
+    Future<http::Response> reply =
+      http::get(server.process->self(), None(), None(), None(), GetParam());
+
+    AWAIT_READY(reply);
+    ASSERT_EQ(small, reply->body);
+  }
+
+  // Payload significantly larger than the buffered read size, written
+  // over the same file that the server already provides.
+  const string large = small + string(1024 * 1024, 'A');
+  ASSERT_SOME(os::write(file, large));
+
+  {
+    Future<http::Response> reply =
+      http::get(server.process->self(), None(), None(), None(), GetParam());
+
+    AWAIT_READY(reply);
+    ASSERT_EQ(large, reply->body);
+  }
+}
+
+
TEST_P(HTTPTest, PipeEquality)
{
// Pipes are shared objects, like Futures. Copies are considered
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 05dc5ec..42295a6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1657,51 +1657,6 @@ TEST_F(ProcessTest, Async)
}
-class FileServer : public Process<FileServer>
-{
-public:
- explicit FileServer(const string& _path)
- : path(_path) {}
-
- void initialize() override
- {
- provide("", path);
- }
-
- const string path;
-};
-
-
-TEST_F(ProcessTest, Provide)
-{
- const string LOREM_IPSUM =
- "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
- "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
- "minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip "
- "ex ea commodo consequat. Duis aute irure dolor in reprehenderit in "
- "voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur "
- "sint occaecat cupidatat non proident, sunt in culpa qui officia "
- "deserunt mollit anim id est laborum.";
-
- const string path = path::join(sandbox.get(), "lorem.txt");
- ASSERT_SOME(os::write(path, LOREM_IPSUM));
-
- FileServer server(path);
- PID<FileServer> pid = spawn(server);
-
- Future<http::Response> response = http::get(pid);
-
- AWAIT_READY(response);
-
- ASSERT_EQ(LOREM_IPSUM, response->body);
-
- terminate(server);
- wait(server);
-
- ASSERT_SOME(os::rmdir(path));
-}
-
-
static int baz(string s) { return 42; }