You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2019/12/18 22:34:39 UTC

[mesos] 10/11: SSL Socket: Implemented sendfile.

This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f8f6720591931ee6a71e4aeffd50192f6fc96df1
Author: Joseph Wu <jo...@apache.org>
AuthorDate: Tue Nov 19 15:01:44 2019 -0800

    SSL Socket: Implemented sendfile.
    
    This implements the SSL socket's sendfile method, which must read
    the file (unlike the zero-copy os::sendfile).
    
    This also moves a test exercising sendfile from process_tests.cpp
    into http_tests.cpp and parameterizes it for SSL and non-SSL.
    
    Review: https://reviews.apache.org/r/71790
---
 3rdparty/libprocess/src/ssl/openssl_socket.cpp  | 50 ++++++++++++++++-
 3rdparty/libprocess/src/tests/http_tests.cpp    | 71 +++++++++++++++++++++++++
 3rdparty/libprocess/src/tests/process_tests.cpp | 45 ----------------
 3 files changed, 120 insertions(+), 46 deletions(-)

diff --git a/3rdparty/libprocess/src/ssl/openssl_socket.cpp b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
index 42a1918..74f9fe2 100644
--- a/3rdparty/libprocess/src/ssl/openssl_socket.cpp
+++ b/3rdparty/libprocess/src/ssl/openssl_socket.cpp
@@ -20,9 +20,12 @@
 #include <openssl/ssl.h>
 #include <openssl/err.h>
 
+#include <algorithm>
 #include <atomic>
 #include <queue>
 
+#include <boost/shared_array.hpp>
+
 #include <process/io.hpp>
 #include <process/loop.hpp>
 #include <process/owned.hpp>
@@ -35,6 +38,8 @@
 #include <stout/unimplemented.hpp>
 #include <stout/unreachable.hpp>
 
+#include <stout/os/lseek.hpp>
+
 #include "openssl.hpp"
 
 #include "ssl/openssl_socket.hpp"
@@ -490,7 +495,50 @@ Future<size_t> OpenSSLSocketImpl::send(const char* input, size_t size)
 Future<size_t> OpenSSLSocketImpl::sendfile(
     int_fd fd, off_t offset, size_t size)
 {
-  UNIMPLEMENTED;
+  if (dirty_shutdown) {
+    return Failure("Socket is shutdown");
+  }
+
+  // Hold a weak pointer since both read and write are not guaranteed to finish.
+  std::weak_ptr<OpenSSLSocketImpl> weak_self(shared(this));
+
+  Try<off_t> seek = os::lseek(fd, offset, SEEK_SET);
+  if (seek.isError()) {
+    return Failure("Failed to seek: " + seek.error());
+  }
+
+  Try<Nothing> async = io::prepare_async(fd);
+  if (async.isError()) {
+    return Failure("Failed to make FD asynchronous: " + async.error());
+  }
+
+  size_t remaining_size = size;
+  boost::shared_array<char> data(new char[io::BUFFERED_READ_SIZE]);
+
+  return process::loop(
+      compute_thread,
+      [weak_self, fd, remaining_size, data]() -> Future<size_t> {
+        return io::read(
+            fd, data.get(), std::min(io::BUFFERED_READ_SIZE, remaining_size))
+          .then([weak_self, data](size_t read_bytes) -> Future<size_t> {
+            std::shared_ptr<OpenSSLSocketImpl> self(weak_self.lock());
+            if (self == nullptr) {
+              return Failure("Socket destroyed while sending file");
+            }
+
+            return self->send(data.get(), read_bytes);
+          });
+      },
+      [size, &remaining_size](size_t written) mutable
+          -> Future<ControlFlow<size_t>> {
+        remaining_size -= written;
+
+        if (remaining_size > 0) {
+          return Continue();
+        }
+
+        return Break(size);
+      });
 }
 
 
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index 1433f3d..b906b3c 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -51,6 +51,8 @@
 #include <stout/os.hpp>
 #include <stout/stringify.hpp>
 
+#include <stout/os/write.hpp>
+
 #include <stout/tests/utils.hpp>
 
 #include "encoder.hpp"
@@ -887,6 +889,75 @@ TEST_P(HTTPTest, StreamingGetFailure)
 }
 
 
+class FileServerProcess : public Process<FileServerProcess>
+{
+public:
+  explicit FileServerProcess(const string& _path)
+    : path(_path) {}
+
+protected:
+  void initialize() override
+  {
+    provide("", path);
+  }
+
+  const string path;
+};
+
+
+class FileServer
+{
+public:
+  FileServer(const string& path) : process(new FileServerProcess(path))
+  {
+    spawn(process.get());
+  }
+
+  ~FileServer()
+  {
+    terminate(process.get());
+    wait(process.get());
+  }
+
+  Owned<FileServerProcess> process;
+};
+
+
+TEST_P(HTTPTest, ProvideSendfile)
+{
+  // A file smaller than the buffered read size.
+  const string LOREM_IPSUM =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
+    "minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip "
+    "ex ea commodo consequat. Duis aute irure dolor in reprehenderit in "
+    "voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur "
+    "sint occaecat cupidatat non proident, sunt in culpa qui officia "
+    "deserunt mollit anim id est laborum.";
+
+  const string path = path::join(sandbox.get(), "lorem.txt");
+  ASSERT_SOME(os::write(path, LOREM_IPSUM));
+
+  FileServer server(path);
+
+  Future<http::Response> response =
+    http::get(server.process->self(), None(), None(), None(), GetParam());
+
+  AWAIT_READY(response);
+  ASSERT_EQ(LOREM_IPSUM, response->body);
+
+  // A file significantly larger than the buffered read size.
+  const string LOREM_IPSUM_AND_JUNK = LOREM_IPSUM + string(1024 * 1024, 'A');
+  ASSERT_SOME(os::write(path, LOREM_IPSUM_AND_JUNK));
+
+  response =
+    http::get(server.process->self(), None(), None(), None(), GetParam());
+
+  AWAIT_READY(response);
+  ASSERT_EQ(LOREM_IPSUM_AND_JUNK, response->body);
+}
+
+
 TEST_P(HTTPTest, PipeEquality)
 {
   // Pipes are shared objects, like Futures. Copies are considered
diff --git a/3rdparty/libprocess/src/tests/process_tests.cpp b/3rdparty/libprocess/src/tests/process_tests.cpp
index 05dc5ec..42295a6 100644
--- a/3rdparty/libprocess/src/tests/process_tests.cpp
+++ b/3rdparty/libprocess/src/tests/process_tests.cpp
@@ -1657,51 +1657,6 @@ TEST_F(ProcessTest, Async)
 }
 
 
-class FileServer : public Process<FileServer>
-{
-public:
-  explicit FileServer(const string& _path)
-    : path(_path) {}
-
-  void initialize() override
-  {
-    provide("", path);
-  }
-
-  const string path;
-};
-
-
-TEST_F(ProcessTest, Provide)
-{
-  const string LOREM_IPSUM =
-      "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
-      "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
-      "minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip "
-      "ex ea commodo consequat. Duis aute irure dolor in reprehenderit in "
-      "voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur "
-      "sint occaecat cupidatat non proident, sunt in culpa qui officia "
-      "deserunt mollit anim id est laborum.";
-
-  const string path = path::join(sandbox.get(), "lorem.txt");
-  ASSERT_SOME(os::write(path, LOREM_IPSUM));
-
-  FileServer server(path);
-  PID<FileServer> pid = spawn(server);
-
-  Future<http::Response> response = http::get(pid);
-
-  AWAIT_READY(response);
-
-  ASSERT_EQ(LOREM_IPSUM, response->body);
-
-  terminate(server);
-  wait(server);
-
-  ASSERT_SOME(os::rmdir(path));
-}
-
-
 static int baz(string s) { return 42; }