You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/10/08 18:07:42 UTC

[mesos] 02/04: Added `--fetcher_stall_timeout` to abort stalled artifact fetching.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 0d05ffc174f90aa8573869ab36bd338224121b42
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Wed Mar 28 22:47:52 2018 -0700

    Added `--fetcher_stall_timeout` to abort stalled artifact fetching.
    
    This flag specifies how long `mesos-fetcher` waits before aborting a
    download whose speed stays below 1 byte/sec. This prevents containers
    from getting stuck in the FETCHING state.
    
    Review: https://reviews.apache.org/r/65856/
---
 docs/configuration/agent.md         | 12 +++++++++++
 include/mesos/fetcher/fetcher.proto |  3 +++
 src/launcher/fetcher.cpp            | 40 +++++++++++++++++++++++++------------
 src/slave/constants.hpp             |  3 +++
 src/slave/containerizer/fetcher.cpp |  3 +++
 src/slave/flags.cpp                 |  9 +++++++++
 src/slave/flags.hpp                 |  1 +
 7 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/docs/configuration/agent.md b/docs/configuration/agent.md
index 3cccf89..2a72a64 100644
--- a/docs/configuration/agent.md
+++ b/docs/configuration/agent.md
@@ -804,6 +804,18 @@ Size of the fetcher cache in Bytes. (default: 2GB)
 </tr>
 <tr>
   <td>
+    --fetcher_stall_timeout=VALUE
+  </td>
+  <td>
+Amount of time for the fetcher to wait before considering a download
+too slow and aborting it when the download stalls (i.e., the speed
+stays below one byte per second).
+<b>NOTE</b>: This feature only applies when downloading data from the net and
+does not apply to HDFS. (default: 1mins)
+  </td>
+</tr>
+<tr>
+  <td>
     --frameworks_home=VALUE
   </td>
   <td>
diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto
index 6a5d807..d668106 100644
--- a/include/mesos/fetcher/fetcher.proto
+++ b/include/mesos/fetcher/fetcher.proto
@@ -64,4 +64,7 @@ message FetcherInfo {
   repeated Item items = 3;
   optional string user = 4;
   optional string frameworks_home = 5;
+
+  // Only applies when fetching artifacts from the net.
+  optional DurationInfo stall_timeout = 6;
 }
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index e2372a1..06fa52c 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -165,7 +165,8 @@ static Try<string> downloadWithHadoopClient(
 
 static Try<string> downloadWithNet(
     const string& sourceUri,
-    const string& destinationPath)
+    const string& destinationPath,
+    const Option<Duration>& stallTimeout)
 {
   // The net::download function only supports these protocols.
   CHECK(strings::startsWith(sourceUri, "http://")  ||
@@ -176,7 +177,7 @@ static Try<string> downloadWithNet(
   LOG(INFO) << "Downloading resource from '" << sourceUri
             << "' to '" << destinationPath << "'";
 
-  Try<int> code = net::download(sourceUri, destinationPath);
+  Try<int> code = net::download(sourceUri, destinationPath, stallTimeout);
   if (code.isError()) {
     return Error("Error downloading resource: " + code.error());
   } else {
@@ -217,7 +218,8 @@ static Try<string> copyFile(
 static Try<string> download(
     const string& _sourceUri,
     const string& destinationPath,
-    const Option<string>& frameworksHome)
+    const Option<string>& frameworksHome,
+    const Option<Duration>& stallTimeout)
 {
   // Trim leading whitespace for 'sourceUri'.
   const string sourceUri = strings::trim(_sourceUri, strings::PREFIX);
@@ -243,7 +245,7 @@ static Try<string> download(
   // 2. Try to fetch URI using os::net / libcurl implementation.
   // We consider http, https, ftp, ftps compatible with libcurl.
   if (Fetcher::isNetUri(sourceUri)) {
-    return downloadWithNet(sourceUri, destinationPath);
+    return downloadWithNet(sourceUri, destinationPath, stallTimeout);
   }
 
   // 3. Try to fetch the URI using hadoop client.
@@ -286,7 +288,8 @@ static Try<string> chmodExecutable(const string& filePath)
 static Try<string> fetchBypassingCache(
     const CommandInfo::URI& uri,
     const string& sandboxDirectory,
-    const Option<string>& frameworksHome)
+    const Option<string>& frameworksHome,
+    const Option<Duration>& stallTimeout)
 {
   LOG(INFO) << "Fetching directly into the sandbox directory";
 
@@ -315,7 +318,8 @@ static Try<string> fetchBypassingCache(
 
   string path = path::join(sandboxDirectory, outputFile.get());
 
-  Try<string> downloaded = download(uri.value(), path, frameworksHome);
+  Try<string> downloaded =
+    download(uri.value(), path, frameworksHome, stallTimeout);
   if (downloaded.isError()) {
     return Error(downloaded.error());
   }
@@ -404,7 +408,8 @@ static Try<string> fetchThroughCache(
     const FetcherInfo::Item& item,
     const Option<string>& cacheDirectory,
     const string& sandboxDirectory,
-    const Option<string>& frameworksHome)
+    const Option<string>& frameworksHome,
+    const Option<Duration>& stallTimeout)
 {
   if (cacheDirectory.isNone() || cacheDirectory.get().empty()) {
     return Error("Cache directory not specified");
@@ -428,7 +433,8 @@ static Try<string> fetchThroughCache(
     Try<string> downloaded = download(
         item.uri().value(),
         path::join(cacheDirectory.get(), item.cache_filename()),
-        frameworksHome);
+        frameworksHome,
+        stallTimeout);
 
     if (downloaded.isError()) {
       return Error(downloaded.error());
@@ -445,7 +451,8 @@ static Try<string> fetch(
     const FetcherInfo::Item& item,
     const Option<string>& cacheDirectory,
     const string& sandboxDirectory,
-    const Option<string>& frameworksHome)
+    const Option<string>& frameworksHome,
+    const Option<Duration>& stallTimeout)
 {
   LOG(INFO) << "Fetching URI '" << item.uri().value() << "'";
 
@@ -453,14 +460,16 @@ static Try<string> fetch(
     return fetchBypassingCache(
         item.uri(),
         sandboxDirectory,
-        frameworksHome);
+        frameworksHome,
+        stallTimeout);
   }
 
   return fetchThroughCache(
       item,
       cacheDirectory,
       sandboxDirectory,
-      frameworksHome);
+      frameworksHome,
+      stallTimeout);
 }
 
 
@@ -592,10 +601,15 @@ int main(int argc, char* argv[])
       Option<string>::some(fetcherInfo.get().frameworks_home()) :
         Option<string>::none();
 
+  const Option<Duration> stallTimeout =
+    fetcherInfo.get().has_stall_timeout()
+      ? Nanoseconds(fetcherInfo.get().stall_timeout().nanoseconds())
+      : Option<Duration>::none();
+
   // Fetch each URI to a local file and chmod if necessary.
   foreach (const FetcherInfo::Item& item, fetcherInfo.get().items()) {
-    Try<string> fetched =
-      fetch(item, cacheDirectory, sandboxDirectory, frameworksHome);
+    Try<string> fetched = fetch(
+        item, cacheDirectory, sandboxDirectory, frameworksHome, stallTimeout);
     if (fetched.isError()) {
       EXIT(EXIT_FAILURE)
         << "Failed to fetch '" << item.uri().value() << "': " + fetched.error();
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index e2ebde5..068d744 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -175,6 +175,9 @@ constexpr char EXECUTOR_HTTP_AUTHENTICATION_REALM[] = "mesos-agent-executor";
 // Default maximum storage space to be used by the fetcher cache.
 constexpr Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2);
 
+// Default timeout for the fetcher to wait when a net download stalls.
+constexpr Duration DEFAULT_FETCHER_STALL_TIMEOUT = Minutes(1);
+
 // If no pings received within this timeout, then the slave will
 // trigger a re-detection of the master to cause a re-registration.
 Duration DEFAULT_MASTER_PING_TIMEOUT();
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index 8b26e88..e08336d 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -561,6 +561,9 @@ Future<Nothing> FetcherProcess::__fetch(
     info.set_frameworks_home(flags.frameworks_home);
   }
 
+  info.mutable_stall_timeout()
+    ->set_nanoseconds(flags.fetcher_stall_timeout.ns());
+
   return run(containerId, sandboxDirectory, user, info)
     .repair(defer(self(), [=](const Future<Nothing>& future) {
       ++metrics.task_fetches_failed;
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 7129cea..ec8ff29 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -248,6 +248,15 @@ mesos::internal::slave::Flags::Flags()
       "    each other when occupying a shared space (i.e. disk contention).",
       path::join(os::temp(), "mesos", "fetch"));
 
+  add(&Flags::fetcher_stall_timeout,
+      "fetcher_stall_timeout",
+      "Amount of time for the fetcher to wait before considering a download\n"
+      "being too slow and abort it when the download stalls (i.e., the speed\n"
+      "keeps below one byte per second).\n"
+      "NOTE: This feature only applies when downloading data from the net and\n"
+      "does not apply to HDFS.",
+      DEFAULT_FETCHER_STALL_TIMEOUT);
+
   add(&Flags::work_dir,
       "work_dir",
       "Path of the agent work directory. This is where executor sandboxes\n"
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 5e62e54..91f7297 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -67,6 +67,7 @@ public:
   Option<std::string> attributes;
   Bytes fetcher_cache_size;
   std::string fetcher_cache_dir;
+  Duration fetcher_stall_timeout;
   std::string work_dir;
   std::string runtime_dir;
   std::string launcher_dir;