You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/10/17 01:05:16 UTC
svn commit: r1399035 - in /incubator/mesos/trunk: src/files/ src/tests/
src/webui/master/static/ third_party/libprocess/include/process/
third_party/libprocess/src/
Author: benh
Date: Tue Oct 16 23:05:15 2012
New Revision: 1399035
URL: http://svn.apache.org/viewvc?rev=1399035&view=rev
Log:
Changed FilesProcess to use async io::read and updated pailer to not
use redundant 'length' field of response.
From: Ben Mahler <be...@gmail.com>
Review: https://reviews.apache.org/r/7048
Modified:
incubator/mesos/trunk/src/files/files.cpp
incubator/mesos/trunk/src/tests/files_tests.cpp
incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js
incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
Modified: incubator/mesos/trunk/src/files/files.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/files/files.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/src/files/files.cpp (original)
+++ incubator/mesos/trunk/src/files/files.cpp Tue Oct 16 23:05:15 2012
@@ -1,12 +1,18 @@
+#include <unistd.h>
+
#include <sys/stat.h>
+#include <algorithm>
#include <map>
#include <string>
#include <vector>
+#include <boost/shared_array.hpp>
+
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/http.hpp>
+#include <process/io.hpp>
#include <process/mime.hpp>
#include <process/process.hpp>
@@ -56,12 +62,6 @@ protected:
virtual void initialize();
private:
- // HTTP endpoints.
- Future<Response> browse(const Request& request);
- Future<Response> read(const Request& request);
- Future<Response> download(const Request& request);
- Future<Response> debug(const Request& request);
-
// Resolves the virtual path to an actual path.
// Returns the actual path if found.
// Returns None if the file is not found.
@@ -69,6 +69,27 @@ private:
// out of the chroot.
Result<std::string> resolve(const string& path);
+ // HTTP endpoints.
+
+ // Returns a file listing for a directory.
+ // Requests have the following parameters:
+ // path: The directory to browse. Required.
+ // The response will contain a list of JSON files and directories contained
+ // in the path (see files::jsonFileInfo for the format).
+ Future<Response> browse(const Request& request);
+
+ // Reads data from a file at a given offset and for a given length.
+ // See the jquery pailer for the expected behavior.
+ Future<Response> read(const Request& request);
+
+ // Returns the raw file contents for a given path.
+ // Requests have the following parameters:
+ // path: The directory to browse. Required.
+ Future<Response> download(const Request& request);
+
+ // Returns the internal virtual path mapping.
+ Future<Response> debug(const Request& request);
+
hashmap<string, string> paths;
};
@@ -163,6 +184,23 @@ Future<Response> FilesProcess::browse(co
}
+// TODO(benh): Remove 'const &' from size after fixing libprocess.
+Future<Response> _read(int fd,
+ const size_t& size,
+ off_t offset,
+ const boost::shared_array<char>& data,
+ const Option<string>& jsonp) {
+ JSON::Object object;
+
+ object.values["offset"] = offset;
+ object.values["data"] = string(data.get(), size);
+
+ os::close(fd);
+
+ return OK(object, jsonp);
+}
+
+
Future<Response> FilesProcess::read(const Request& request)
{
Option<string> path = request.query.get("path");
@@ -221,7 +259,7 @@ Future<Response> FilesProcess::read(cons
string error = strings::format("Failed to open file at '%s': %s",
resolvedPath.get(), strerror(errno)).get();
LOG(WARNING) << error;
- close(fd.get());
+ os::close(fd.get());
return InternalServerError(error + ".\n");
}
@@ -233,49 +271,47 @@ Future<Response> FilesProcess::read(cons
length = size - offset;
}
- JSON::Object object;
+ // Cap the read length at 16 pages.
+ length = std::min(length, sysconf(_SC_PAGE_SIZE) * 16);
- if (offset < size) {
- // Seek to the offset we want to read from.
- if (lseek(fd.get(), offset, SEEK_SET) == -1) {
- string error = strings::format("Failed to seek file at '%s': %s",
- resolvedPath.get(), strerror(errno)).get();
- LOG(WARNING) << error;
- close(fd.get());
- return InternalServerError(error + ".\n");
- }
+ if (offset >= size) {
+ os::close(fd.get());
- // Read length bytes (or to EOF).
- char* temp = new char[length];
+ JSON::Object object;
+ object.values["offset"] = size;
+ object.values["data"] = "";
+ return OK(object, request.query.get("jsonp"));
+ }
- // TODO(bmahler): Change this to use async process::read.
- length = ::read(fd.get(), temp, length);
+ // Seek to the offset we want to read from.
+ if (lseek(fd.get(), offset, SEEK_SET) == -1) {
+ string error = strings::format("Failed to seek file at '%s': %s",
+ resolvedPath.get(), strerror(errno)).get();
+ LOG(WARNING) << error;
+ os::close(fd.get());
+ return InternalServerError(error);
+ }
- if (length == 0) {
- object.values["offset"] = offset;
- object.values["length"] = 0;
- delete[] temp;
- } else if (length == -1) {
- string error = strings::format("Failed to read file at '%s': %s",
- resolvedPath.get(), strerror(errno)).get();
- LOG(WARNING) << error;
- delete[] temp;
- close(fd.get());
- return InternalServerError(error + ".\n");
- } else {
- object.values["offset"] = offset;
- object.values["length"] = length;
- object.values["data"] = string(temp, length);
- delete[] temp;
- }
- } else {
- object.values["offset"] = size;
- object.values["length"] = 0;
+ Try<Nothing> nonblock = os::nonblock(fd.get());
+ if (nonblock.isError()) {
+ string error =
+ "Failed to set file descriptor nonblocking: " + nonblock.error();
+ LOG(WARNING) << error;
+ return InternalServerError(error);
}
- close(fd.get());
+ // Read 'length' bytes (or to EOF).
+ boost::shared_array<char> data(new char[length]);
- return OK(object, request.query.get("jsonp"));
+ // TODO(bmahler): C++11 version when ready.
+ std::tr1::function<Future<Response>(const size_t&)> f =
+ std::tr1::bind(_read,
+ fd.get(),
+ std::tr1::placeholders::_1,
+ offset,
+ data,
+ request.query.get("jsonp"));
+ return io::read(fd.get(), data.get(), static_cast<size_t>(length)).then(f);
}
Modified: incubator/mesos/trunk/src/tests/files_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/files_tests.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/files_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/files_tests.cpp Tue Oct 16 23:05:15 2012
@@ -108,7 +108,6 @@ TEST_WITH_WORKDIR(FilesTest, ReadTest)
// Read a valid file.
JSON::Object expected;
expected.values["offset"] = 0;
- expected.values["length"] = strlen("body");
expected.values["data"] = "body";
response = process::http::get(pid, "read.json", "path=/myname&offset=0");
@@ -147,7 +146,6 @@ TEST_WITH_WORKDIR(FilesTest, ResolveTest
// Resolve 1/2/3 via each attached path.
JSON::Object expected;
expected.values["offset"] = 0;
- expected.values["length"] = strlen("three");
expected.values["data"] = "three";
Future<Response> response =
Modified: incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js (original)
+++ incubator/mesos/trunk/src/webui/master/static/jquery.pailer.js Tue Oct 16 23:05:15 2012
@@ -7,12 +7,12 @@
// fields 'offset' and 'length' set for reading the data. The result
// from of the function should be a "promise" like value which has a
// 'success' and 'error' callback which each take a function. An
-// object with at least two fields defined ('offset' and 'length') is
-// expected on success. A third field, 'data' may be provided if
-// 'length' is greater than 0. If the offset requested is greater than
-// the available offset, the result should be an object with the
-// 'offset' field set to the available offset (i.e., the length of the
-// data) and the 'length' field set to 0.
+// object with at least two fields defined ('offset' and 'data') is
+// expected on success. The length of 'data' may be smaller than the
+// amount requested. If the offset requested is greater than the
+// available offset, the result should be an object with the 'offset'
+// field set to the available offset (i.e., the total length of the
+// data) with an empty 'data' field.
// The plugin prepends, appends, and updates the "html" component of
// the elements specified in the jQuery selector (e.g., doing
@@ -157,24 +157,22 @@
var read = function(offset, length) {
this_.read({'offset': offset, 'length': length})
.success(function(data) {
- if (data.length < length) {
+ if (data.data.length < length) {
buffer += data.data;
- read(offset + data.length, length - data.length);
- } else if (data.length > 0) {
+ read(offset + data.data.length, length - data.data.length);
+ } else if (data.data.length > 0) {
this_.indicate('(PAGED)');
setTimeout(function() { this_.indicate(''); }, 1000);
// Prepend buffer onto data.
data.offset -= buffer.length;
data.data = buffer + data.data;
- data.length = data.data.length;
// Truncate to the first newline (unless this is the beginning).
if (data.offset != 0) {
var index = data.data.indexOf('\n') + 1;
data.offset += index;
data.data = data.data.substring(index);
- data.length -= index;
}
this_.start = data.offset;
@@ -223,18 +221,17 @@
return;
}
- if (data.length > 0) {
+ if (data.data.length > 0) {
// Truncate to the first newline if this is the first time
// (and we aren't reading from the beginning of the log).
if (this_.start == this_.end && data.offset != 0) {
var index = data.data.indexOf('\n') + 1;
data.offset += index;
data.data = data.data.substring(index);
- data.length -= index;
this_.start = data.offset; // Adjust the actual start too!
}
- this_.end = data.offset + data.length;
+ this_.end = data.offset + data.data.length;
this_.element.append(data.data);
@@ -254,7 +251,7 @@
// log data at a time ... the right solution here might be to do
// a request to determine the new ending offset and then request
// the proper length.
- if (data.length == this_.truncate_length) {
+ if (data.data.length == this_.truncate_length) {
setTimeout(function() { this_.tail(); }, 0);
} else {
setTimeout(function() { this_.tail(); }, 1000);
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/io.hpp Tue Oct 16 23:05:15 2012
@@ -1,6 +1,7 @@
#ifndef __PROCESS_IO_HPP__
#define __PROCESS_IO_HPP__
+#include <cstring> // For size_t.
#include <string>
#include <process/future.hpp>
@@ -12,11 +13,14 @@ namespace io {
const short READ = 0x01;
const short WRITE = 0x02;
+// Buffered read chunk size. Roughly 16 pages.
+const size_t BUFFERED_READ_SIZE = 16*4096;
+
+// TODO(benh): Add a version which takes multiple file descriptors.
// Returns the events (a subset of the events specified) that can be
// performed on the specified file descriptor without blocking.
Future<short> poll(int fd, short events);
-// TODO(benh): Add a version which takes multiple file descriptors.
// Performs a single non-blocking read by polling on the specified file
// descriptor until any data can be be read. The future will become ready when
@@ -28,6 +32,7 @@ Future<short> poll(int fd, short events)
// future, thus only a 'size_t' is necessary rather than a 'ssize_t').
Future<size_t> read(int fd, void* data, size_t size);
+
// Performs a series of asynchronous reads, until EOF is reached.
// NOTE: When using this, ensure the sender will close the connection
// so that EOF can be reached.
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Tue Oct 16 23:05:15 2012
@@ -43,6 +43,8 @@
#include <tr1/functional>
#include <tr1/memory> // TODO(benh): Replace all shared_ptr with unique_ptr.
+#include <boost/shared_array.hpp>
+
#include <process/clock.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -3206,14 +3208,15 @@ Future<size_t> read(int fd, void* data,
namespace internal {
#if __cplusplus >= 201103L
-Future<string> _read(int fd, string* buffer, char* data, size_t length)
+Future<string> _read(int fd,
+ const std::tr1::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
{
- return io::read(fd, data, length)
+ return io::read(fd, data.get(), length)
.then([=] (size_t size) {
if (size == 0) { // EOF.
string result(*buffer);
- delete buffer;
- delete[] data;
return Future<string>(result);
}
buffer->append(data, size);
@@ -3222,51 +3225,55 @@ Future<string> _read(int fd, string* buf
}
#else
// Forward declataion.
-Future<string> _read(int fd, string* buffer, char* data, size_t length);
+Future<string> _read(int fd,
+ const std::tr1::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length);
Future<string> __read(
const size_t& size,
// TODO(benh): Remove 'const &' after fixing libprocess.
int fd,
- string* buffer,
- char* data,
+ const std::tr1::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
size_t length)
{
if (size == 0) { // EOF.
string result(*buffer);
- delete buffer;
- delete[] data;
return Future<string>(result);
}
- buffer->append(data, size);
+ buffer->append(data.get(), size);
return _read(fd, buffer, data, length);
}
-Future<string> _read(int fd, string* buffer, char* data, size_t length)
+Future<string> _read(int fd,
+ const std::tr1::shared_ptr<string>& buffer,
+ const boost::shared_array<char>& data,
+ size_t length)
{
std::tr1::function<Future<string>(const size_t&)> f =
std::tr1::bind(__read, lambda::_1, fd, buffer, data, length);
- return io::read(fd, data, length).then(f);
+ return io::read(fd, data.get(), length).then(f);
}
#endif
} // namespace internal
+
Future<string> read(int fd)
{
process::initialize();
// TODO(benh): Wrap up this data as a struct, use 'Owner'.
// TODO(bmahler): For efficiency, use a rope for the buffer.
- string* buffer = new string();
- size_t length = 1024;
- char* data = new char[length];
+ std::tr1::shared_ptr<string> buffer(new string());
+ boost::shared_array<char> data(new char[BUFFERED_READ_SIZE]);
- return internal::_read(fd, buffer, data, length);
+ return internal::_read(fd, buffer, data, BUFFERED_READ_SIZE);
}
Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1399035&r1=1399034&r2=1399035&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Tue Oct 16 23:05:15 2012
@@ -1351,6 +1351,42 @@ TEST(HTTP, encode)
}
+TEST(Process, BufferedRead)
+{
+ // 128 Bytes.
+ std::string data =
+ "This data is much larger than BUFFERED_READ_SIZE, which means it will "
+ "trigger multiple buffered async reads as a result.........";
+ CHECK(data.size() == 128);
+
+ // Keep doubling the data size until we're guaranteed to trigger at least
+ // 3 buffered async reads.
+ while (data.length() < 3 * io::BUFFERED_READ_SIZE) {
+ data.append(data);
+ }
+
+ ASSERT_TRUE(os::write("file", data).isSome());
+
+ Try<int> fd = os::open("file", O_RDONLY);
+ ASSERT_TRUE(fd.isSome());
+
+ // Read from blocking fd.
+ Future<std::string> future = io::read(fd.get());
+ ASSERT_TRUE(future.await(Seconds(5.0)));
+ EXPECT_TRUE(future.isFailed());
+
+ // Read from non-blocking fd.
+ ASSERT_TRUE(os::nonblock(fd.get()).isSome());
+
+ future = io::read(fd.get());
+ ASSERT_TRUE(future.await(Seconds(5.0)));
+ EXPECT_TRUE(future.isReady());
+ EXPECT_EQ(data, future.get());
+
+ os::close(fd.get());
+}
+
+
int main(int argc, char** argv)
{
// Initialize Google Mock/Test.