You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/07/07 22:01:36 UTC
mesos git commit: Split FetcherProcess into its own header file.
Repository: mesos
Updated Branches:
refs/heads/master c8bb72c66 -> 4abb33ec8
Split FetcherProcess into its own header file.
Split the FetcherProcess class into its own header file since this
is a Fetcher implementation detail that doesn't need to be visible
everywhere the Fetcher is used.
Review: https://reviews.apache.org/r/59952/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4abb33ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4abb33ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4abb33ec
Branch: refs/heads/master
Commit: 4abb33ec8d51bd45601ae047afe663ca6b848eaa
Parents: c8bb72c
Author: James Peach <jp...@apache.org>
Authored: Fri Jul 7 14:58:11 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Jul 7 15:01:29 2017 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/slave/containerizer/fetcher.cpp | 9 +-
src/slave/containerizer/fetcher.hpp | 232 --------------------
src/slave/containerizer/fetcher_process.hpp | 268 +++++++++++++++++++++++
src/tests/mesos.hpp | 1 +
5 files changed, 277 insertions(+), 234 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 138c11f..b5cd45d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1108,6 +1108,7 @@ libmesos_no_3rdparty_la_SOURCES += \
slave/containerizer/containerizer.hpp \
slave/containerizer/docker.hpp \
slave/containerizer/fetcher.hpp \
+ slave/containerizer/fetcher_process.hpp \
slave/containerizer/mesos/constants.hpp \
slave/containerizer/mesos/containerizer.hpp \
slave/containerizer/mesos/isolator.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index e3c786b..5163ec0 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,16 +16,17 @@
#include "slave/containerizer/fetcher.hpp"
-#include <unordered_map>
-
#include <process/async.hpp>
#include <process/check.hpp>
#include <process/collect.hpp>
#include <process/dispatch.hpp>
+#include <process/id.hpp>
#include <process/owned.hpp>
+#include <process/subprocess.hpp>
#include <process/metrics/metrics.hpp>
+#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/net.hpp>
#include <stout/path.hpp>
@@ -34,12 +35,16 @@
#include <stout/windows.hpp>
#endif // __WINDOWS__
+#include <stout/os/exists.hpp>
#include <stout/os/find.hpp>
#include <stout/os/killtree.hpp>
#include <stout/os/read.hpp>
+#include <stout/os/rmdir.hpp>
#include "hdfs/hdfs.hpp"
+#include "slave/containerizer/fetcher_process.hpp"
+
using std::list;
using std::map;
using std::shared_ptr;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 1955da0..c1f75c7 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -21,18 +21,10 @@
#include <string>
#include <mesos/mesos.hpp>
-#include <mesos/type_utils.hpp>
#include <mesos/fetcher/fetcher.hpp>
#include <process/future.hpp>
-#include <process/id.hpp>
-#include <process/process.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/metrics/counter.hpp>
-
-#include <stout/hashmap.hpp>
#include "slave/flags.hpp"
@@ -108,230 +100,6 @@ private:
};
-class FetcherProcess : public process::Process<FetcherProcess>
-{
-public:
- explicit FetcherProcess(const Flags& _flags);
- virtual ~FetcherProcess();
-
- process::Future<Nothing> fetch(
- const ContainerID& containerId,
- const CommandInfo& commandInfo,
- const std::string& sandboxDirectory,
- const Option<std::string>& user);
-
- // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
- // in the given directory, using these for trace output.
- virtual process::Future<Nothing> run(
- const ContainerID& containerId,
- const std::string& sandboxDirectory,
- const Option<std::string>& user,
- const mesos::fetcher::FetcherInfo& info);
-
- // Best effort attempt to kill the external mesos-fetcher process
- // running on behalf of the given container ID, if any.
- void kill(const ContainerID& containerId);
-
- // Representation of the fetcher cache and its contents. There is
- // exactly one instance per instance of FetcherProcess. All methods
- // of Cache are to be executed on the latter to ensure atomicity of
- // cache operations.
- class Cache
- {
- public:
- class Entry
- {
- public:
- Entry(
- const std::string& key,
- const std::string& directory,
- const std::string& filename)
- : key(key),
- directory(directory),
- filename(filename),
- size(0),
- referenceCount(0) {}
-
- ~Entry() {}
-
- // Marks this file's download as successful by setting its promise
- // to the path of the file in the cache.
- void complete();
-
- // Indicates whether this file's download into the cache is
- // successfully completed.
- process::Future<Nothing> completion();
-
- // Marks this download as failed, notifying concurrent fetch attempts
- // waiting for this result, by setting the promise to failed.
- void fail();
-
- // While an entry is "referenced" it cannot be evicted from the
- // cache.
- void reference();
- void unreference();
- bool isReferenced() const;
-
- // Returns the path in the filesystem where cache entry resides.
- // TODO(bernd-mesos): Remove this construct after refactoring so
- // that the slave flags get injected into the fetcher.
- Path path() const { return Path(path::join(directory, filename)); }
-
- // Uniquely identifies a user/URI combination.
- const std::string key;
-
- // Cache directory where this entry is stored.
- // TODO(bernd-mesos): Remove this construct after refactoring so
- // that the slave flags get injected into the fetcher.
- const std::string directory;
-
- // The unique name of the file held in the cache on behalf of a
- // URI.
- const std::string filename;
-
- // The expected size of the cache file. This field is set before
- // downloading. If the actual size of the downloaded file is
- // different a warning is logged and the field's value adjusted.
- Bytes size;
-
- private:
- // Concurrent fetch attempts can reference the same entry multiple
- // times.
- unsigned long referenceCount;
-
- // Indicates successful downloading to the cache.
- process::Promise<Nothing> promise;
- };
-
- explicit Cache(Bytes _space) : space(_space), tally(0), filenameSerial(0) {}
- virtual ~Cache() {}
-
- // Registers the maximum usable space in the cache directory.
- // TODO(bernd-mesos): This method will disappear when injecting 'flags'
- // into the fetcher instead of passing 'flags' around as parameter.
- void setSpace(const Bytes& bytes);
-
- void claimSpace(const Bytes& bytes);
- void releaseSpace(const Bytes& bytes);
- Bytes availableSpace() const;
-
- // Invents a new, distinct base name for a cache file, using the same
- // filename extension as the URI.
- std::string nextFilename(const CommandInfo::URI& uri);
-
- // Creates a new entry and inserts it into the cache table. Also
- // sets its reference count to 1. Returns the entry.
- std::shared_ptr<Entry> create(
- const std::string& cacheDirectory,
- const Option<std::string>& user,
- const CommandInfo::URI& uri);
-
- // Retrieves the cache entry indexed by the parameters, without
- // changing its reference count.
- Option<std::shared_ptr<Entry>> get(
- const Option<std::string>& user,
- const std::string& uri);
-
- // Returns whether an entry for this user and URI is in the cache.
- bool contains(
- const Option<std::string>& user, const std::string& uri) const;
-
- // Returns whether this identical entry is in the cache.
- bool contains(const std::shared_ptr<Cache::Entry>& entry) const;
-
- // Completely deletes a cache entry and its file. Warns on failure.
- // Virtual for mock testing.
- virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
-
- // Determines a list of cache entries to remove, respectively cache files
- // to delete, so that at least the required amount of space would become
- // available.
- Try<std::list<std::shared_ptr<Cache::Entry>>>
- selectVictims(const Bytes& requiredSpace);
-
- // Ensures that there is the requested amount of space is available
- // Evicts other files as necessary to make it so.
- Try<Nothing> reserve(const Bytes& requestedSpace);
-
- // Finds out if any predictions about cache file sizes have been
- // inaccurate, logs this if so, and records the cache files' actual
- // sizes and adjusts the cache's total amount of space in use.
- Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
-
- // Number of entries.
- size_t size() const;
-
- private:
- // Maximum storable number of bytes in the cache directory.
- const Bytes space;
-
- // How much space has been reserved to be occupied by cache files.
- Bytes tally;
-
- // Used to generate distinct cache file names simply by counting.
- unsigned long filenameSerial;
-
- // Maps keys (cache directory / URI combinations) to cache file
- // entries.
- hashmap<std::string, std::shared_ptr<Entry>> table;
-
- // Stores cache file entries sorted from LRU to MRU.
- std::list<std::shared_ptr<Entry>> lruSortedEntries;
- };
-
- // Public and virtual for mock testing.
- virtual process::Future<Nothing> _fetch(
- const hashmap<
- CommandInfo::URI,
- Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
- entries,
- const ContainerID& containerId,
- const std::string& sandboxDirectory,
- const std::string& cacheDirectory,
- const Option<std::string>& user);
-
- // Returns a list of cache files on disk for the given slave
- // (for all users combined). For testing.
- Try<std::list<Path>> cacheFiles() const;
-
- // Returns the number of cache entries for the given slave (for all
- // users combined). For testing.
- size_t cacheSize() const;
-
- // Returns the amount of remaining cache space that is not occupied
- // by cache entries. For testing.
- Bytes availableCacheSpace() const;
-
-private:
- process::Future<Nothing> __fetch(
- const hashmap<CommandInfo::URI,
- Option<std::shared_ptr<Cache::Entry>>>& entries,
- const ContainerID& containerId,
- const std::string& sandboxDirectory,
- const std::string& cacheDirectory,
- const Option<std::string>& user);
-
- // Calls Cache::reserve() and returns a ready entry future if successful,
- // else Failure. Claims the space and assigns the entry's size to this
- // amount if and only if successful.
- process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
- const Try<Bytes>& requestedSpace,
- const std::shared_ptr<Cache::Entry>& entry);
-
- const Flags flags;
-
- Cache cache;
-
- hashmap<ContainerID, pid_t> subprocessPids;
-
- // NOTE: These metrics will increment at most once per task. Even if
- // a single task asks for multiple artifacts, the total number of
- // fetches will only go up by one. And if any of those artifacts
- // fail to fetch, the failure count will only increase by one.
- process::metrics::Counter fetchesTotal;
- process::metrics::Counter fetchesFailed;
-};
-
} // namespace slave {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/slave/containerizer/fetcher_process.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher_process.hpp b/src/slave/containerizer/fetcher_process.hpp
new file mode 100644
index 0000000..0a34230
--- /dev/null
+++ b/src/slave/containerizer/fetcher_process.hpp
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
+#define __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
+
+#include <list>
+#include <string>
+
+#include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/counter.hpp>
+
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class FetcherProcess : public process::Process<FetcherProcess>
+{
+public:
+ explicit FetcherProcess(const Flags& _flags);
+ virtual ~FetcherProcess();
+
+ process::Future<Nothing> fetch(
+ const ContainerID& containerId,
+ const CommandInfo& commandInfo,
+ const std::string& sandboxDirectory,
+ const Option<std::string>& user);
+
+ // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
+ // in the given directory, using these for trace output.
+ virtual process::Future<Nothing> run(
+ const ContainerID& containerId,
+ const std::string& sandboxDirectory,
+ const Option<std::string>& user,
+ const mesos::fetcher::FetcherInfo& info);
+
+ // Best effort attempt to kill the external mesos-fetcher process
+ // running on behalf of the given container ID, if any.
+ void kill(const ContainerID& containerId);
+
+ // Representation of the fetcher cache and its contents. There is
+ // exactly one instance per instance of FetcherProcess. All methods
+ // of Cache are to be executed on the latter to ensure atomicity of
+ // cache operations.
+ class Cache
+ {
+ public:
+ class Entry
+ {
+ public:
+ Entry(
+ const std::string& key,
+ const std::string& directory,
+ const std::string& filename)
+ : key(key),
+ directory(directory),
+ filename(filename),
+ size(0),
+ referenceCount(0) {}
+
+ ~Entry() {}
+
+ // Marks this file's download as successful by setting its promise.
+ // The promise is a Promise<Nothing>; use path() for the file location.
+ void complete();
+
+ // Indicates whether this file's download into the cache is
+ // successfully completed.
+ process::Future<Nothing> completion();
+
+ // Marks this download as failed, notifying concurrent fetch attempts
+ // waiting for this result, by setting the promise to failed.
+ void fail();
+
+ // While an entry is "referenced" it cannot be evicted from the
+ // cache.
+ void reference();
+ void unreference();
+ bool isReferenced() const;
+
+ // Returns the path in the filesystem where cache entry resides.
+ // TODO(bernd-mesos): Remove this construct after refactoring so
+ // that the slave flags get injected into the fetcher.
+ Path path() const { return Path(path::join(directory, filename)); }
+
+ // Uniquely identifies a user/URI combination.
+ const std::string key;
+
+ // Cache directory where this entry is stored.
+ // TODO(bernd-mesos): Remove this construct after refactoring so
+ // that the slave flags get injected into the fetcher.
+ const std::string directory;
+
+ // The unique name of the file held in the cache on behalf of a
+ // URI.
+ const std::string filename;
+
+ // The expected size of the cache file. This field is set before
+ // downloading. If the actual size of the downloaded file is
+ // different a warning is logged and the field's value adjusted.
+ Bytes size;
+
+ private:
+ // Concurrent fetch attempts can reference the same entry multiple
+ // times.
+ unsigned long referenceCount;
+
+ // Indicates successful downloading to the cache.
+ process::Promise<Nothing> promise;
+ };
+
+ explicit Cache(Bytes _space) : space(_space), tally(0), filenameSerial(0) {}
+ virtual ~Cache() {}
+
+ // Registers the maximum usable space in the cache directory.
+ // TODO(bernd-mesos): This method will disappear when injecting 'flags'
+ // into the fetcher instead of passing 'flags' around as parameter.
+ void setSpace(const Bytes& bytes);
+
+ void claimSpace(const Bytes& bytes);
+ void releaseSpace(const Bytes& bytes);
+ Bytes availableSpace() const;
+
+ // Invents a new, distinct base name for a cache file, using the same
+ // filename extension as the URI.
+ std::string nextFilename(const CommandInfo::URI& uri);
+
+ // Creates a new entry and inserts it into the cache table. Also
+ // sets its reference count to 1. Returns the entry.
+ std::shared_ptr<Entry> create(
+ const std::string& cacheDirectory,
+ const Option<std::string>& user,
+ const CommandInfo::URI& uri);
+
+ // Retrieves the cache entry indexed by the parameters, without
+ // changing its reference count.
+ Option<std::shared_ptr<Entry>> get(
+ const Option<std::string>& user,
+ const std::string& uri);
+
+ // Returns whether an entry for this user and URI is in the cache.
+ bool contains(
+ const Option<std::string>& user, const std::string& uri) const;
+
+ // Returns whether this identical entry is in the cache.
+ bool contains(const std::shared_ptr<Cache::Entry>& entry) const;
+
+ // Completely deletes a cache entry and its file. Warns on failure.
+ // Virtual for mock testing.
+ virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
+
+ // Determines a list of cache entries to remove, respectively cache files
+ // to delete, so that at least the required amount of space would become
+ // available.
+ Try<std::list<std::shared_ptr<Cache::Entry>>>
+ selectVictims(const Bytes& requiredSpace);
+
+ // Ensures that the requested amount of space is available.
+ // Evicts other files as necessary to make it so.
+ Try<Nothing> reserve(const Bytes& requestedSpace);
+
+ // Finds out if any predictions about cache file sizes have been
+ // inaccurate, logs this if so, and records the cache files' actual
+ // sizes and adjusts the cache's total amount of space in use.
+ Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
+
+ // Number of entries.
+ size_t size() const;
+
+ private:
+ // Maximum storable number of bytes in the cache directory.
+ const Bytes space;
+
+ // How much space has been reserved to be occupied by cache files.
+ Bytes tally;
+
+ // Used to generate distinct cache file names simply by counting.
+ unsigned long filenameSerial;
+
+ // Maps keys (user / URI combinations, see Entry::key) to cache file
+ // entries.
+ hashmap<std::string, std::shared_ptr<Entry>> table;
+
+ // Stores cache file entries sorted from LRU to MRU.
+ std::list<std::shared_ptr<Entry>> lruSortedEntries;
+ };
+
+ // Public and virtual for mock testing.
+ virtual process::Future<Nothing> _fetch(
+ const hashmap<
+ CommandInfo::URI,
+ Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+ entries,
+ const ContainerID& containerId,
+ const std::string& sandboxDirectory,
+ const std::string& cacheDirectory,
+ const Option<std::string>& user);
+
+ // Returns a list of cache files on disk for the given slave
+ // (for all users combined). For testing.
+ Try<std::list<Path>> cacheFiles() const;
+
+ // Returns the number of cache entries for the given slave (for all
+ // users combined). For testing.
+ size_t cacheSize() const;
+
+ // Returns the amount of remaining cache space that is not occupied
+ // by cache entries. For testing.
+ Bytes availableCacheSpace() const;
+
+private:
+ process::Future<Nothing> __fetch(
+ const hashmap<CommandInfo::URI,
+ Option<std::shared_ptr<Cache::Entry>>>& entries,
+ const ContainerID& containerId,
+ const std::string& sandboxDirectory,
+ const std::string& cacheDirectory,
+ const Option<std::string>& user);
+
+ // Calls Cache::reserve() and returns a ready entry future if successful,
+ // else Failure. Claims the space and assigns the entry's size to this
+ // amount if and only if successful.
+ process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
+ const Try<Bytes>& requestedSpace,
+ const std::shared_ptr<Cache::Entry>& entry);
+
+ const Flags flags;
+
+ Cache cache;
+
+ hashmap<ContainerID, pid_t> subprocessPids;
+
+ // NOTE: These metrics will increment at most once per task. Even if
+ // a single task asks for multiple artifacts, the total number of
+ // fetches will only go up by one. And if any of those artifacts
+ // fail to fetch, the failure count will only increase by one.
+ process::metrics::Counter fetchesTotal;
+ process::metrics::Counter fetchesFailed;
+};
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index eac2c96..06b22f9 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -75,6 +75,7 @@
#include "slave/containerizer/containerizer.hpp"
#include "slave/containerizer/fetcher.hpp"
+#include "slave/containerizer/fetcher_process.hpp"
#include "slave/containerizer/mesos/containerizer.hpp"