You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ya...@apache.org on 2017/07/07 22:01:36 UTC

mesos git commit: Split FetcherProcess into its own header file.

Repository: mesos
Updated Branches:
  refs/heads/master c8bb72c66 -> 4abb33ec8


Split FetcherProcess into its own header file.

Split the FetcherProcess class into its own header file since this
is a Fetcher implementation detail that doesn't need to be visible
everywhere the Fetcher is used.

Review: https://reviews.apache.org/r/59952/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4abb33ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4abb33ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4abb33ec

Branch: refs/heads/master
Commit: 4abb33ec8d51bd45601ae047afe663ca6b848eaa
Parents: c8bb72c
Author: James Peach <jp...@apache.org>
Authored: Fri Jul 7 14:58:11 2017 -0700
Committer: Jiang Yan Xu <xu...@apple.com>
Committed: Fri Jul 7 15:01:29 2017 -0700

----------------------------------------------------------------------
 src/Makefile.am                             |   1 +
 src/slave/containerizer/fetcher.cpp         |   9 +-
 src/slave/containerizer/fetcher.hpp         | 232 --------------------
 src/slave/containerizer/fetcher_process.hpp | 268 +++++++++++++++++++++++
 src/tests/mesos.hpp                         |   1 +
 5 files changed, 277 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 138c11f..b5cd45d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1108,6 +1108,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   slave/containerizer/containerizer.hpp					\
   slave/containerizer/docker.hpp					\
   slave/containerizer/fetcher.hpp					\
+  slave/containerizer/fetcher_process.hpp				\
   slave/containerizer/mesos/constants.hpp				\
   slave/containerizer/mesos/containerizer.hpp				\
   slave/containerizer/mesos/isolator.hpp				\

http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index e3c786b..5163ec0 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -16,16 +16,17 @@
 
 #include "slave/containerizer/fetcher.hpp"
 
-#include <unordered_map>
-
 #include <process/async.hpp>
 #include <process/check.hpp>
 #include <process/collect.hpp>
 #include <process/dispatch.hpp>
+#include <process/id.hpp>
 #include <process/owned.hpp>
+#include <process/subprocess.hpp>
 
 #include <process/metrics/metrics.hpp>
 
+#include <stout/hashmap.hpp>
 #include <stout/hashset.hpp>
 #include <stout/net.hpp>
 #include <stout/path.hpp>
@@ -34,12 +35,16 @@
 #include <stout/windows.hpp>
 #endif // __WINDOWS__
 
+#include <stout/os/exists.hpp>
 #include <stout/os/find.hpp>
 #include <stout/os/killtree.hpp>
 #include <stout/os/read.hpp>
+#include <stout/os/rmdir.hpp>
 
 #include "hdfs/hdfs.hpp"
 
+#include "slave/containerizer/fetcher_process.hpp"
+
 using std::list;
 using std::map;
 using std::shared_ptr;

http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/slave/containerizer/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.hpp b/src/slave/containerizer/fetcher.hpp
index 1955da0..c1f75c7 100644
--- a/src/slave/containerizer/fetcher.hpp
+++ b/src/slave/containerizer/fetcher.hpp
@@ -21,18 +21,10 @@
 #include <string>
 
 #include <mesos/mesos.hpp>
-#include <mesos/type_utils.hpp>
 
 #include <mesos/fetcher/fetcher.hpp>
 
 #include <process/future.hpp>
-#include <process/id.hpp>
-#include <process/process.hpp>
-#include <process/subprocess.hpp>
-
-#include <process/metrics/counter.hpp>
-
-#include <stout/hashmap.hpp>
 
 #include "slave/flags.hpp"
 
@@ -108,230 +100,6 @@ private:
 };
 
 
-class FetcherProcess : public process::Process<FetcherProcess>
-{
-public:
-  explicit FetcherProcess(const Flags& _flags);
-  virtual ~FetcherProcess();
-
-  process::Future<Nothing> fetch(
-      const ContainerID& containerId,
-      const CommandInfo& commandInfo,
-      const std::string& sandboxDirectory,
-      const Option<std::string>& user);
-
-  // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
-  // in the given directory, using these for trace output.
-  virtual process::Future<Nothing> run(
-      const ContainerID& containerId,
-      const std::string& sandboxDirectory,
-      const Option<std::string>& user,
-      const mesos::fetcher::FetcherInfo& info);
-
-  // Best effort attempt to kill the external mesos-fetcher process
-  // running on behalf of the given container ID, if any.
-  void kill(const ContainerID& containerId);
-
-  // Representation of the fetcher cache and its contents. There is
-  // exactly one instance per instance of FetcherProcess. All methods
-  // of Cache are to be executed on the latter to ensure atomicity of
-  // cache operations.
-  class Cache
-  {
-  public:
-    class Entry
-    {
-    public:
-      Entry(
-          const std::string& key,
-          const std::string& directory,
-          const std::string& filename)
-        : key(key),
-          directory(directory),
-          filename(filename),
-          size(0),
-          referenceCount(0) {}
-
-      ~Entry() {}
-
-      // Marks this file's download as successful by setting its promise
-      // to the path of the file in the cache.
-      void complete();
-
-      // Indicates whether this file's download into the cache is
-      // successfully completed.
-      process::Future<Nothing> completion();
-
-      // Marks this download as failed, notifying concurrent fetch attempts
-      // waiting for this result, by setting the promise to failed.
-      void fail();
-
-      // While an entry is "referenced" it cannot be evicted from the
-      // cache.
-      void reference();
-      void unreference();
-      bool isReferenced() const;
-
-      // Returns the path in the filesystem where cache entry resides.
-      // TODO(bernd-mesos): Remove this construct after refactoring so
-      // that the slave flags get injected into the fetcher.
-      Path path() const { return Path(path::join(directory, filename)); }
-
-      // Uniquely identifies a user/URI combination.
-      const std::string key;
-
-      // Cache directory where this entry is stored.
-      // TODO(bernd-mesos): Remove this construct after refactoring so
-      // that the slave flags get injected into the fetcher.
-      const std::string directory;
-
-      // The unique name of the file held in the cache on behalf of a
-      // URI.
-      const std::string filename;
-
-      // The expected size of the cache file. This field is set before
-      // downloading. If the actual size of the downloaded file is
-      // different a warning is logged and the field's value adjusted.
-      Bytes size;
-
-    private:
-      // Concurrent fetch attempts can reference the same entry multiple
-      // times.
-      unsigned long referenceCount;
-
-     // Indicates successful downloading to the cache.
-      process::Promise<Nothing> promise;
-    };
-
-    explicit Cache(Bytes _space) : space(_space), tally(0), filenameSerial(0) {}
-    virtual ~Cache() {}
-
-    // Registers the maximum usable space in the cache directory.
-    // TODO(bernd-mesos): This method will disappear when injecting 'flags'
-    // into the fetcher instead of passing 'flags' around as parameter.
-    void setSpace(const Bytes& bytes);
-
-    void claimSpace(const Bytes& bytes);
-    void releaseSpace(const Bytes& bytes);
-    Bytes availableSpace() const;
-
-    // Invents a new, distinct base name for a cache file, using the same
-    // filename extension as the URI.
-    std::string nextFilename(const CommandInfo::URI& uri);
-
-    // Creates a new entry and inserts it into the cache table. Also
-    // sets its reference count to 1. Returns the entry.
-    std::shared_ptr<Entry> create(
-        const std::string& cacheDirectory,
-        const Option<std::string>& user,
-        const CommandInfo::URI& uri);
-
-    // Retrieves the cache entry indexed by the parameters, without
-    // changing its reference count.
-    Option<std::shared_ptr<Entry>> get(
-        const Option<std::string>& user,
-        const std::string& uri);
-
-    // Returns whether an entry for this user and URI is in the cache.
-    bool contains(
-        const Option<std::string>& user, const std::string& uri) const;
-
-    // Returns whether this identical entry is in the cache.
-    bool contains(const std::shared_ptr<Cache::Entry>& entry) const;
-
-    // Completely deletes a cache entry and its file. Warns on failure.
-    // Virtual for mock testing.
-    virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
-
-    // Determines a list of cache entries to remove, respectively cache files
-    // to delete, so that at least the required amount of space would become
-    // available.
-    Try<std::list<std::shared_ptr<Cache::Entry>>>
-        selectVictims(const Bytes& requiredSpace);
-
-    // Ensures that there is the requested amount of space is available
-    // Evicts other files as necessary to make it so.
-    Try<Nothing> reserve(const Bytes& requestedSpace);
-
-    // Finds out if any predictions about cache file sizes have been
-    // inaccurate, logs this if so, and records the cache files' actual
-    // sizes and adjusts the cache's total amount of space in use.
-    Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
-
-    // Number of entries.
-    size_t size() const;
-
-  private:
-    // Maximum storable number of bytes in the cache directory.
-    const Bytes space;
-
-    // How much space has been reserved to be occupied by cache files.
-    Bytes tally;
-
-    // Used to generate distinct cache file names simply by counting.
-    unsigned long filenameSerial;
-
-    // Maps keys (cache directory / URI combinations) to cache file
-    // entries.
-    hashmap<std::string, std::shared_ptr<Entry>> table;
-
-    // Stores cache file entries sorted from LRU to MRU.
-    std::list<std::shared_ptr<Entry>> lruSortedEntries;
-  };
-
-  // Public and virtual for mock testing.
-  virtual process::Future<Nothing> _fetch(
-      const hashmap<
-          CommandInfo::URI,
-          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
-        entries,
-      const ContainerID& containerId,
-      const std::string& sandboxDirectory,
-      const std::string& cacheDirectory,
-      const Option<std::string>& user);
-
-  // Returns a list of cache files on disk for the given slave
-  // (for all users combined). For testing.
-  Try<std::list<Path>> cacheFiles() const;
-
-  // Returns the number of cache entries for the given slave (for all
-  // users combined). For testing.
-  size_t cacheSize() const;
-
-  // Returns the amount of remaining cache space that is not occupied
-  // by cache entries. For testing.
-  Bytes availableCacheSpace() const;
-
-private:
-  process::Future<Nothing> __fetch(
-      const hashmap<CommandInfo::URI,
-      Option<std::shared_ptr<Cache::Entry>>>& entries,
-      const ContainerID& containerId,
-      const std::string& sandboxDirectory,
-      const std::string& cacheDirectory,
-      const Option<std::string>& user);
-
-  // Calls Cache::reserve() and returns a ready entry future if successful,
-  // else Failure. Claims the space and assigns the entry's size to this
-  // amount if and only if successful.
-  process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
-      const Try<Bytes>& requestedSpace,
-      const std::shared_ptr<Cache::Entry>& entry);
-
-  const Flags flags;
-
-  Cache cache;
-
-  hashmap<ContainerID, pid_t> subprocessPids;
-
-  // NOTE: These metrics will increment at most once per task. Even if
-  // a single task asks for multiple artifacts, the total number of
-  // fetches will only go up by one. And if any of those artifacts
-  // fail to fetch, the failure count will only increase by one.
-  process::metrics::Counter fetchesTotal;
-  process::metrics::Counter fetchesFailed;
-};
-
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/slave/containerizer/fetcher_process.hpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher_process.hpp b/src/slave/containerizer/fetcher_process.hpp
new file mode 100644
index 0000000..0a34230
--- /dev/null
+++ b/src/slave/containerizer/fetcher_process.hpp
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
+#define __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__
+
+#include <list>
+#include <string>
+
+#include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <process/metrics/counter.hpp>
+
+#include <stout/hashmap.hpp>
+
+#include "slave/flags.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class FetcherProcess : public process::Process<FetcherProcess>
+{
+public:
+  explicit FetcherProcess(const Flags& _flags);
+  virtual ~FetcherProcess();
+
+  process::Future<Nothing> fetch(
+      const ContainerID& containerId,
+      const CommandInfo& commandInfo,
+      const std::string& sandboxDirectory,
+      const Option<std::string>& user);
+
+  // Runs the mesos-fetcher, creating a "stdout" and "stderr" file
+  // in the given directory, using these for trace output.
+  virtual process::Future<Nothing> run(
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const Option<std::string>& user,
+      const mesos::fetcher::FetcherInfo& info);
+
+  // Best effort attempt to kill the external mesos-fetcher process
+  // running on behalf of the given container ID, if any.
+  void kill(const ContainerID& containerId);
+
+  // Representation of the fetcher cache and its contents. There is
+  // exactly one instance per instance of FetcherProcess. All methods
+  // of Cache are to be executed on the latter to ensure atomicity of
+  // cache operations.
+  class Cache
+  {
+  public:
+    class Entry
+    {
+    public:
+      Entry(
+          const std::string& key,
+          const std::string& directory,
+          const std::string& filename)
+        : key(key),
+          directory(directory),
+          filename(filename),
+          size(0),
+          referenceCount(0) {}
+
+      ~Entry() {}
+
+      // Marks this file's download as successful by setting its promise
+      // (a Promise<Nothing>; it carries no value such as a file path).
+      void complete();
+
+      // Indicates whether this file's download into the cache is
+      // successfully completed.
+      process::Future<Nothing> completion();
+
+      // Marks this download as failed, notifying concurrent fetch attempts
+      // waiting for this result, by setting the promise to failed.
+      void fail();
+
+      // While an entry is "referenced" it cannot be evicted from the
+      // cache.
+      void reference();
+      void unreference();
+      bool isReferenced() const;
+
+      // Returns the path in the filesystem where cache entry resides.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      Path path() const { return Path(path::join(directory, filename)); }
+
+      // Uniquely identifies a user/URI combination.
+      const std::string key;
+
+      // Cache directory where this entry is stored.
+      // TODO(bernd-mesos): Remove this construct after refactoring so
+      // that the slave flags get injected into the fetcher.
+      const std::string directory;
+
+      // The unique name of the file held in the cache on behalf of a
+      // URI.
+      const std::string filename;
+
+      // The expected size of the cache file. This field is set before
+      // downloading. If the actual size of the downloaded file is
+      // different a warning is logged and the field's value adjusted.
+      Bytes size;
+
+    private:
+      // Concurrent fetch attempts can reference the same entry multiple
+      // times.
+      unsigned long referenceCount;
+
+     // Indicates successful downloading to the cache.
+      process::Promise<Nothing> promise;
+    };
+
+    explicit Cache(Bytes _space) : space(_space), tally(0), filenameSerial(0) {}
+    virtual ~Cache() {}
+
+    // Registers the maximum usable space in the cache directory.
+    // TODO(bernd-mesos): This method will disappear when injecting 'flags'
+    // into the fetcher instead of passing 'flags' around as parameter.
+    void setSpace(const Bytes& bytes);
+
+    void claimSpace(const Bytes& bytes);
+    void releaseSpace(const Bytes& bytes);
+    Bytes availableSpace() const;
+
+    // Invents a new, distinct base name for a cache file, using the same
+    // filename extension as the URI.
+    std::string nextFilename(const CommandInfo::URI& uri);
+
+    // Creates a new entry and inserts it into the cache table. Also
+    // sets its reference count to 1. Returns the entry.
+    std::shared_ptr<Entry> create(
+        const std::string& cacheDirectory,
+        const Option<std::string>& user,
+        const CommandInfo::URI& uri);
+
+    // Retrieves the cache entry indexed by the parameters, without
+    // changing its reference count.
+    Option<std::shared_ptr<Entry>> get(
+        const Option<std::string>& user,
+        const std::string& uri);
+
+    // Returns whether an entry for this user and URI is in the cache.
+    bool contains(
+        const Option<std::string>& user, const std::string& uri) const;
+
+    // Returns whether this identical entry is in the cache.
+    bool contains(const std::shared_ptr<Cache::Entry>& entry) const;
+
+    // Completely deletes a cache entry and its file. Warns on failure.
+    // Virtual for mock testing.
+    virtual Try<Nothing> remove(const std::shared_ptr<Entry>& entry);
+
+    // Determines a list of cache entries to remove, respectively cache files
+    // to delete, so that at least the required amount of space would become
+    // available.
+    Try<std::list<std::shared_ptr<Cache::Entry>>>
+        selectVictims(const Bytes& requiredSpace);
+
+    // Ensures that the requested amount of space is available.
+    // Evicts other files as necessary to make it so.
+    Try<Nothing> reserve(const Bytes& requestedSpace);
+
+    // Finds out if any predictions about cache file sizes have been
+    // inaccurate, logs this if so, and records the cache files' actual
+    // sizes and adjusts the cache's total amount of space in use.
+    Try<Nothing> adjust(const std::shared_ptr<Cache::Entry>& entry);
+
+    // Number of entries.
+    size_t size() const;
+
+  private:
+    // Maximum storable number of bytes in the cache directory.
+    const Bytes space;
+
+    // How much space has been reserved to be occupied by cache files.
+    Bytes tally;
+
+    // Used to generate distinct cache file names simply by counting.
+    unsigned long filenameSerial;
+
+    // Maps keys (user / URI combinations) to cache file
+    // entries.
+    hashmap<std::string, std::shared_ptr<Entry>> table;
+
+    // Stores cache file entries sorted from LRU to MRU.
+    std::list<std::shared_ptr<Entry>> lruSortedEntries;
+  };
+
+  // Public and virtual for mock testing.
+  virtual process::Future<Nothing> _fetch(
+      const hashmap<
+          CommandInfo::URI,
+          Option<process::Future<std::shared_ptr<Cache::Entry>>>>&
+        entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
+      const Option<std::string>& user);
+
+  // Returns a list of cache files on disk for the given slave
+  // (for all users combined). For testing.
+  Try<std::list<Path>> cacheFiles() const;
+
+  // Returns the number of cache entries for the given slave (for all
+  // users combined). For testing.
+  size_t cacheSize() const;
+
+  // Returns the amount of remaining cache space that is not occupied
+  // by cache entries. For testing.
+  Bytes availableCacheSpace() const;
+
+private:
+  process::Future<Nothing> __fetch(
+      const hashmap<CommandInfo::URI,
+      Option<std::shared_ptr<Cache::Entry>>>& entries,
+      const ContainerID& containerId,
+      const std::string& sandboxDirectory,
+      const std::string& cacheDirectory,
+      const Option<std::string>& user);
+
+  // Calls Cache::reserve() and returns a ready entry future if successful,
+  // else Failure. Claims the space and assigns the entry's size to this
+  // amount if and only if successful.
+  process::Future<std::shared_ptr<Cache::Entry>> reserveCacheSpace(
+      const Try<Bytes>& requestedSpace,
+      const std::shared_ptr<Cache::Entry>& entry);
+
+  const Flags flags;
+
+  Cache cache;
+
+  hashmap<ContainerID, pid_t> subprocessPids;
+
+  // NOTE: These metrics will increment at most once per task. Even if
+  // a single task asks for multiple artifacts, the total number of
+  // fetches will only go up by one. And if any of those artifacts
+  // fail to fetch, the failure count will only increase by one.
+  process::metrics::Counter fetchesTotal;
+  process::metrics::Counter fetchesFailed;
+};
+
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_CONTAINERIZER_FETCHER_PROCESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/4abb33ec/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index eac2c96..06b22f9 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -75,6 +75,7 @@
 
 #include "slave/containerizer/containerizer.hpp"
 #include "slave/containerizer/fetcher.hpp"
+#include "slave/containerizer/fetcher_process.hpp"
 
 #include "slave/containerizer/mesos/containerizer.hpp"