You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/10/08 18:07:42 UTC
[mesos] 02/04: Added `--fetcher_stall_timeout` to abort stalled
artifact fetching.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 0d05ffc174f90aa8573869ab36bd338224121b42
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Wed Mar 28 22:47:52 2018 -0700
Added `--fetcher_stall_timeout` to abort stalled artifact fetching.
This flag specifies how long `mesos-fetcher` waits before aborting a
download whose speed stays below 1 byte/sec. This prevents containers
from getting stuck in the FETCHING state.
Review: https://reviews.apache.org/r/65856/
---
docs/configuration/agent.md | 12 +++++++++++
include/mesos/fetcher/fetcher.proto | 3 +++
src/launcher/fetcher.cpp | 40 +++++++++++++++++++++++++------------
src/slave/constants.hpp | 3 +++
src/slave/containerizer/fetcher.cpp | 3 +++
src/slave/flags.cpp | 9 +++++++++
src/slave/flags.hpp | 1 +
7 files changed, 58 insertions(+), 13 deletions(-)
diff --git a/docs/configuration/agent.md b/docs/configuration/agent.md
index 3cccf89..2a72a64 100644
--- a/docs/configuration/agent.md
+++ b/docs/configuration/agent.md
@@ -804,6 +804,18 @@ Size of the fetcher cache in Bytes. (default: 2GB)
</tr>
<tr>
<td>
+ --fetcher_stall_timeout=VALUE
+ </td>
+ <td>
+Amount of time for the fetcher to wait before considering a download
+too slow and aborting it when the download stalls (i.e., the speed
+stays below one byte per second).
+<b>NOTE</b>: This feature only applies when downloading data from the net and
+does not apply to HDFS. (default: 1mins)
+ </td>
+</tr>
+<tr>
+ <td>
--frameworks_home=VALUE
</td>
<td>
diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto
index 6a5d807..d668106 100644
--- a/include/mesos/fetcher/fetcher.proto
+++ b/include/mesos/fetcher/fetcher.proto
@@ -64,4 +64,7 @@ message FetcherInfo {
repeated Item items = 3;
optional string user = 4;
optional string frameworks_home = 5;
+
+ // Only applies when fetching artifacts from the net.
+ optional DurationInfo stall_timeout = 6;
}
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index e2372a1..06fa52c 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -165,7 +165,8 @@ static Try<string> downloadWithHadoopClient(
static Try<string> downloadWithNet(
const string& sourceUri,
- const string& destinationPath)
+ const string& destinationPath,
+ const Option<Duration>& stallTimeout)
{
// The net::download function only supports these protocols.
CHECK(strings::startsWith(sourceUri, "http://") ||
@@ -176,7 +177,7 @@ static Try<string> downloadWithNet(
LOG(INFO) << "Downloading resource from '" << sourceUri
<< "' to '" << destinationPath << "'";
- Try<int> code = net::download(sourceUri, destinationPath);
+ Try<int> code = net::download(sourceUri, destinationPath, stallTimeout);
if (code.isError()) {
return Error("Error downloading resource: " + code.error());
} else {
@@ -217,7 +218,8 @@ static Try<string> copyFile(
static Try<string> download(
const string& _sourceUri,
const string& destinationPath,
- const Option<string>& frameworksHome)
+ const Option<string>& frameworksHome,
+ const Option<Duration>& stallTimeout)
{
// Trim leading whitespace for 'sourceUri'.
const string sourceUri = strings::trim(_sourceUri, strings::PREFIX);
@@ -243,7 +245,7 @@ static Try<string> download(
// 2. Try to fetch URI using os::net / libcurl implementation.
// We consider http, https, ftp, ftps compatible with libcurl.
if (Fetcher::isNetUri(sourceUri)) {
- return downloadWithNet(sourceUri, destinationPath);
+ return downloadWithNet(sourceUri, destinationPath, stallTimeout);
}
// 3. Try to fetch the URI using hadoop client.
@@ -286,7 +288,8 @@ static Try<string> chmodExecutable(const string& filePath)
static Try<string> fetchBypassingCache(
const CommandInfo::URI& uri,
const string& sandboxDirectory,
- const Option<string>& frameworksHome)
+ const Option<string>& frameworksHome,
+ const Option<Duration>& stallTimeout)
{
LOG(INFO) << "Fetching directly into the sandbox directory";
@@ -315,7 +318,8 @@ static Try<string> fetchBypassingCache(
string path = path::join(sandboxDirectory, outputFile.get());
- Try<string> downloaded = download(uri.value(), path, frameworksHome);
+ Try<string> downloaded =
+ download(uri.value(), path, frameworksHome, stallTimeout);
if (downloaded.isError()) {
return Error(downloaded.error());
}
@@ -404,7 +408,8 @@ static Try<string> fetchThroughCache(
const FetcherInfo::Item& item,
const Option<string>& cacheDirectory,
const string& sandboxDirectory,
- const Option<string>& frameworksHome)
+ const Option<string>& frameworksHome,
+ const Option<Duration>& stallTimeout)
{
if (cacheDirectory.isNone() || cacheDirectory.get().empty()) {
return Error("Cache directory not specified");
@@ -428,7 +433,8 @@ static Try<string> fetchThroughCache(
Try<string> downloaded = download(
item.uri().value(),
path::join(cacheDirectory.get(), item.cache_filename()),
- frameworksHome);
+ frameworksHome,
+ stallTimeout);
if (downloaded.isError()) {
return Error(downloaded.error());
@@ -445,7 +451,8 @@ static Try<string> fetch(
const FetcherInfo::Item& item,
const Option<string>& cacheDirectory,
const string& sandboxDirectory,
- const Option<string>& frameworksHome)
+ const Option<string>& frameworksHome,
+ const Option<Duration>& stallTimeout)
{
LOG(INFO) << "Fetching URI '" << item.uri().value() << "'";
@@ -453,14 +460,16 @@ static Try<string> fetch(
return fetchBypassingCache(
item.uri(),
sandboxDirectory,
- frameworksHome);
+ frameworksHome,
+ stallTimeout);
}
return fetchThroughCache(
item,
cacheDirectory,
sandboxDirectory,
- frameworksHome);
+ frameworksHome,
+ stallTimeout);
}
@@ -592,10 +601,15 @@ int main(int argc, char* argv[])
Option<string>::some(fetcherInfo.get().frameworks_home()) :
Option<string>::none();
+ const Option<Duration> stallTimeout =
+ fetcherInfo.get().has_stall_timeout()
+ ? Nanoseconds(fetcherInfo.get().stall_timeout().nanoseconds())
+ : Option<Duration>::none();
+
// Fetch each URI to a local file and chmod if necessary.
foreach (const FetcherInfo::Item& item, fetcherInfo.get().items()) {
- Try<string> fetched =
- fetch(item, cacheDirectory, sandboxDirectory, frameworksHome);
+ Try<string> fetched = fetch(
+ item, cacheDirectory, sandboxDirectory, frameworksHome, stallTimeout);
if (fetched.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to fetch '" << item.uri().value() << "': " + fetched.error();
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index e2ebde5..068d744 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -175,6 +175,9 @@ constexpr char EXECUTOR_HTTP_AUTHENTICATION_REALM[] = "mesos-agent-executor";
// Default maximum storage space to be used by the fetcher cache.
constexpr Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2);
+// Default timeout for the fetcher to wait when a net download stalls.
+constexpr Duration DEFAULT_FETCHER_STALL_TIMEOUT = Minutes(1);
+
// If no pings received within this timeout, then the slave will
// trigger a re-detection of the master to cause a re-registration.
Duration DEFAULT_MASTER_PING_TIMEOUT();
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index 8b26e88..e08336d 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -561,6 +561,9 @@ Future<Nothing> FetcherProcess::__fetch(
info.set_frameworks_home(flags.frameworks_home);
}
+ info.mutable_stall_timeout()
+ ->set_nanoseconds(flags.fetcher_stall_timeout.ns());
+
return run(containerId, sandboxDirectory, user, info)
.repair(defer(self(), [=](const Future<Nothing>& future) {
++metrics.task_fetches_failed;
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7129cea..ec8ff29 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -248,6 +248,15 @@ mesos::internal::slave::Flags::Flags()
" each other when occupying a shared space (i.e. disk contention).",
path::join(os::temp(), "mesos", "fetch"));
+ add(&Flags::fetcher_stall_timeout,
+ "fetcher_stall_timeout",
+ "Amount of time for the fetcher to wait before considering a download\n"
+ "being too slow and abort it when the download stalls (i.e., the speed\n"
+ "keeps below one byte per second).\n"
+ "NOTE: This feature only applies when downloading data from the net and\n"
+ "does not apply to HDFS.",
+ DEFAULT_FETCHER_STALL_TIMEOUT);
+
add(&Flags::work_dir,
"work_dir",
"Path of the agent work directory. This is where executor sandboxes\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 5e62e54..91f7297 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -67,6 +67,7 @@ public:
Option<std::string> attributes;
Bytes fetcher_cache_size;
std::string fetcher_cache_dir;
+ Duration fetcher_stall_timeout;
std::string work_dir;
std::string runtime_dir;
std::string launcher_dir;