You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@mesos.apache.org by GitBox <gi...@apache.org> on 2018/08/03 20:42:09 UTC

[GitHub] vinodkone closed pull request #65: Process dispatch

vinodkone closed pull request #65: Process dispatch
URL: https://github.com/apache/mesos/pull/65
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/3rdparty/libprocess/include/process/dispatch.hpp b/3rdparty/libprocess/include/process/dispatch.hpp
index c5b8137586..bbfef9df69 100644
--- a/3rdparty/libprocess/include/process/dispatch.hpp
+++ b/3rdparty/libprocess/include/process/dispatch.hpp
@@ -347,6 +347,33 @@ Future<R> dispatch(
 #undef TEMPLATE
 
 
+template <typename I, typename Callable, typename ...Args>
+auto dispatch(
+    const ProcessBase* process,
+    Callable fn,
+    Args&&... args) -> typename std::result_of<decltype(fn)(I*, Args...)>::type
+{
+  typedef typename std::result_of<decltype(fn)(I*, Args...)>::type::value_type result_type; //NOLINT
+  std::shared_ptr<Promise<result_type>> promise(new Promise<result_type>());
+
+  auto call(std::bind(fn, std::placeholders::_1, std::forward<Args>(args)...));
+
+  std::shared_ptr<std::function<void(ProcessBase*)>> f(
+      new std::function<void(ProcessBase*)>(
+      [=](ProcessBase* process) {
+        assert(process != NULL);
+        I* t = dynamic_cast<I*>(process);
+        assert(t != NULL);
+
+        promise->associate(call(t));
+      }));
+
+  internal::dispatch(process->self(), f, &typeid(fn));
+
+  return promise->future();
+}
+
+
 inline void dispatch(
     const UPID& pid,
     const std::function<void()>& f)
diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp
index 9006b8a83d..3c30505a8c 100644
--- a/3rdparty/libprocess/include/process/future.hpp
+++ b/3rdparty/libprocess/include/process/future.hpp
@@ -90,6 +90,8 @@ template <typename T>
 class Future
 {
 public:
+  typedef T value_type;
+
   // Constructs a failed future.
   static Future<T> failed(const std::string& message);
 
diff --git a/src/Makefile.am b/src/Makefile.am
index 8963cea9fd..0707ab5f9b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -706,6 +706,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 	common/date_utils.hpp						\
 	common/http.hpp							\
 	common/parse.hpp						\
+	common/process_dispatcher.hpp					\
 	common/protobuf_utils.hpp					\
 	common/recordio.hpp						\
 	common/resources_utils.hpp					\
diff --git a/src/common/process_dispatcher.hpp b/src/common/process_dispatcher.hpp
new file mode 100644
index 0000000000..192a809849
--- /dev/null
+++ b/src/common/process_dispatcher.hpp
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __PROCESS_DISPATCHER_HPP__
+#define __PROCESS_DISPATCHER_HPP__
+
+#include <memory>
+#include <utility>
+
+#include <stout/try.hpp>
+
+#include <process/dispatch.hpp>
+#include <process/process.hpp>
+#include <process/shared.hpp>
+
+namespace mesos {
+
+template <typename I>
+class Dispatchable
+{
+public:
+  virtual ~Dispatchable(){}
+
+  template <typename Callable, typename ...Args>
+    auto dispatch(Callable fn, Args&&... args)
+    -> typename std::result_of<decltype(fn)(I*, Args...)>::type
+    {
+      ProcessBase* process = dynamic_cast<ProcessBase*>(getInterface());
+      return process::dispatch<I, Callable, Args...>(
+          process,
+          fn,
+          std::forward<Args>(args)...);
+    }
+
+protected:
+  void setInterface(const process::Shared<I>& i)
+  {
+    interface = i;
+  }
+
+  I* getInterface()
+  {
+    return const_cast<I*>(interface.get());
+  }
+
+  process::Shared<I> interface;
+};
+
+template <typename I, typename P = I>
+class ProcessDispatcher : public Dispatchable<I>
+{
+public:
+  template<typename ...Args>
+  static Try<process::Owned<Dispatchable<I>>> create(Args&&... args)
+  {
+    Try<process::Owned<P>> newProcess(P::create(std::forward<Args>(args)...));
+
+    if (newProcess.isError()) {
+      return Error(newProcess.error());
+    }
+
+    process::Shared<I> sharedInterface(newProcess.get().release());
+
+    return process::Owned<Dispatchable<I>>(
+        new ProcessDispatcher(sharedInterface));
+  }
+
+  static Try<process::Owned<ProcessDispatcher>> create(
+      process::Shared<I> process)
+  {
+    return(process::Owned<ProcessDispatcher>(new ProcessDispatcher(process)));
+  }
+
+  ~ProcessDispatcher()
+  {
+    ProcessBase* process = dynamic_cast<ProcessBase*>(this->getInterface());
+
+    terminate(process);
+    process::wait(process);
+  }
+
+private:
+  ProcessDispatcher(const process::Shared<I>& i)
+    :Dispatchable<I>()
+  {
+    this->setInterface(i);
+
+    ProcessBase* process = dynamic_cast<ProcessBase*>(const_cast<I*>(i.get()));
+    if (!process)
+    {
+      assert(false);
+    }
+
+    spawn(process);
+  }
+
+  ProcessDispatcher(const ProcessDispatcher&) = delete;
+};
+
+} // namespace mesos {
+
+#endif // __PROCESS_DISPATCHER_HPP__
diff --git a/src/slave/containerizer/provisioners/docker/token_manager.cpp b/src/slave/containerizer/provisioners/docker/token_manager.cpp
index aec915f25f..1acda505ff 100644
--- a/src/slave/containerizer/provisioners/docker/token_manager.cpp
+++ b/src/slave/containerizer/provisioners/docker/token_manager.cpp
@@ -42,68 +42,6 @@ namespace slave {
 namespace docker {
 namespace registry {
 
-class TokenManagerProcess : public Process<TokenManagerProcess>
-{
-public:
-  static Try<Owned<TokenManagerProcess>> create(const URL& realm);
-
-  Future<Token> getToken(
-      const string& service,
-      const string& scope,
-      const Option<string>& account);
-
-private:
-  static const string TOKEN_PATH_PREFIX;
-  static const Duration RESPONSE_TIMEOUT;
-
-  TokenManagerProcess(const URL& realm)
-    : realm_(realm) {}
-
-  Try<Token> getTokenFromResponse(const Response& response) const;
-
-  /**
-   * Key for the token cache.
-   */
-  struct TokenCacheKey
-  {
-    string service;
-    string scope;
-  };
-
-  struct TokenCacheKeyHash
-  {
-    size_t operator()(const TokenCacheKey& key) const
-    {
-      hash<string> hashFn;
-
-      return (hashFn(key.service) ^
-          (hashFn(key.scope) << 1));
-    }
-  };
-
-  struct TokenCacheKeyEqual
-  {
-    bool operator()(
-        const TokenCacheKey& left,
-        const TokenCacheKey& right) const
-    {
-      return ((left.service == right.service) &&
-          (left.scope == right.scope));
-    }
-  };
-
-  typedef hashmap<
-    const TokenCacheKey,
-    Token,
-    TokenCacheKeyHash,
-    TokenCacheKeyEqual> TokenCacheType;
-
-  const URL realm_;
-  TokenCacheType tokenCache_;
-
-  TokenManagerProcess(const TokenManagerProcess&) = delete;
-  TokenManagerProcess& operator=(const TokenManagerProcess&) = delete;
-};
 
 const Duration TokenManagerProcess::RESPONSE_TIMEOUT = Seconds(10);
 const string TokenManagerProcess::TOKEN_PATH_PREFIX = "/v2/token/";
@@ -233,46 +171,6 @@ bool Token::isValid() const
 }
 
 
-Try<Owned<TokenManager>> TokenManager::create(
-    const URL& realm)
-{
-  Try<Owned<TokenManagerProcess>> process = TokenManagerProcess::create(realm);
-  if (process.isError()) {
-    return Error(process.error());
-  }
-
-  return Owned<TokenManager>(new TokenManager(process.get()));
-}
-
-
-TokenManager::TokenManager(Owned<TokenManagerProcess>& process)
-  : process_(process)
-{
-  spawn(CHECK_NOTNULL(process_.get()));
-}
-
-
-TokenManager::~TokenManager()
-{
-  terminate(process_.get());
-  process::wait(process_.get());
-}
-
-
-Future<Token> TokenManager::getToken(
-    const string& service,
-    const string& scope,
-    const Option<string>& account)
-{
-  return dispatch(
-      process_.get(),
-      &TokenManagerProcess::getToken,
-      service,
-      scope,
-      account);
-}
-
-
 Try<Owned<TokenManagerProcess>> TokenManagerProcess::create(const URL& realm)
 {
   return Owned<TokenManagerProcess>(new TokenManagerProcess(realm));
@@ -308,6 +206,7 @@ Future<Token> TokenManagerProcess::getToken(
     const string& scope,
     const Option<string>& account)
 {
+        std::cout << __FILE__ << ":" << __LINE__ << std::endl;
   const TokenCacheKey tokenKey = {service, scope};
 
   if (tokenCache_.contains(tokenKey)) {
diff --git a/src/slave/containerizer/provisioners/docker/token_manager.hpp b/src/slave/containerizer/provisioners/docker/token_manager.hpp
index 879269dab9..18708d3523 100644
--- a/src/slave/containerizer/provisioners/docker/token_manager.hpp
+++ b/src/slave/containerizer/provisioners/docker/token_manager.hpp
@@ -96,10 +96,6 @@ struct Token
 };
 
 
-// Forward declaration.
-class TokenManagerProcess;
-
-
 /**
  *  Acquires and manages docker registry tokens. It keeps the tokens in its
  *  cache to server any future request for the same token.
@@ -110,19 +106,6 @@ class TokenManagerProcess;
 class TokenManager
 {
 public:
-  /**
-   * Factory method for creating TokenManager object.
-   *
-   * TokenManager and registry authorization realm has a 1:1 relationship.
-   *
-   * @param realm URL of the authorization server from where token will be
-   *     requested by this TokenManager.
-   * @returns Owned<TokenManager> if success.
-   *          Error on failure.
-   */
-  static Try<process::Owned<TokenManager>> create(
-      const process::http::URL& realm);
-
   /**
    * Returns JSON Web Token from cache or from remote server using "Basic
    * authorization".
@@ -136,13 +119,14 @@ class TokenManager
    * @param password base64 encoded password for basic authorization.
    * @returns Token struct that encapsulates JSON Web Token.
    */
-  process::Future<Token> getToken(
+  /*
+  virtual process::Future<Token> getToken(
       const std::string& service,
       const std::string& scope,
       const Option<std::string>& account,
       const std::string& user,
-      const Option<std::string>& password);
-
+      const Option<std::string>& password) = 0;
+*/
   /**
    * Returns JSON Web Token from cache or from remote server using "TLS/Cert"
    * based authorization.
@@ -154,22 +138,82 @@ class TokenManager
    * @param account Name of the account which the client is acting as.
    * @returns Token struct that encapsulates JSON Web Token.
    */
+
+  virtual process::Future<Token> getToken(
+      const std::string& service,
+      const std::string& scope,
+      const Option<std::string>& account) = 0;
+
+  virtual ~TokenManager() {}
+};
+
+
+class TokenManagerProcess :
+  public TokenManager,
+  public process::Process<TokenManagerProcess>
+{
+public:
+  static Try<process::Owned<TokenManagerProcess>> create(
+      const process::http::URL& realm);
+
   process::Future<Token> getToken(
       const std::string& service,
       const std::string& scope,
       const Option<std::string>& account);
 
-  ~TokenManager();
-
 private:
-  TokenManager(process::Owned<TokenManagerProcess>& process);
+  static const std::string TOKEN_PATH_PREFIX;
+  static const Duration RESPONSE_TIMEOUT;
 
-  TokenManager(const TokenManager&) = delete;
-  TokenManager& operator=(const TokenManager&) = delete;
+  TokenManagerProcess(const process::http::URL& realm)
+    : realm_(realm) {}
 
-  process::Owned<TokenManagerProcess> process_;
-};
+  Try<Token> getTokenFromResponse(
+      const process::http::Response& response) const;
 
+  /**
+   * Key for the token cache.
+   */
+  struct TokenCacheKey
+  {
+    std::string service;
+    std::string scope;
+  };
+
+  struct TokenCacheKeyHash
+  {
+    size_t operator()(const TokenCacheKey& key) const
+    {
+      std::hash<std::string> hashFn;
+
+      return (hashFn(key.service) ^
+          (hashFn(key.scope) << 1));
+    }
+  };
+
+  struct TokenCacheKeyEqual
+  {
+    bool operator()(
+        const TokenCacheKey& left,
+        const TokenCacheKey& right) const
+    {
+      return ((left.service == right.service) &&
+          (left.scope == right.scope));
+    }
+  };
+
+  typedef hashmap<
+    const TokenCacheKey,
+    Token,
+    TokenCacheKeyHash,
+    TokenCacheKeyEqual> TokenCacheType;
+
+  const process::http::URL realm_;
+  TokenCacheType tokenCache_;
+
+  TokenManagerProcess(const TokenManagerProcess&) = delete;
+  TokenManagerProcess& operator=(const TokenManagerProcess&) = delete;
+};
 } // namespace registry {
 } // namespace docker {
 } // namespace slave {
diff --git a/src/tests/provisioners/docker_provisioner_tests.cpp b/src/tests/provisioners/docker_provisioner_tests.cpp
index ff29d562c7..cec909d4b0 100644
--- a/src/tests/provisioners/docker_provisioner_tests.cpp
+++ b/src/tests/provisioners/docker_provisioner_tests.cpp
@@ -31,6 +31,7 @@
 
 #include <process/ssl/gtest.hpp>
 
+#include "common/process_dispatcher.hpp"
 #include "slave/containerizer/provisioners/docker/token_manager.hpp"
 
 #include "tests/mesos.hpp"
@@ -234,11 +235,152 @@ TEST_F(DockerRegistryClientTest, SimpleGetToken)
       server.get().address().get().hostname().get(),
       server.get().address().get().port);
 
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
-  ASSERT_SOME(tokenMgr);
+  auto tokenManagerProcess = TokenManagerProcess::create(url);
+  ASSERT_SOME(tokenManagerProcess);
+
+  Shared<TokenManager> tmProcess(tokenManagerProcess.get().release());
+  auto tokenManager =
+    ProcessDispatcher<TokenManager>::create(tmProcess);
+
+  Future<Token> token =
+    tokenManager.get()->dispatch(
+        &TokenManager::getToken,
+        "registry.docker.io",
+        "repository:library/busybox:pull",
+        None());
+
+  AWAIT_ASSERT_READY(socket);
+
+  // Construct response and send(server side).
+  const double expirySecs = Clock::now().secs() + Days(365).secs();
+
+  claimsJsonString =
+    "{\"access\" \
+      :[ \
+        { \
+          \"type\":\"repository\", \
+          \"name\":\"library/busybox\", \
+          \"actions\":[\"pull\"]}], \
+          \"aud\":\"registry.docker.io\", \
+          \"exp\":" + stringify(expirySecs) + ", \
+          \"iat\":1438887168, \
+          \"iss\":\"auth.docker.io\", \
+          \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \
+          \"nbf\":1438887166, \
+          \"sub\":\"\" \
+         }";
+
+  const string tokenString(getTokenString());
+  const string tokenResponse = "{\"token\":\"" + tokenString + "\"}";
+
+  const string buffer =
+    string("HTTP/1.1 200 OK\r\n") +
+    "Content-Length : " +
+    stringify(tokenResponse.length()) + "\r\n" +
+    "\r\n" +
+    tokenResponse;
+
+  AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer));
+
+  AWAIT_ASSERT_READY(token);
+  ASSERT_EQ(token.get().raw, tokenString);
+}
+
+
+TEST_F(DockerRegistryClientTest, TokenManagerInterface)
+{
+  class AnotherTokenManager :
+    public TokenManager,
+    public Process<AnotherTokenManager>
+  {
+  public:
+    static Owned<AnotherTokenManager> create(const string& str)
+    {
+      return Owned<AnotherTokenManager>(new AnotherTokenManager(str));
+    }
+
+    AnotherTokenManager(const string str)
+      :str_(str) {}
+
+    process::Future<Token> getToken(
+        const std::string& service,
+        const std::string& scope,
+        const Option<std::string>& account)
+    {
+      return Failure("AnotherTokenManager");
+    }
+
+  private:
+    const string str_;
+  };
+
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  // Create URL from server hostname and port.
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  vector<Owned<Dispatchable<TokenManager>>> tokenManagerList;
+
+  Try<Owned<Dispatchable<TokenManager>>> tokenManagerProcess1 =
+    ProcessDispatcher<TokenManager, TokenManagerProcess>::create(url);
+  ASSERT_SOME(tokenManagerProcess1);
+
+
+  Try<Owned<Dispatchable<TokenManager>>> tokenManagerProcess2 =
+    ProcessDispatcher<TokenManager, AnotherTokenManager>::create("test");
+
+  ASSERT_SOME(tokenManagerProcess2);
+
+  tokenManagerList.push_back(tokenManagerProcess1.get());
+  tokenManagerList.push_back(tokenManagerProcess2.get());
+
+  foreach (Owned<Dispatchable<TokenManager>>& i, tokenManagerList) {
+      i->dispatch(
+        &TokenManager::getToken,
+        "registry.docker.io",
+        "repository:library/busybox:pull",
+        None());
+  }
+}
+
+
+TEST_F(DockerRegistryClientTest, DispatchOwnsTokenManager)
+{
+  Try<Socket> server = setup_server({
+      {"SSL_ENABLED", "true"},
+      {"SSL_KEY_FILE", key_path().value},
+      {"SSL_CERT_FILE", certificate_path().value}});
+
+  ASSERT_SOME(server);
+  ASSERT_SOME(server.get().address());
+  ASSERT_SOME(server.get().address().get().hostname());
+
+  Future<Socket> socket = server.get().accept();
+
+  // Create URL from server hostname and port.
+  const http::URL url(
+      "https",
+      server.get().address().get().hostname().get(),
+      server.get().address().get().port);
+
+  auto tokenManager =
+    ProcessDispatcher<TokenManager, TokenManagerProcess>::create(url);
 
   Future<Token> token =
-    tokenMgr.get()->getToken(
+    tokenManager.get()->dispatch(
+        &TokenManager::getToken,
         "registry.docker.io",
         "repository:library/busybox:pull",
         None());
@@ -301,11 +443,13 @@ TEST_F(DockerRegistryClientTest, BadTokenResponse)
       server.get().address().get().hostname().get(),
       server.get().address().get().port);
 
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
+  auto tokenMgr =
+    ProcessDispatcher<TokenManager, TokenManagerProcess>::create(url);
   ASSERT_SOME(tokenMgr);
 
   Future<Token> token =
-    tokenMgr.get()->getToken(
+    tokenMgr.get()->dispatch(
+        &TokenManager::getToken,
         "registry.docker.io",
         "repository:library/busybox:pull",
         None());
@@ -334,11 +478,13 @@ TEST_F(DockerRegistryClientTest, BadTokenServerAddress)
   // Create an invalid URL with current time.
   const http::URL url("https", stringify(Clock::now().secs()), 0);
 
-  Try<Owned<TokenManager>> tokenMgr = TokenManager::create(url);
+  auto tokenMgr =
+    ProcessDispatcher<TokenManager, TokenManagerProcess>::create(url);
   ASSERT_SOME(tokenMgr);
 
   Future<Token> token =
-    tokenMgr.get()->getToken(
+    tokenMgr.get()->dispatch(
+        &TokenManager::getToken,
         "registry.docker.io",
         "repository:library/busybox:pull",
         None());
@@ -348,7 +494,6 @@ TEST_F(DockerRegistryClientTest, BadTokenServerAddress)
 
 #endif // USE_SSL_SOCKET
 
-
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services