You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/17 01:05:16 UTC

svn commit: r1399035 - in /incubator/mesos/trunk: src/files/ src/tests/ src/webui/master/static/ third_party/libprocess/include/process/ third_party/libprocess/src/

Author: benh
Date: Tue Oct 16 23:05:15 2012
New Revision: 1399035

URL: http://svn.apache.org/viewvc?rev=1399035&view=rev
Log:
Changed FilesProcess to use async io::read and updated pailer to not
use redundant 'length' field of response.

From: Ben Mahler <be...@gmail.com>
Review: https://reviews.apache.org/r/7048

Modified:
    incubator/mesos/trunk/src/files/files.cpp
    incubator/mesos/trunk/src/tests/files_tests.cpp
    incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js
    incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp
    incubator/mesos/trunk/third_party/libprocess/src/tests.cpp

Modified: incubator/mesos/trunk/src/files/files.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/files/files.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/src/files/files.cpp (original)
+++ incubator/mesos/trunk/src/files/files.cpp Tue Oct 16 23:05:15 2012
@@ -1,12 +1,18 @@
+#include <unistd.h>
+
 #include <sys/stat.h>
 
+#include <algorithm>
 #include <map>
 #include <string>
 #include <vector>
 
+#include <boost/shared_array.hpp>
+
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
 #include <process/http.hpp>
+#include <process/io.hpp>
 #include <process/mime.hpp>
 #include <process/process.hpp>
 
@@ -56,12 +62,6 @@ protected:
   virtual void initialize();
 
 private:
-  // HTTP endpoints.
-  Future<Response> browse(const Request& request);
-  Future<Response> read(const Request& request);
-  Future<Response> download(const Request& request);
-  Future<Response> debug(const Request& request);
-
   // Resolves the virtual path to an actual path.
   // Returns the actual path if found.
   // Returns None if the file is not found.
@@ -69,6 +69,27 @@ private:
   // out of the chroot.
   Result<std::string> resolve(const string& path);
 
+  // HTTP endpoints.
+
+  // Returns a file listing for a directory.
+  // Requests have the following parameters:
+  //   path: The directory to browse. Required.
+  // The response will contain a list of JSON files and directories contained
+  // in the path (see files::jsonFileInfo for the format).
+  Future<Response> browse(const Request& request);
+
+  // Reads data from a file at a given offset and for a given length.
+  // See the jquery pailer for the expected behavior.
+  Future<Response> read(const Request& request);
+
+  // Returns the raw file contents for a given path.
+  // Requests have the following parameters:
+  //   path: The directory to browse. Required.
+  Future<Response> download(const Request& request);
+
+  // Returns the internal virtual path mapping.
+  Future<Response> debug(const Request& request);
+
   hashmap<string, string> paths;
 };
 
@@ -163,6 +184,23 @@ Future<Response> FilesProcess::browse(co
 }
 
 
+// TODO(benh): Remove 'const &' from size after fixing libprocess.
+Future<Response> _read(int fd,
+                       const size_t& size,
+                       off_t offset,
+                       const boost::shared_array<char>& data,
+                       const Option<string>& jsonp) {
+  JSON::Object object;
+
+  object.values["offset"] = offset;
+  object.values["data"] = string(data.get(), size);
+
+  os::close(fd);
+
+  return OK(object, jsonp);
+}
+
+
 Future<Response> FilesProcess::read(const Request& request)
 {
   Option<string> path = request.query.get("path");
@@ -221,7 +259,7 @@ Future<Response> FilesProcess::read(cons
     string error = strings::format("Failed to open file at '%s': %s",
         resolvedPath.get(), strerror(errno)).get();
     LOG(WARNING) << error;
-    close(fd.get());
+    os::close(fd.get());
     return InternalServerError(error + ".\n");
   }
 
@@ -233,49 +271,47 @@ Future<Response> FilesProcess::read(cons
     length = size - offset;
   }
 
-  JSON::Object object;
+  // Cap the read length at 16 pages.
+  length = std::min(length, sysconf(_SC_PAGE_SIZE) * 16);
 
-  if (offset < size) {
-    // Seek to the offset we want to read from.
-    if (lseek(fd.get(), offset, SEEK_SET) == -1) {
-      string error = strings::format("Failed to seek file at '%s': %s",
-          resolvedPath.get(), strerror(errno)).get();
-      LOG(WARNING) << error;
-      close(fd.get());
-      return InternalServerError(error + ".\n");
-    }
+  if (offset >= size) {
+    os::close(fd.get());
 
-    // Read length bytes (or to EOF).
-    char* temp = new char[length];
+    JSON::Object object;
+    object.values["offset"] = size;
+    object.values["data"] = "";
+    return OK(object, request.query.get("jsonp"));
+  }
 
-    // TODO(bmahler): Change this to use async process::read.
-    length = ::read(fd.get(), temp, length);
+  // Seek to the offset we want to read from.
+  if (lseek(fd.get(), offset, SEEK_SET) == -1) {
+    string error = strings::format("Failed to seek file at '%s': %s",
+        resolvedPath.get(), strerror(errno)).get();
+    LOG(WARNING) << error;
+    os::close(fd.get());
+    return InternalServerError(error);
+  }
 
-    if (length == 0) {
-      object.values["offset"] = offset;
-      object.values["length"] = 0;
-      delete[] temp;
-    } else if (length == -1) {
-      string error = strings::format("Failed to read file at '%s': %s",
-          resolvedPath.get(), strerror(errno)).get();
-      LOG(WARNING) << error;
-      delete[] temp;
-      close(fd.get());
-      return InternalServerError(error + ".\n");
-    } else {
-      object.values["offset"] = offset;
-      object.values["length"] = length;
-      object.values["data"] = string(temp, length);
-      delete[] temp;
-    }
-  } else {
-    object.values["offset"] = size;
-    object.values["length"] = 0;
+  Try<Nothing> nonblock = os::nonblock(fd.get());
+  if (nonblock.isError()) {
+    string error =
+        "Failed to set file descriptor nonblocking: " + nonblock.error();
+    LOG(WARNING) << error;
+    return InternalServerError(error);
   }
 
-  close(fd.get());
+  // Read 'length' bytes (or to EOF).
+  boost::shared_array<char> data(new char[length]);
 
-  return OK(object, request.query.get("jsonp"));
+  // TODO(bmahler): C++11 version when ready.
+  std::tr1::function<Future<Response>(const size_t&)> f =
+      std::tr1::bind(_read,
+                     fd.get(),
+                     std::tr1::placeholders::_1,
+                     offset,
+                     data,
+                     request.query.get("jsonp"));
+  return io::read(fd.get(), data.get(), static_cast<size_t>(length)).then(f);
 }
 
 

Modified: incubator/mesos/trunk/src/tests/files_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/files_tests.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/files_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/files_tests.cpp Tue Oct 16 23:05:15 2012
@@ -108,7 +108,6 @@ TEST_WITH_WORKDIR(FilesTest, ReadTest)
   // Read a valid file.
   JSON::Object expected;
   expected.values["offset"] = 0;
-  expected.values["length"] = strlen("body");
   expected.values["data"] = "body";
 
   response = process::http::get(pid, "read.json", "path=/myname&offset=0");
@@ -147,7 +146,6 @@ TEST_WITH_WORKDIR(FilesTest, ResolveTest
   // Resolve 1/2/3 via each attached path.
   JSON::Object expected;
   expected.values["offset"] = 0;
-  expected.values["length"] = strlen("three");
   expected.values["data"] = "three";
 
   Future<Response> response =

Modified: incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js (original)
+++ incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js Tue Oct 16 23:05:15 2012
@@ -7,12 +7,12 @@
 // fields 'offset' and 'length' set for reading the data. The result
 // from of the function should be a "promise" like value which has a
 // 'success' and 'error' callback which each take a function. An
-// object with at least two fields defined ('offset' and 'length') is
-// expected on success. A third field, 'data' may be provided if
-// 'length' is greater than 0. If the offset requested is greater than
-// the available offset, the result should be an object with the
-// 'offset' field set to the available offset (i.e., the length of the
-// data) and the 'length' field set to 0.
+// object with at least two fields defined ('offset' and 'data') is
+// expected on success. The length of 'data' may be smaller than the
+// amount requested. If the offset requested is greater than the
+// available offset, the result should be an object with the 'offset'
+// field set to the available offset (i.e., the total length of the
+// data) with an empty 'data' field.
 
 // The plugin prepends, appends, and updates the "html" component of
 // the elements specified in the jQuery selector (e.g., doing
@@ -157,24 +157,22 @@
     var read = function(offset, length) {
       this_.read({'offset': offset, 'length': length})
         .success(function(data) {
-          if (data.length < length) {
+          if (data.data.length < length) {
               buffer += data.data;
-              read(offset + data.length, length - data.length);
-          } else if (data.length > 0) {
+              read(offset + data.data.length, length - data.data.length);
+          } else if (data.data.length > 0) {
             this_.indicate('(PAGED)');
             setTimeout(function() { this_.indicate(''); }, 1000);
 
             // Prepend buffer onto data.
             data.offset -= buffer.length;
             data.data = buffer + data.data;
-            data.length = data.data.length;
 
             // Truncate to the first newline (unless this is the beginning).
             if (data.offset != 0) {
               var index = data.data.indexOf('\n') + 1;
               data.offset += index;
               data.data = data.data.substring(index);
-              data.length -= index;
             }
 
             this_.start = data.offset;
@@ -223,18 +221,17 @@
           return;
         }
 
-        if (data.length > 0) {
+        if (data.data.length > 0) {
           // Truncate to the first newline if this is the first time
           // (and we aren't reading from the beginning of the log).
           if (this_.start == this_.end && data.offset != 0) {
             var index = data.data.indexOf('\n') + 1;
             data.offset += index;
             data.data = data.data.substring(index);
-            data.length -= index;
             this_.start = data.offset; // Adjust the actual start too!
           }
 
-          this_.end = data.offset + data.length;
+          this_.end = data.offset + data.data.length;
 
           this_.element.append(data.data);
 
@@ -254,7 +251,7 @@
         // log data at a time ... the right solution here might be to do
         // a request to determine the new ending offset and then request
         // the proper length.
-        if (data.length == this_.truncate_length) {
+        if (data.data.length == this_.truncate_length) {
           setTimeout(function() { this_.tail(); }, 0);
         } else {
           setTimeout(function() { this_.tail(); }, 1000);

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp Tue Oct 16 23:05:15 2012
@@ -1,6 +1,7 @@
 #ifndef __PROCESS_IO_HPP__
 #define __PROCESS_IO_HPP__
 
+#include <cstring> // For size_t.
 #include <string>
 
 #include <process/future.hpp>
@@ -12,11 +13,14 @@ namespace io {
 const short READ = 0x01;
 const short WRITE = 0x02;
 
+// Buffered read chunk size. Roughly 16 pages.
+const size_t BUFFERED_READ_SIZE = 16*4096;
+
+// TODO(benh): Add a version which takes multiple file descriptors.
 // Returns the events (a subset of the events specified) that can be
 // performed on the specified file descriptor without blocking.
 Future<short> poll(int fd, short events);
 
-// TODO(benh): Add a version which takes multiple file descriptors.
 
 // Performs a single non-blocking read by polling on the specified file
 // descriptor until any data can be be read. The future will become ready when
@@ -28,6 +32,7 @@ Future<short> poll(int fd, short events)
 // future, thus only a 'size_t' is necessary rather than a 'ssize_t').
 Future<size_t> read(int fd, void* data, size_t size);
 
+
 // Performs a series of asynchronous reads, until EOF is reached.
 // NOTE: When using this, ensure the sender will close the connection
 // so that EOF can be reached.

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Tue Oct 16 23:05:15 2012
@@ -43,6 +43,8 @@
 #include <tr1/functional>
 #include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
 
+#include <boost/shared_array.hpp>
+
 #include <process/clock.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -3206,14 +3208,15 @@ Future<size_t> read(int fd, void* data, 
 namespace internal {
 
 #if __cplusplus >= 201103L
-Future<string> _read(int fd, string* buffer, char* data, size_t length)
+Future<string> _read(int fd,
+                     const std::tr1::shared_ptr<string>& buffer,
+                     const boost::shared_array<char>& data,
+                     size_t length)
 {
-  return io::read(fd, data, length)
+  return io::read(fd, data.get(), length)
     .then([=] (size_t size) {
       if (size == 0) { // EOF.
         string result(*buffer);
-        delete buffer;
-        delete[] data;
         return Future<string>(result);
       }
       buffer->append(data, size);
@@ -3222,51 +3225,55 @@ Future<string> _read(int fd, string* buf
 }
 #else
 // Forward declataion.
-Future<string> _read(int fd, string* buffer, char* data, size_t length);
+Future<string> _read(int fd,
+                     const std::tr1::shared_ptr<string>& buffer,
+                     const boost::shared_array<char>& data,
+                     size_t length);
 
 
 Future<string> __read(
     const size_t& size,
     // TODO(benh): Remove 'const &' after fixing libprocess.
     int fd,
-    string* buffer,
-    char* data,
+    const std::tr1::shared_ptr<string>& buffer,
+    const boost::shared_array<char>& data,
     size_t length)
 {
   if (size == 0) { // EOF.
     string result(*buffer);
-    delete buffer;
-    delete[] data;
     return Future<string>(result);
   }
 
-  buffer->append(data, size);
+  buffer->append(data.get(), size);
   return _read(fd, buffer, data, length);
 }
 
 
-Future<string> _read(int fd, string* buffer, char* data, size_t length)
+Future<string> _read(int fd,
+                     const std::tr1::shared_ptr<string>& buffer,
+                     const boost::shared_array<char>& data,
+                     size_t length)
 {
   std::tr1::function<Future<string>(const size_t&)> f =
     std::tr1::bind(__read, lambda::_1, fd, buffer, data, length);
 
-  return io::read(fd, data, length).then(f);
+  return io::read(fd, data.get(), length).then(f);
 }
 #endif
 
 } // namespace internal
 
+
 Future<string> read(int fd)
 {
   process::initialize();
 
   // TODO(benh): Wrap up this data as a struct, use 'Owner'.
   // TODO(bmahler): For efficiency, use a rope for the buffer.
-  string* buffer = new string();
-  size_t length = 1024;
-  char* data = new char[length];
+  std::tr1::shared_ptr<string> buffer(new string());
+  boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
 
-  return internal::_read(fd, buffer, data, length);
+  return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE);
 }
 
 

Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Tue Oct 16 23:05:15 2012
@@ -1351,6 +1351,42 @@ TEST(HTTP, encode)
 }
 
 
+TEST(Process, BufferedRead)
+{
+  // 128 Bytes.
+  std::string data =
+      "This data is much larger than BUFFERED_READ_SIZE, which means it will "
+      "trigger multiple buffered async reads as a result.........";
+  CHECK(data.size() == 128);
+
+  // Keep doubling the data size until we're guaranteed to trigger at least
+  // 3 buffered async reads.
+  while (data.length() < 3 * io::BUFFERED_READ_SIZE) {
+    data.append(data);
+  }
+
+  ASSERT_TRUE(os::write("file", data).isSome());
+
+  Try<int> fd = os::open("file", O_RDONLY);
+  ASSERT_TRUE(fd.isSome());
+
+  // Read from blocking fd.
+  Future<std::string> future = io::read(fd.get());
+  ASSERT_TRUE(future.await(Seconds(5.0)));
+  EXPECT_TRUE(future.isFailed());
+
+  // Read from non-blocking fd.
+  ASSERT_TRUE(os::nonblock(fd.get()).isSome());
+
+  future = io::read(fd.get());
+  ASSERT_TRUE(future.await(Seconds(5.0)));
+  EXPECT_TRUE(future.isReady());
+  EXPECT_EQ(data, future.get());
+
+  os::close(fd.get());
+}
+
+
 int main(int argc, char** argv)
 {
   // Initialize Google Mock/Test.