You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/08/18 18:54:08 UTC

[1/9] mesos git commit: Fixed some style issues.

Repository: mesos
Updated Branches:
  refs/heads/master 4c71ba1bd -> 55678b41f


Fixed some style issues.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/55678b41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/55678b41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/55678b41

Branch: refs/heads/master
Commit: 55678b41fb462effd912d5a310b421928f80f3b7
Parents: 2c3facb
Author: Jie Yu <yu...@gmail.com>
Authored: Fri Aug 18 11:43:27 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 src/resource_provider/driver.cpp          |  2 +-
 src/resource_provider/http_connection.hpp | 65 ++++++++++++++------------
 src/resource_provider/manager.cpp         |  1 -
 3 files changed, 37 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/55678b41/src/resource_provider/driver.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/driver.cpp b/src/resource_provider/driver.cpp
index fdb35d4..62c4ca1 100644
--- a/src/resource_provider/driver.cpp
+++ b/src/resource_provider/driver.cpp
@@ -29,9 +29,9 @@
 #include "resource_provider/http_connection.hpp"
 #include "resource_provider/validation.hpp"
 
-using process::dispatch;
 using process::Future;
 using process::Owned;
+using process::dispatch;
 using process::spawn;
 using process::terminate;
 using process::wait;

http://git-wip-us.apache.org/repos/asf/mesos/blob/55678b41/src/resource_provider/http_connection.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp
index bc1f01a..47686a8 100644
--- a/src/resource_provider/http_connection.hpp
+++ b/src/resource_provider/http_connection.hpp
@@ -58,8 +58,8 @@ namespace internal {
 /**
  * HTTP connection handler.
  *
- * Manages the connection to a Call/Event based v1 API like the resource
- * provider API.
+ * Manages the connection to a Call/Event based v1 API like the
+ * resource provider API.
  */
 template <typename Call, typename Event>
 class HttpConnectionProcess
@@ -72,10 +72,10 @@ public:
    * @param prefix prefix of the actor.
    * @param _detector the endpoint detector.
    * @param _contentType the content type expected by this connection.
-   * @param validate a callback which will be invoked when a call needs
-   *     to be validated.
-   * @param connected a callback which will be invoked when the connection
-   *     is established.
+   * @param validate a callback which will be invoked when a call
+   *     needs to be validated.
+   * @param connected a callback which will be invoked when the
+   *     connection is established.
    * @param disconnected a callback which will be invoked when the
    *     connection is disconnected.
    * @param received a callback which will be be invoked when events
@@ -108,17 +108,19 @@ public:
     }
 
     if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
-      // It might be possible that the scheduler is retrying. We drop the
-      // request if we have an ongoing subscribe request in flight or if the
-      // scheduler is already subscribed.
+      // It might be possible that the client is retrying. We drop the
+      // request if we have an ongoing subscribe request in flight or
+      // if the client is already subscribed.
       return process::Failure(
-          "Resource provider is in state" + stringify(state));
+          "Cannot process 'SUBSCRIBE' call as the driver is in "
+          "state " + stringify(state));
     }
 
     if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
       // We drop all non-subscribe calls if we are not currently subscribed.
       return process::Failure(
-          "Resource provider is in state " + stringify(state));
+          "Cannot process '" + stringify(call.type()) + "' call "
+          "as the driver is in state " + stringify(state));
     }
 
     CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
@@ -151,11 +153,13 @@ public:
     }
 
     CHECK_SOME(connectionId);
-    return response.then(defer(self(),
-                        &Self::_send,
-                        connectionId.get(),
-                        call,
-                        lambda::_1));
+
+    return response.then(
+        defer(self(),
+              &Self::_send,
+              connectionId.get(),
+              call,
+              lambda::_1));
   }
 
 protected:
@@ -181,8 +185,8 @@ protected:
       LOG(WARNING) << "Failed to detect an endpoint: " << future.failure();
 
       // TODO(nfnt): A non-retryable error might be the reason for the
-      // failed future. In that case the resource provider should be
-      // informed about this error and the URL dectection aborted.
+      // failed future. In that case the client should be informed
+      // about this error and the URL detection aborted.
     }
 
     // Invoke the disconnected callback if we were previously connected.
@@ -240,7 +244,8 @@ protected:
     state = State::CONNECTING;
 
     // We create two persistent connections here, one for subscribe
-    // call/streaming response and another for non-subscribe calls/responses.
+    // call/streaming response and another for non-subscribe
+    // calls/responses.
     collect(
         process::http::connect(endpoint.get()),
         process::http::connect(endpoint.get()))
@@ -291,8 +296,8 @@ protected:
           connectionId.get(),
           "Non-subscribe connection interrupted"));
 
-    // Invoke the connected callback once we have established both subscribe
-    // and non-subscribe connections with the master.
+    // Invoke the connected callback once we have established both
+    // subscribe and non-subscribe connections with the master.
     mutex.lock()
       .then(defer(self(), [this]() {
         return process::async(callbacks.connected);
@@ -328,8 +333,8 @@ protected:
     }
 
     // We can reach here if we noticed a disconnection for either of
-    // subscribe/non-subscribe connections. We discard the future here to
-    // trigger an endpoint re-detection.
+    // subscribe/non-subscribe connections. We discard the future here
+    // to trigger an endpoint re-detection.
     detection.discard();
   }
 
@@ -338,8 +343,8 @@ protected:
       const Call& call,
       const process::http::Response& response)
   {
-    // It is possible that we detected a new endpoint before a response
-    // could be received.
+    // It is possible that we detected a new endpoint before a
+    // response could be received.
     if (connectionId != _connectionId) {
       return process::Failure("Ignoring response from stale connection");
     }
@@ -430,6 +435,7 @@ protected:
     if (event.isFailed()) {
       LOG(ERROR) << "Failed to decode stream of events: "
                  << event.failure();
+
       disconnected(connectionId.get(), event.failure());
       return;
     }
@@ -542,10 +548,11 @@ private:
   process::Owned<EndpointDetector> detector;
   std::queue<Event> events;
 
-  // There can be multiple simulataneous ongoing (re-)connection attempts with
-  // the remote endpoint (e.g., the endpoint failed over while an attempt was
-  // in progress). This helps us in uniquely identifying the current connection
-  // instance and ignoring the stale instance.
+  // There can be multiple simultaneous ongoing (re-)connection
+  // attempts with the remote endpoint (e.g., the endpoint failed over
+  // while an attempt was in progress). This helps us in uniquely
+  // identifying the current connection instance and ignoring the
+  // stale instance.
   Option<UUID> connectionId;
   Option<UUID> streamId;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/55678b41/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 7072c36..da9dff1 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -252,7 +252,6 @@ Future<http::Response> ResourceProviderManagerProcess::api(
     return ok;
   }
 
-
   if (!resourceProviders.contains(call.resource_provider_id())) {
     return BadRequest("Resource provider cannot be found");
   }


[3/9] mesos git commit: Implemented HTTP connection handling for the resource provider driver.

Posted by ji...@apache.org.
Implemented HTTP connection handling for the resource provider driver.

Similar to the existing HTTP connection handling of schedulers and
executors, the resource provider driver will create two connections
with the resource provider manager, one for streaming events and another
one for sending calls. This connection handling has been generalized as
a 'HttpConnectionProcess' and can be reused in other cases.

Review: https://reviews.apache.org/r/61271/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/96adbc8b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/96adbc8b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/96adbc8b

Branch: refs/heads/master
Commit: 96adbc8bb7edea5f735041cc13c4e3f700aceab8
Parents: 8b38fae
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Aug 18 09:59:39 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 include/mesos/v1/resource_provider.hpp     |  26 +-
 src/CMakeLists.txt                         |   1 +
 src/Makefile.am                            |   3 +
 src/resource_provider/detector.cpp         |  41 ++
 src/resource_provider/detector.hpp         |  51 +++
 src/resource_provider/driver.cpp           |  72 +--
 src/resource_provider/http_connection.hpp  | 558 ++++++++++++++++++++++++
 src/resource_provider/storage/provider.cpp |   7 +-
 8 files changed, 717 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/include/mesos/v1/resource_provider.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider.hpp b/include/mesos/v1/resource_provider.hpp
index 91946eb..006889a 100644
--- a/include/mesos/v1/resource_provider.hpp
+++ b/include/mesos/v1/resource_provider.hpp
@@ -20,19 +20,34 @@
 #include <functional>
 #include <queue>
 
+#include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
 
 #include <mesos/http.hpp>
 
+#include <mesos/v1/mesos.hpp>
+
 #include <mesos/v1/resource_provider/resource_provider.hpp>
 
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
 namespace mesos {
+namespace internal {
+
+// Forward declarations.
+template <typename Call, typename Event>
+class HttpConnectionProcess;
+
+class EndpointDetector;
+
+} // namespace internal {
+
 namespace v1 {
 namespace resource_provider {
 
-// Forward declarations.
-class DriverProcess;
+typedef ::mesos::internal::HttpConnectionProcess<Call, Event> DriverProcess;
 
 
 /**
@@ -60,18 +75,19 @@ public:
    * @param received a callback which will be invoked when the driver
    *     receives resource provider Events.
    */
-  Driver(const process::http::URL& url,
+  Driver(process::Owned<mesos::internal::EndpointDetector> detector,
          ContentType contentType,
          const std::function<void(void)>& connected,
          const std::function<void(void)>& disconnected,
-         const std::function<void(const std::queue<Event>&)>& received);
+         const std::function<void(const std::queue<Event>&)>& received,
+         const Option<Credential>& credential);
 
   ~Driver();
 
   Driver(const Driver& other) = delete;
   Driver& operator=(const Driver& other) = delete;
 
-  void send(const Call& call) {}
+  process::Future<Nothing> send(const Call& call);
 
 private:
   process::Owned<DriverProcess> process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 98ccaf4..0816c6e 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -400,6 +400,7 @@ set(POSIX_SRC
 
 set(RESOURCE_PROVIDER_SRC
   resource_provider/daemon.cpp
+  resource_provider/detector.cpp
   resource_provider/driver.cpp
   resource_provider/local.cpp
   resource_provider/manager.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 68fff14..3db208a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -967,6 +967,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   oci/spec.cpp								\
   posix/rlimits.cpp							\
   resource_provider/daemon.cpp						\
+  resource_provider/detector.cpp					\
   resource_provider/driver.cpp						\
   resource_provider/local.cpp						\
   resource_provider/manager.cpp						\
@@ -1106,6 +1107,8 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   module/manager.hpp							\
   posix/rlimits.hpp							\
   resource_provider/daemon.hpp						\
+  resource_provider/detector.hpp					\
+  resource_provider/http_connection.hpp					\
   resource_provider/local.hpp						\
   resource_provider/manager.hpp						\
   resource_provider/message.hpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/resource_provider/detector.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/detector.cpp b/src/resource_provider/detector.cpp
new file mode 100644
index 0000000..59f2b9b
--- /dev/null
+++ b/src/resource_provider/detector.cpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/detector.hpp"
+
+namespace http = process::http;
+
+using process::Future;
+
+namespace mesos {
+namespace internal {
+
+ConstantEndpointDetector::ConstantEndpointDetector(const http::URL& _url)
+  : url(_url) {}
+
+
+Future<Option<http::URL>> ConstantEndpointDetector::detect(
+    const Option<http::URL>& previous)
+{
+  if (previous.isNone() || stringify(previous.get()) != stringify(url)) {
+    return url;
+  } else {
+    return Future<Option<http::URL>>(); // A pending future.
+  }
+}
+
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/resource_provider/detector.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/detector.hpp b/src/resource_provider/detector.hpp
new file mode 100644
index 0000000..68ea8cf
--- /dev/null
+++ b/src/resource_provider/detector.hpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_DETECTOR_HPP__
+#define __RESOURCE_PROVIDER_DETECTOR_HPP__
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+
+namespace mesos {
+namespace internal {
+
+class EndpointDetector
+{
+public:
+  virtual ~EndpointDetector() {}
+
+  virtual process::Future<Option<process::http::URL>> detect(
+      const Option<process::http::URL>& previous) = 0;
+};
+
+
+class ConstantEndpointDetector : public EndpointDetector
+{
+public:
+  explicit ConstantEndpointDetector(const process::http::URL& url);
+
+  process::Future<Option<process::http::URL>> detect(
+      const Option<process::http::URL>& previous) override;
+
+private:
+  const process::http::URL url; // Owned copy; a reference could dangle.
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_DETECTOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/resource_provider/driver.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/driver.cpp b/src/resource_provider/driver.cpp
index 2fc4e68..fdb35d4 100644
--- a/src/resource_provider/driver.cpp
+++ b/src/resource_provider/driver.cpp
@@ -14,62 +14,58 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <glog/logging.h>
+#include <mesos/v1/resource_provider.hpp>
+
+#include <string>
+#include <utility>
 
 #include <process/dispatch.hpp>
-#include <process/id.hpp>
+#include <process/http.hpp>
 #include <process/process.hpp>
 
-#include <mesos/v1/resource_provider.hpp>
+#include "internal/devolve.hpp"
 
-using std::function;
-using std::queue;
+#include "resource_provider/detector.hpp"
+#include "resource_provider/http_connection.hpp"
+#include "resource_provider/validation.hpp"
 
+using process::dispatch;
+using process::Future;
 using process::Owned;
-using process::Process;
-using process::ProcessBase;
-
 using process::spawn;
 using process::terminate;
 using process::wait;
 
-namespace mesos {
-namespace v1 {
-namespace resource_provider {
+using std::function;
+using std::string;
+using std::queue;
 
-class DriverProcess : public Process<DriverProcess>
+namespace {
+
+Option<Error> validate(const mesos::v1::resource_provider::Call& call)
 {
-public:
-  DriverProcess(
-      ContentType _contentType,
-      const function<void(void)>& connected,
-      const function<void(void)>& disconnected,
-      const function<void(const queue<Event>&)>& received)
-    : ProcessBase(process::ID::generate("resource-provider-driver")),
-      contentType(_contentType),
-      callbacks {connected, disconnected, received} {}
-
-protected:
-  struct Callbacks
-  {
-    function<void(void)> connected;
-    function<void(void)> disconnected;
-    function<void(const queue<Event>&)> received;
-  };
-
-  const ContentType contentType;
-  const Callbacks callbacks;
-};
+  return mesos::internal::resource_provider::validation::call::validate(
+      mesos::internal::devolve(call));
+}
+
+} // namespace {
 
+namespace mesos {
+namespace v1 {
+namespace resource_provider {
 
 Driver::Driver(
-    const process::http::URL& url,
+    Owned<mesos::internal::EndpointDetector> detector,
     ContentType contentType,
     const function<void(void)>& connected,
     const function<void(void)>& disconnected,
-    const function<void(const std::queue<Event>&)>& received)
+    const function<void(const queue<Event>&)>& received,
+    const Option<Credential>& credential)
   : process(new DriverProcess(
+        "resource-provider-driver",
+        std::move(detector),
         contentType,
+        validate,
         connected,
         disconnected,
         received))
@@ -84,6 +80,12 @@ Driver::~Driver()
   wait(process.get());
 }
 
+
+Future<Nothing> Driver::send(const Call& call)
+{
+  return dispatch(process.get(), &DriverProcess::send, call);
+}
+
 } // namespace resource_provider {
 } // namespace v1 {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/resource_provider/http_connection.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/http_connection.hpp b/src/resource_provider/http_connection.hpp
new file mode 100644
index 0000000..bc1f01a
--- /dev/null
+++ b/src/resource_provider/http_connection.hpp
@@ -0,0 +1,558 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
+#define __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__
+
+#include <glog/logging.h>
+
+#include <functional>
+#include <ostream>
+#include <string>
+#include <tuple>
+#include <queue>
+#include <utility>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <process/async.hpp>
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/mutex.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/nothing.hpp>
+#include <stout/recordio.hpp>
+#include <stout/result.hpp>
+#include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
+#include "resource_provider/detector.hpp"
+
+namespace mesos {
+namespace internal {
+
+/**
+ * HTTP connection handler.
+ *
+ * Manages the connection to a Call/Event based v1 API like the resource
+ * provider API.
+ */
+template <typename Call, typename Event>
+class HttpConnectionProcess
+  : public process::Process<HttpConnectionProcess<Call, Event>>
+{
+public:
+  /**
+   * Construct a HTTP connection process.
+   *
+   * @param prefix prefix of the actor.
+   * @param _detector the endpoint detector.
+   * @param _contentType the content type expected by this connection.
+   * @param validate a callback which will be invoked when a call needs
+   *     to be validated.
+   * @param connected a callback which will be invoked when the connection
+   *     is established.
+   * @param disconnected a callback which will be invoked when the
+   *     connection is disconnected.
+   * @param received a callback which will be be invoked when events
+   *     are received.
+   */
+  HttpConnectionProcess(
+      const std::string& prefix,
+      process::Owned<EndpointDetector> _detector,
+      ContentType _contentType,
+      const std::function<Option<Error>(const Call&)>& validate,
+      const std::function<void(void)>& connected,
+      const std::function<void(void)>& disconnected,
+      const std::function<void(const std::queue<Event>&)>& received)
+    : process::ProcessBase(process::ID::generate(prefix)),
+      state(State::DISCONNECTED),
+      contentType(_contentType),
+      callbacks {validate, connected, disconnected, received},
+      detector(std::move(_detector)) {}
+
+  process::Future<Nothing> send(const Call& call)
+  {
+    Option<Error> error = callbacks.validate(call);
+
+    if (error.isSome()) {
+      return process::Failure(error->message);
+    }
+
+    if (endpoint.isNone()) {
+      return process::Failure("Not connected to an endpoint");
+    }
+
+    if (call.type() == Call::SUBSCRIBE && state != State::CONNECTED) {
+      // It might be possible that the scheduler is retrying. We drop the
+      // request if we have an ongoing subscribe request in flight or if the
+      // scheduler is already subscribed.
+      return process::Failure(
+          "Resource provider is in state" + stringify(state));
+    }
+
+    if (call.type() != Call::SUBSCRIBE && state != State::SUBSCRIBED) {
+      // We drop all non-subscribe calls if we are not currently subscribed.
+      return process::Failure(
+          "Resource provider is in state " + stringify(state));
+    }
+
+    CHECK(state == State::CONNECTED || state == State::SUBSCRIBED);
+    CHECK_SOME(connections);
+
+    VLOG(1) << "Sending " << call.type() << " call to " << endpoint.get();
+
+    process::http::Request request;
+    request.method = "POST";
+    request.url = endpoint.get();
+    request.body = serialize(contentType, call);
+    request.keepAlive = true;
+    request.headers = {{"Accept", stringify(contentType)},
+                       {"Content-Type", stringify(contentType)}};
+
+    process::Future<process::http::Response> response;
+    if (call.type() == Call::SUBSCRIBE) {
+      CHECK_EQ(State::CONNECTED, state);
+      state = State::SUBSCRIBING;
+
+      // Send a streaming request for Subscribe call.
+      response = connections->subscribe.send(request, true);
+    } else {
+      if (streamId.isSome()) {
+        // Set the stream ID associated with this connection.
+        request.headers["Mesos-Stream-Id"] = streamId->toString();
+      }
+
+      response = connections->nonSubscribe.send(request);
+    }
+
+    CHECK_SOME(connectionId);
+    return response.then(defer(self(),
+                        &Self::_send,
+                        connectionId.get(),
+                        call,
+                        lambda::_1));
+  }
+
+protected:
+  // Because we're deriving from a templated base class, we have
+  // to explicitly bring these hidden base class names into scope.
+  using process::Process<HttpConnectionProcess<Call, Event>>::self;
+  typedef HttpConnectionProcess<Call, Event> Self;
+
+  void initialize() override
+  {
+    detection = detector->detect(None())
+      .onAny(defer(self(), &Self::detected, lambda::_1));
+  }
+
+  void finalize() override
+  {
+    disconnect();
+  }
+
+  void detected(const process::Future<Option<process::http::URL>>& future)
+  {
+    if (future.isFailed()) {
+      LOG(WARNING) << "Failed to detect an endpoint: " << future.failure();
+
+      // TODO(nfnt): A non-retryable error might be the reason for the
+      // failed future. In that case the resource provider should be
+      // informed about this error and the URL dectection aborted.
+    }
+
+    // Invoke the disconnected callback if we were previously connected.
+    switch (state) {
+      case State::CONNECTING:
+      case State::DISCONNECTED:
+        break;
+      case State::CONNECTED:
+      case State::SUBSCRIBING:
+      case State::SUBSCRIBED: {
+        mutex.lock()
+          .then(defer(self(), [this]() {
+            return process::async(callbacks.disconnected);
+          }))
+          .onAny(lambda::bind(&process::Mutex::unlock, mutex));
+      }
+    }
+
+    disconnect();
+
+    if (future.isDiscarded()) {
+      LOG(INFO) << "Re-detecting endpoint";
+
+      endpoint = None();
+    } else if (future->isNone()) {
+      LOG(INFO) << "Lost endpoint";
+
+      endpoint = None();
+    } else {
+      endpoint = future.get().get();
+
+      LOG(INFO) << "New endpoint detected at " << endpoint.get();
+
+      connectionId = UUID::random();
+
+      dispatch(self(), &Self::connect, connectionId.get());
+    }
+
+    detection = detector->detect(endpoint)
+      .onAny(defer(self(), &Self::detected, lambda::_1));
+  }
+
+  void connect(const UUID& _connectionId)
+  {
+    // It is possible that a new endpoint was detected while we were
+    // waiting to establish a connection with the old master.
+    if (connectionId != _connectionId) {
+      VLOG(1) << "Ignoring connection attempt from stale connection";
+      return;
+    }
+
+    CHECK_SOME(endpoint);
+    CHECK_EQ(State::DISCONNECTED, state);
+
+    state = State::CONNECTING;
+
+    // We create two persistent connections here, one for subscribe
+    // call/streaming response and another for non-subscribe calls/responses.
+    collect(
+        process::http::connect(endpoint.get()),
+        process::http::connect(endpoint.get()))
+      .onAny(defer(self(), &Self::connected, connectionId.get(), lambda::_1));
+  }
+
+  void connected(
+      const UUID& _connectionId,
+      const process::Future<std::tuple<
+        process::http::Connection, process::http::Connection>>& _connections)
+  {
+    // It is possible that a new endpoint was detected while we had an
+    // ongoing (re-)connection attempt with the old endpoint.
+    if (connectionId != _connectionId) {
+      VLOG(1) << "Ignoring connection attempt from stale connection";
+      return;
+    }
+
+    CHECK_EQ(State::CONNECTING, state);
+
+    if (!_connections.isReady()) {
+      disconnected(connectionId.get(),
+                   _connections.isFailed()
+                     ? _connections.failure()
+                     : "Connection future discarded");
+      return;
+    }
+
+    VLOG(1) << "Connected with the remote endpoint at " << endpoint.get();
+
+    state = State::CONNECTED;
+
+    connections = Connections {
+        std::get<0>(_connections.get()),
+        std::get<1>(_connections.get())};
+
+    connections->subscribe.disconnected()
+      .onAny(defer(
+          self(),
+          &Self::disconnected,
+          connectionId.get(),
+          "Subscribe connection interrupted"));
+
+    connections->nonSubscribe.disconnected()
+      .onAny(defer(
+          self(),
+          &Self::disconnected,
+          connectionId.get(),
+          "Non-subscribe connection interrupted"));
+
+    // Invoke the connected callback once we have established both subscribe
+    // and non-subscribe connections with the master.
+    mutex.lock()
+      .then(defer(self(), [this]() {
+        return process::async(callbacks.connected);
+      }))
+      .onAny(lambda::bind(&process::Mutex::unlock, mutex));
+  }
+
+  void disconnect()
+  {
+    if (connections.isSome()) {
+      connections->subscribe.disconnect();
+      connections->nonSubscribe.disconnect();
+    }
+
+    if (subscribed.isSome()) {
+      subscribed->reader.close();
+    }
+
+    state = State::DISCONNECTED;
+
+    connections = None();
+    subscribed = None();
+    endpoint = None();
+    connectionId = None();
+  }
+
+  void disconnected(const UUID& _connectionId, const std::string& failure)
+  {
+    // Ignore if the disconnection happened from an old stale connection.
+    if (connectionId != _connectionId) {
+      VLOG(1) << "Ignoring disconnection attempt from stale connection";
+      return;
+    }
+
+    // We can reach here if we noticed a disconnection for either of
+    // subscribe/non-subscribe connections. We discard the future here to
+    // trigger an endpoint re-detection.
+    detection.discard();
+  }
+
+  process::Future<Nothing> _send(
+      const UUID& _connectionId,
+      const Call& call,
+      const process::http::Response& response)
+  {
+    // It is possible that we detected a new endpoint before a response
+    // could be received.
+    if (connectionId != _connectionId) {
+      return process::Failure("Ignoring response from stale connection");
+    }
+
+    CHECK(state == State::SUBSCRIBING || state == State::SUBSCRIBED) << state;
+
+    if (response.code == process::http::Status::OK) {
+      // Only SUBSCRIBE call should get a "200 OK" response.
+      CHECK_EQ(Call::SUBSCRIBE, call.type());
+      CHECK_EQ(process::http::Response::PIPE, response.type);
+      CHECK_SOME(response.reader);
+
+      state = State::SUBSCRIBED;
+
+      process::http::Pipe::Reader reader = response.reader.get();
+
+      auto deserializer =
+        lambda::bind(deserialize<Event>, contentType, lambda::_1);
+
+      process::Owned<recordio::Reader<Event>> decoder(
+          new recordio::Reader<Event>(
+              ::recordio::Decoder<Event>(deserializer),
+              reader));
+
+      subscribed = SubscribedResponse(reader, std::move(decoder));
+
+      if (response.headers.contains("Mesos-Stream-Id")) {
+        Try<UUID> uuid =
+          UUID::fromString(response.headers.at("Mesos-Stream-Id"));
+
+        CHECK_SOME(uuid);
+
+        streamId = uuid.get();
+      }
+
+      read();
+
+      return Nothing();
+    }
+
+    if (response.code == process::http::Status::ACCEPTED) {
+      // Only non SUBSCRIBE calls should get a "202 Accepted" response.
+      CHECK_NE(Call::SUBSCRIBE, call.type());
+      return Nothing();
+    }
+
+    // We reset the state to connected if the subscribe call did not
+    // succeed. We can then retry the subscribe call.
+    if (call.type() == Call::SUBSCRIBE) {
+      state = State::CONNECTED;
+    }
+
+    if (response.code == process::http::Status::SERVICE_UNAVAILABLE ||
+        response.code == process::http::Status::NOT_FOUND) {
+      return process::Failure(
+          "Received '" + response.status + "' (" + response.body + ")");
+    }
+
+    return process::Failure(
+        "Received unexpected '" + response.status +
+        "' (" + response.body + ")");
+  }
+
+  void read()
+  {
+    subscribed->decoder->read()
+      .onAny(defer(self(),
+                   &Self::_read,
+                   subscribed->reader,
+                   lambda::_1));
+  }
+
+  void _read(
+      const process::http::Pipe::Reader& reader,
+      const process::Future<Result<Event>>& event)
+  {
+    CHECK(!event.isDiscarded());
+
+    // Ignore enqueued events from the previous Subscribe call reader.
+    if (!subscribed.isSome() || subscribed->reader != reader) {
+      VLOG(1) << "Ignoring event from old stale connection";
+      return;
+    }
+
+    CHECK_EQ(State::SUBSCRIBED, state);
+    CHECK_SOME(connectionId);
+
+    if (event.isFailed()) {
+      LOG(ERROR) << "Failed to decode stream of events: "
+                 << event.failure();
+      disconnected(connectionId.get(), event.failure());
+      return;
+    }
+
+    if (event->isNone()) {
+      const std::string error = "End-Of-File received";
+      LOG(ERROR) << error;
+
+      disconnected(connectionId.get(), error);
+      return;
+    }
+
+    if (event->isError()) {
+      LOG(ERROR) << "Failed to de-serialize event: " << event->error();
+    } else {
+      receive(event.get().get());
+    }
+
+    read();
+  }
+
+  void receive(const Event& event)
+  {
+    // Check if we are no longer subscribed but received an event.
+    if (state != State::SUBSCRIBED) {
+      LOG(WARNING) << "Ignoring " << stringify(event.type())
+                   << " event because we're no longer subscribed";
+      return;
+    }
+
+    // Queue up the event and invoke the 'received' callback if this
+    // is the first event (between now and when the 'received'
+    // callback actually gets invoked more events might get queued).
+    events.push(event);
+
+    if (events.size() == 1) {
+      mutex.lock()
+        .then(defer(self(), [this]() {
+          process::Future<Nothing> future =
+            process::async(callbacks.received, events);
+          events = std::queue<Event>();
+          return future;
+        }))
+        .onAny(lambda::bind(&process::Mutex::unlock, mutex));
+    }
+  }
+
+private:
+  struct Callbacks
+  {
+    std::function<Option<Error>(const Call&)> validate;
+    std::function<void(void)> connected;
+    std::function<void(void)> disconnected;
+    std::function<void(const std::queue<Event>&)> received;
+  };
+
+  struct Connections
+  {
+    process::http::Connection subscribe;
+    process::http::Connection nonSubscribe;
+  };
+
+  struct SubscribedResponse
+  {
+    SubscribedResponse(
+        process::http::Pipe::Reader _reader,
+        process::Owned<recordio::Reader<Event>> _decoder)
+      : reader(std::move(_reader)),
+        decoder(std::move(_decoder)) {}
+
+    // The decoder cannot be copied meaningfully, see MESOS-5122.
+    SubscribedResponse(const SubscribedResponse&) = delete;
+    SubscribedResponse& operator=(const SubscribedResponse&) = delete;
+    SubscribedResponse& operator=(SubscribedResponse&&) = default;
+    SubscribedResponse(SubscribedResponse&&) = default;
+
+    process::http::Pipe::Reader reader;
+    process::Owned<recordio::Reader<Event>> decoder;
+  };
+
+  enum class State
+  {
+    DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
+    CONNECTING, // Trying to establish subscribe and non-subscribe connections.
+    CONNECTED, // Established subscribe and non-subscribe connections.
+    SUBSCRIBING, // Trying to subscribe with the remote endpoint.
+    SUBSCRIBED // Subscribed with the remote endpoint.
+  };
+
+  friend std::ostream& operator<<(std::ostream& stream, State state)
+  {
+    switch (state) {
+      case State::DISCONNECTED: return stream << "DISCONNECTED";
+      case State::CONNECTING:   return stream << "CONNECTING";
+      case State::CONNECTED:    return stream << "CONNECTED";
+      case State::SUBSCRIBING:  return stream << "SUBSCRIBING";
+      case State::SUBSCRIBED:   return stream << "SUBSCRIBED";
+    }
+
+    UNREACHABLE();
+  }
+
+  State state;
+  Option<Connections> connections;
+  Option<SubscribedResponse> subscribed;
+  Option<process::http::URL> endpoint;
+  const mesos::ContentType contentType;
+  const Callbacks callbacks;
+  process::Mutex mutex; // Used to serialize the callback invocations.
+  process::Owned<EndpointDetector> detector;
+  std::queue<Event> events;
+
+  // There can be multiple simultaneous ongoing (re-)connection attempts with
+  // the remote endpoint (e.g., the endpoint failed over while an attempt was
+  // in progress). This helps us in uniquely identifying the current connection
+  // instance and ignoring the stale instance.
+  Option<UUID> connectionId;
+  Option<UUID> streamId;
+
+  process::Future<Option<process::http::URL>> detection;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_HTTP_CONNECTION_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/96adbc8b/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 0893ace..4622499 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -28,6 +28,8 @@
 
 #include "internal/devolve.hpp"
 
+#include "resource_provider/detector.hpp"
+
 using std::queue;
 
 using process::Owned;
@@ -113,7 +115,7 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 void StorageLocalResourceProviderProcess::initialize()
 {
   driver.reset(new Driver(
-      url,
+      Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
       contentType,
       defer(self(), &Self::connected),
       defer(self(), &Self::disconnected),
@@ -123,7 +125,8 @@ void StorageLocalResourceProviderProcess::initialize()
           received(devolve(event));
           events.pop();
         }
-      })));
+      }),
+      None())); // TODO(nfnt): Add authentication as part of MESOS-7854.
 }
 
 


[4/9] mesos git commit: Added a URL parameter to the resource provider driver.

Posted by ji...@apache.org.
Added a URL parameter to the resource provider driver.

The URL will be used by the resource provider driver to determine
the endpoint of the resource provider manager API.

Review: https://reviews.apache.org/r/61275/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9f39112b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9f39112b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9f39112b

Branch: refs/heads/master
Commit: 9f39112b4df3d56f905fdd457e7bfdf9e294a911
Parents: 4c71ba1
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Aug 18 09:59:19 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 include/mesos/v1/resource_provider.hpp     |  5 ++++-
 src/resource_provider/daemon.cpp           | 10 ++++++++--
 src/resource_provider/daemon.hpp           |  3 +++
 src/resource_provider/driver.cpp           |  1 +
 src/resource_provider/local.cpp            |  3 ++-
 src/resource_provider/local.hpp            |  2 ++
 src/resource_provider/storage/provider.cpp | 10 ++++++++--
 src/resource_provider/storage/provider.hpp |  3 +++
 src/slave/slave.cpp                        | 18 +++++++++++++++++-
 9 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/include/mesos/v1/resource_provider.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider.hpp b/include/mesos/v1/resource_provider.hpp
index 88b6062..91946eb 100644
--- a/include/mesos/v1/resource_provider.hpp
+++ b/include/mesos/v1/resource_provider.hpp
@@ -20,6 +20,7 @@
 #include <functional>
 #include <queue>
 
+#include <process/http.hpp>
 #include <process/owned.hpp>
 
 #include <mesos/http.hpp>
@@ -50,6 +51,7 @@ public:
    * the resource provider manager. Note that we drop events while
    * disconnected.
    *
+   * @param url the URL where the resource provider API is served.
    * @param contentType the content type expected by this driver.
    * @param connected a callback which will be invoked when the driver
    *     is connected.
@@ -58,7 +60,8 @@ public:
    * @param received a callback which will be invoked when the driver
    *     receives resource provider Events.
    */
-  Driver(ContentType contentType,
+  Driver(const process::http::URL& url,
+         ContentType contentType,
          const std::function<void(void)>& connected,
          const std::function<void(void)>& disconnected,
          const std::function<void(const std::queue<Event>&)>& received);

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/daemon.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.cpp b/src/resource_provider/daemon.cpp
index adcb60a..d584eb9 100644
--- a/src/resource_provider/daemon.cpp
+++ b/src/resource_provider/daemon.cpp
@@ -55,9 +55,11 @@ class LocalResourceProviderDaemonProcess
 {
 public:
   LocalResourceProviderDaemonProcess(
+      const process::http::URL& _url,
       const string& _workDir,
       const Option<string>& _configDir)
     : ProcessBase(process::ID::generate("local-resource-provider-daemon")),
+      url(_url),
       workDir(_workDir),
       configDir(_configDir) {}
 
@@ -78,6 +80,7 @@ private:
 
   Try<Nothing> load(const string& path);
 
+  const process::http::URL url;
   const string workDir;
   const Option<string> configDir;
 
@@ -144,7 +147,7 @@ Try<Nothing> LocalResourceProviderDaemonProcess::load(const string& path)
   }
 
   Try<Owned<LocalResourceProvider>> provider =
-    LocalResourceProvider::create(info.get());
+    LocalResourceProvider::create(url, info.get());
 
   if (provider.isError()) {
     return Error(
@@ -159,6 +162,7 @@ Try<Nothing> LocalResourceProviderDaemonProcess::load(const string& path)
 
 
 Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
+    const process::http::URL& url,
     const slave::Flags& flags)
 {
   // We require that the config directory exists to create a daemon.
@@ -168,15 +172,17 @@ Try<Owned<LocalResourceProviderDaemon>> LocalResourceProviderDaemon::create(
   }
 
   return new LocalResourceProviderDaemon(
+      url,
       flags.work_dir,
       configDir);
 }
 
 
 LocalResourceProviderDaemon::LocalResourceProviderDaemon(
+    const process::http::URL& url,
     const string& workDir,
     const Option<string>& configDir)
-  : process(new LocalResourceProviderDaemonProcess(workDir, configDir))
+  : process(new LocalResourceProviderDaemonProcess(url, workDir, configDir))
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/daemon.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/daemon.hpp b/src/resource_provider/daemon.hpp
index 467a5d3..ef6c356 100644
--- a/src/resource_provider/daemon.hpp
+++ b/src/resource_provider/daemon.hpp
@@ -19,6 +19,7 @@
 
 #include <string>
 
+#include <process/http.hpp>
 #include <process/owned.hpp>
 
 #include <stout/option.hpp>
@@ -40,6 +41,7 @@ class LocalResourceProviderDaemon
 {
 public:
   static Try<process::Owned<LocalResourceProviderDaemon>> create(
+      const process::http::URL& url,
       const slave::Flags& flags);
 
   ~LocalResourceProviderDaemon();
@@ -52,6 +54,7 @@ public:
 
 private:
   LocalResourceProviderDaemon(
+      const process::http::URL& url,
       const std::string& workDir,
       const Option<std::string>& configDir);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/driver.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/driver.cpp b/src/resource_provider/driver.cpp
index 6778ec9..2fc4e68 100644
--- a/src/resource_provider/driver.cpp
+++ b/src/resource_provider/driver.cpp
@@ -63,6 +63,7 @@ protected:
 
 
 Driver::Driver(
+    const process::http::URL& url,
     ContentType contentType,
     const function<void(void)>& connected,
     const function<void(void)>& disconnected,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/local.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/local.cpp b/src/resource_provider/local.cpp
index a57c7c9..ad98f33 100644
--- a/src/resource_provider/local.cpp
+++ b/src/resource_provider/local.cpp
@@ -24,12 +24,13 @@ namespace mesos {
 namespace internal {
 
 Try<Owned<LocalResourceProvider>> LocalResourceProvider::create(
+    const process::http::URL& url,
     const ResourceProviderInfo& info)
 {
   // TODO(jieyu): Document the built-in local resource providers.
   if (info.type() == "org.apache.mesos.rp.local.storage") {
     Try<Owned<LocalResourceProvider>> provider =
-      StorageLocalResourceProvider::create(info);
+      StorageLocalResourceProvider::create(url, info);
 
     if (provider.isError()) {
       return Error(

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/local.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/local.hpp b/src/resource_provider/local.hpp
index 604c5d0..ebaa07d 100644
--- a/src/resource_provider/local.hpp
+++ b/src/resource_provider/local.hpp
@@ -17,6 +17,7 @@
 #ifndef __RESOURCE_PROVIDER_LOCAL_HPP__
 #define __RESOURCE_PROVIDER_LOCAL_HPP__
 
+#include <process/http.hpp>
 #include <process/owned.hpp>
 
 #include <stout/try.hpp>
@@ -30,6 +31,7 @@ class LocalResourceProvider
 {
 public:
   static Try<process::Owned<LocalResourceProvider>> create(
+      const process::http::URL& url,
       const ResourceProviderInfo& info);
 
   virtual ~LocalResourceProvider() = default;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 4c39312..0893ace 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -52,8 +52,10 @@ class StorageLocalResourceProviderProcess
 {
 public:
   explicit StorageLocalResourceProviderProcess(
+      const process::http::URL& _url,
       const ResourceProviderInfo& _info)
     : ProcessBase(process::ID::generate("storage-local-resource-provider")),
+      url(_url),
       contentType(ContentType::PROTOBUF),
       info(_info) {}
 
@@ -70,6 +72,7 @@ public:
 private:
   void initialize() override;
 
+  const process::http::URL url;
   const ContentType contentType;
   ResourceProviderInfo info;
   Owned<v1::resource_provider::Driver> driver;
@@ -110,6 +113,7 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 void StorageLocalResourceProviderProcess::initialize()
 {
   driver.reset(new Driver(
+      url,
       contentType,
       defer(self(), &Self::connected),
       defer(self(), &Self::disconnected),
@@ -124,16 +128,18 @@ void StorageLocalResourceProviderProcess::initialize()
 
 
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
+    const process::http::URL& url,
     const ResourceProviderInfo& info)
 {
   return Owned<LocalResourceProvider>(
-      new StorageLocalResourceProvider(info));
+      new StorageLocalResourceProvider(url, info));
 }
 
 
 StorageLocalResourceProvider::StorageLocalResourceProvider(
+    const process::http::URL& url,
     const ResourceProviderInfo& info)
-  : process(new StorageLocalResourceProviderProcess(info))
+  : process(new StorageLocalResourceProviderProcess(url, info))
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/resource_provider/storage/provider.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.hpp b/src/resource_provider/storage/provider.hpp
index c6ea440..6de88c2 100644
--- a/src/resource_provider/storage/provider.hpp
+++ b/src/resource_provider/storage/provider.hpp
@@ -17,6 +17,7 @@
 #ifndef __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
 #define __RESOURCE_PROVIDER_STORAGE_PROVIDER_HPP__
 
+#include <process/http.hpp>
 #include <process/owned.hpp>
 
 #include <stout/try.hpp>
@@ -36,6 +37,7 @@ class StorageLocalResourceProvider : public LocalResourceProvider
 {
 public:
   static Try<process::Owned<LocalResourceProvider>> create(
+      const process::http::URL& url,
       const mesos::ResourceProviderInfo& info);
 
   ~StorageLocalResourceProvider() override;
@@ -48,6 +50,7 @@ public:
 
 private:
   explicit StorageLocalResourceProvider(
+      const process::http::URL& url,
       const mesos::ResourceProviderInfo& info);
 
   process::Owned<StorageLocalResourceProviderProcess> process;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9f39112b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5a45054..50d2a10 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -49,6 +49,8 @@
 #include <process/reap.hpp>
 #include <process/time.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/bytes.hpp>
 #include <stout/check.hpp>
 #include <stout/duration.hpp>
@@ -413,8 +415,22 @@ void Slave::initialize()
       << mkdir.error();
   }
 
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL localResourceProviderURL(
+      scheme,
+      self().address.ip,
+      self().address.port,
+      self().id + "/api/v1/resource_provider");
+
   Try<Owned<LocalResourceProviderDaemon>> _localResourceProviderDaemon =
-    LocalResourceProviderDaemon::create(flags);
+    LocalResourceProviderDaemon::create(localResourceProviderURL, flags);
 
   if (_localResourceProviderDaemon.isError()) {
     EXIT(EXIT_FAILURE)


[9/9] mesos git commit: Added a resource providers total resources to the subscribe call.

Posted by ji...@apache.org.
Added a resource providers total resources to the subscribe call.

Since resource provider resources are dynamic (as opposed to how e.g.,
agent total resources are implemented), they are not part of the
'ResourceProviderInfo'. Instead they are communicated explicitly.

This commit adds total resources to the resource provider 'SUBSCRIBE'
call, which can be used to communicate the total capacity in both
subscription and resubscription scenarios.

Review: https://reviews.apache.org/r/61180/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f19cc636
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f19cc636
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f19cc636

Branch: refs/heads/master
Commit: f19cc636305f2542e260b908e55dcffcf03ab228
Parents: f6d6ef4
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri Aug 18 10:00:06 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 include/mesos/resource_provider/resource_provider.proto    | 6 ++++++
 include/mesos/v1/resource_provider/resource_provider.proto | 1 +
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f19cc636/include/mesos/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/resource_provider/resource_provider.proto b/include/mesos/resource_provider/resource_provider.proto
index 498d848..f5a9073 100644
--- a/include/mesos/resource_provider/resource_provider.proto
+++ b/include/mesos/resource_provider/resource_provider.proto
@@ -67,8 +67,14 @@ message Call {
   }
 
   // Request to subscribe with the master.
+  //
+  // TODO(bbannier): Once we have implemented a call to update a
+  // resource provider, consider removing resources here and instead
+  // moving to a protocol where a resource provider first subscribes
+  // and then updates its resources.
   message Subscribe {
     required ResourceProviderInfo resource_provider_info = 1;
+    repeated Resource resources = 2;
   }
 
   // Notify the master about the status of an operation.

http://git-wip-us.apache.org/repos/asf/mesos/blob/f19cc636/include/mesos/v1/resource_provider/resource_provider.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/resource_provider/resource_provider.proto b/include/mesos/v1/resource_provider/resource_provider.proto
index 59adbc4..e5cbede 100644
--- a/include/mesos/v1/resource_provider/resource_provider.proto
+++ b/include/mesos/v1/resource_provider/resource_provider.proto
@@ -69,6 +69,7 @@ message Call {
   // Request to subscribe with the master.
   message Subscribe {
     required ResourceProviderInfo resource_provider_info = 1;
+    repeated Resource resources = 2;
   }
 
   // Notify the master about the status of an operation.


[6/9] mesos git commit: Added a stream ID to resource provider manager connections.

Posted by ji...@apache.org.
Added a stream ID to resource provider manager connections.

To be able to resolve problems with resource providers that erroneously
try to 'SUBSCRIBE' multiple times, a unique stream ID is associated with
a connected resource provider. Resource providers are expected to send
this stream ID with every call that isn't a 'SUBSCRIBE' call.

Review: https://reviews.apache.org/r/61654/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8b38faea
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8b38faea
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8b38faea

Branch: refs/heads/master
Commit: 8b38faeacb25ca97c26782dfbff648eecc32f2a5
Parents: 9f39112
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Aug 18 09:59:32 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp | 89 +++++++++++++++++++++++-----------
 1 file changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8b38faea/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 44e1576..df5ddf3 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -83,9 +83,11 @@ namespace internal {
 struct HttpConnection
 {
   HttpConnection(const http::Pipe::Writer& _writer,
-                 ContentType _contentType)
+                 ContentType _contentType,
+                 UUID _streamId)
     : writer(_writer),
       contentType(_contentType),
+      streamId(_streamId),
       encoder(lambda::bind(serialize, contentType, lambda::_1)) {}
 
   // Converts the message to an Event before sending.
@@ -109,6 +111,7 @@ struct HttpConnection
 
   http::Pipe::Writer writer;
   ContentType contentType;
+  UUID streamId;
   ::recordio::Encoder<v1::resource_provider::Event> encoder;
 };
 
@@ -208,15 +211,62 @@ Future<http::Response> ResourceProviderManagerProcess::api(
         "Failed to validate resource_provider::Call: " + error->message);
   }
 
-  ContentType acceptType;
-  if (request.acceptsMediaType(APPLICATION_JSON)) {
-    acceptType = ContentType::JSON;
-  } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
-    acceptType = ContentType::PROTOBUF;
-  } else {
-    return NotAcceptable(
-        string("Expecting 'Accept' to allow ") +
-        "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+  if (call.type() == Call::SUBSCRIBE) {
+    // We default to JSON 'Content-Type' in the response since an empty
+    // 'Accept' header results in all media types considered acceptable.
+    ContentType acceptType = ContentType::JSON;
+
+    if (request.acceptsMediaType(APPLICATION_JSON)) {
+      acceptType = ContentType::JSON;
+    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+      acceptType = ContentType::PROTOBUF;
+    } else {
+      return NotAcceptable(
+          string("Expecting 'Accept' to allow ") +
+          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+    }
+
+    if (request.headers.contains("Mesos-Stream-Id")) {
+      return BadRequest(
+          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
+    }
+
+    Pipe pipe;
+    OK ok;
+
+    ok.headers["Content-Type"] = stringify(acceptType);
+    ok.type = http::Response::PIPE;
+    ok.reader = pipe.reader();
+
+    // Generate a stream ID and return it in the response.
+    UUID streamId = UUID::random();
+    ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+    HttpConnection http(pipe.writer(), acceptType, streamId);
+    subscribe(http, call.subscribe());
+
+    return ok;
+  }
+
+
+  if (!resourceProviders.contains(call.resource_provider_id())) {
+    return BadRequest("Resource provider cannot be found");
+  }
+
+  auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+
+  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
+  if (!request.headers.contains("Mesos-Stream-Id")) {
+    return BadRequest(
+        "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
+  }
+
+  const string& streamId = request.headers.at("Mesos-Stream-Id");
+  if (streamId != resourceProvider.http.streamId.toString()) {
+    return BadRequest(
+        "The stream ID '" + streamId + "' included in this request "
+        "didn't match the stream ID currently associated with "
+        " resource provider ID " + resourceProvider.info.id().value());
   }
 
   switch(call.type()) {
@@ -225,26 +275,11 @@ Future<http::Response> ResourceProviderManagerProcess::api(
     }
 
     case Call::SUBSCRIBE: {
-      Pipe pipe;
-      OK ok;
-
-      ok.headers["Content-Type"] = stringify(acceptType);
-      ok.type = http::Response::PIPE;
-      ok.reader = pipe.reader();
-
-      HttpConnection http(pipe.writer(), acceptType);
-      subscribe(http, call.subscribe());
-
-      return ok;
+      // `SUBSCRIBE` call should have been handled above.
+      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
     }
 
     case Call::UPDATE: {
-      if (!resourceProviders.contains(call.resource_provider_id())) {
-        return BadRequest("Resource provider cannot be found");
-      }
-
-      auto resourceProvider = resourceProviders.at(call.resource_provider_id());
-
       update(&resourceProvider, call.update());
       return Accepted();
     }


[7/9] mesos git commit: Added 'devolve' overload for 'ResourceProviderID'.

Posted by ji...@apache.org.
Added 'devolve' overload for 'ResourceProviderID'.

This commit adds the default overload to 'devolve'
'ResourceProviderID's.

Review: https://reviews.apache.org/r/61179/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6d6ef46
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6d6ef46
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6d6ef46

Branch: refs/heads/master
Commit: f6d6ef46b9cf67b48b73afbc5c142d74b517395a
Parents: 7ec6d80
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri Aug 18 10:00:01 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 src/internal/devolve.cpp | 6 ++++++
 src/internal/devolve.hpp | 1 +
 2 files changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f6d6ef46/src/internal/devolve.cpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 7eb58e9..e05fcd4 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -117,6 +117,12 @@ Resources devolve(const v1::Resources& resources)
 }
 
 
+ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId)
+{
+  return devolve<ResourceProviderID>(resourceProviderId);
+}
+
+
 SlaveID devolve(const v1::AgentID& agentId)
 {
   // NOTE: Not using 'devolve<v1::AgentID, SlaveID>(agentId)' since

http://git-wip-us.apache.org/repos/asf/mesos/blob/f6d6ef46/src/internal/devolve.hpp
----------------------------------------------------------------------
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 656173d..48d9c33 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -62,6 +62,7 @@ InverseOffer devolve(const v1::InverseOffer& inverseOffer);
 Offer devolve(const v1::Offer& offer);
 Resource devolve(const v1::Resource& resource);
 Resources devolve(const v1::Resources& resources);
+ResourceProviderID devolve(const v1::ResourceProviderID& resourceProviderId);
 SlaveID devolve(const v1::AgentID& agentId);
 SlaveInfo devolve(const v1::AgentInfo& agentInfo);
 TaskID devolve(const v1::TaskID& taskId);


[8/9] mesos git commit: Moved and renamed resource provider manager-related tests.

Posted by ji...@apache.org.
Moved and renamed resource provider manager-related tests.

This patch moves the existing `resource_provider_http_api_tests` file
to a more general name. We also rename the existing HTTP test. This
will allow us to collect most resource provider manager-related unit
tests in this file in the future.

Review: https://reviews.apache.org/r/61278/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c3facbb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c3facbb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c3facbb

Branch: refs/heads/master
Commit: 2c3facbb62cda6ac4d5266a2677b1347c8cf0fd5
Parents: f9ffc24
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri Aug 18 10:00:25 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 src/Makefile.am                                |   2 +-
 src/tests/CMakeLists.txt                       |   2 +-
 src/tests/resource_provider_http_api_tests.cpp | 299 --------------------
 src/tests/resource_provider_manager_tests.cpp  | 299 ++++++++++++++++++++
 4 files changed, 301 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2c3facbb/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3db208a..38167a9 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2376,7 +2376,7 @@ mesos_tests_SOURCES =						\
   tests/reservation_endpoints_tests.cpp				\
   tests/reservation_tests.cpp					\
   tests/resource_offers_tests.cpp				\
-  tests/resource_provider_http_api_tests.cpp			\
+  tests/resource_provider_manager_tests.cpp			\
   tests/resource_provider_validation_tests.cpp			\
   tests/resources_tests.cpp					\
   tests/resources_utils.cpp					\

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c3facbb/src/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt
index 6dd2716..6c05ea9 100644
--- a/src/tests/CMakeLists.txt
+++ b/src/tests/CMakeLists.txt
@@ -115,7 +115,7 @@ set(MESOS_TESTS_SRC
   protobuf_io_tests.cpp
   rate_limiting_tests.cpp
   resource_offers_tests.cpp
-  resource_provider_http_api_tests.cpp
+  resource_provider_manager_tests.cpp
   resource_provider_validation_tests.cpp
   resources_tests.cpp
   role_tests.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c3facbb/src/tests/resource_provider_http_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_http_api_tests.cpp b/src/tests/resource_provider_http_api_tests.cpp
deleted file mode 100644
index 85906ea..0000000
--- a/src/tests/resource_provider_http_api_tests.cpp
+++ /dev/null
@@ -1,299 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include <string>
-
-#include <mesos/v1/resource_provider/resource_provider.hpp>
-
-#include <process/clock.hpp>
-#include <process/gmock.hpp>
-#include <process/http.hpp>
-
-#include <stout/lambda.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/recordio.hpp>
-#include <stout/stringify.hpp>
-
-#include "common/http.hpp"
-#include "common/recordio.hpp"
-
-#include "slave/slave.hpp"
-
-#include "resource_provider/manager.hpp"
-
-#include "tests/mesos.hpp"
-
-namespace http = process::http;
-
-using mesos::internal::slave::Slave;
-
-using mesos::master::detector::MasterDetector;
-
-using mesos::v1::resource_provider::Call;
-using mesos::v1::resource_provider::Event;
-
-using process::Clock;
-using process::Future;
-using process::Owned;
-
-using process::http::BadRequest;
-using process::http::OK;
-using process::http::UnsupportedMediaType;
-
-using std::string;
-
-using testing::Values;
-using testing::WithParamInterface;
-
-namespace mesos {
-namespace internal {
-namespace tests {
-
-class ResourceProviderHttpApiTest
-  : public MesosTest,
-    public WithParamInterface<ContentType> {};
-
-
-// The tests are parameterized by the content type of the request.
-INSTANTIATE_TEST_CASE_P(
-    ContentType,
-    ResourceProviderHttpApiTest,
-    Values(ContentType::PROTOBUF, ContentType::JSON));
-
-
-TEST_F(ResourceProviderHttpApiTest, NoContentType)
-{
-  http::Request request;
-  request.method = "POST";
-  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-
-  ResourceProviderManager manager;
-
-  Future<http::Response> response = manager.api(request, None());
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(
-      "Expecting 'Content-Type' to be present",
-      response);
-}
-
-
-// This test sends a valid JSON blob that cannot be deserialized
-// into a valid protobuf resulting in a BadRequest.
-TEST_F(ResourceProviderHttpApiTest, ValidJsonButInvalidProtobuf)
-{
-  JSON::Object object;
-  object.values["string"] = "valid_json";
-
-  http::Request request;
-  request.method = "POST";
-  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  request.headers["Accept"] = APPLICATION_JSON;
-  request.headers["Content-Type"] = APPLICATION_JSON;
-  request.body = stringify(object);
-
-  ResourceProviderManager manager;
-
-  Future<http::Response> response = manager.api(request, None());
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-  AWAIT_EXPECT_RESPONSE_BODY_EQ(
-      "Failed to validate resource_provider::Call: "
-      "Expecting 'type' to be present",
-      response);
-}
-
-
-TEST_P(ResourceProviderHttpApiTest, MalformedContent)
-{
-  const ContentType contentType = GetParam();
-
-  http::Request request;
-  request.method = "POST";
-  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  request.headers["Accept"] = stringify(contentType);
-  request.headers["Content-Type"] = stringify(contentType);
-  request.body = "MALFORMED_CONTENT";
-
-  ResourceProviderManager manager;
-
-  Future<http::Response> response = manager.api(request, None());
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
-  switch (contentType) {
-    case ContentType::PROTOBUF:
-      AWAIT_EXPECT_RESPONSE_BODY_EQ(
-          "Failed to parse body into Call protobuf",
-          response);
-      break;
-    case ContentType::JSON:
-      AWAIT_EXPECT_RESPONSE_BODY_EQ(
-          "Failed to parse body into JSON: "
-          "syntax error at line 1 near: MALFORMED_CONTENT",
-          response);
-      break;
-    case ContentType::RECORDIO:
-      break;
-  }
-}
-
-
-TEST_P(ResourceProviderHttpApiTest, UnsupportedContentMediaType)
-{
-  Call call;
-  call.set_type(Call::SUBSCRIBE);
-
-  Call::Subscribe* subscribe = call.mutable_subscribe();
-
-  mesos::v1::ResourceProviderInfo* info =
-    subscribe->mutable_resource_provider_info();
-
-  info->set_type("org.apache.mesos.rp.test");
-  info->set_name("test");
-
-  const ContentType contentType = GetParam();
-  const string unknownMediaType = "application/unknown-media-type";
-
-  http::Request request;
-  request.method = "POST";
-  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  request.headers["Accept"] = stringify(contentType);
-  request.headers["Content-Type"] = unknownMediaType;
-  request.body = serialize(contentType, call);
-
-  ResourceProviderManager manager;
-
-  Future<http::Response> response = manager.api(request, None());
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response)
-    << response->body;
-}
-
-
-TEST_P(ResourceProviderHttpApiTest, Subscribe)
-{
-  Call call;
-  call.set_type(Call::SUBSCRIBE);
-
-  Call::Subscribe* subscribe = call.mutable_subscribe();
-
-  mesos::v1::ResourceProviderInfo* info =
-    subscribe->mutable_resource_provider_info();
-
-  info->set_type("org.apache.mesos.rp.test");
-  info->set_name("test");
-
-  const ContentType contentType = GetParam();
-
-  http::Request request;
-  request.method = "POST";
-  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  request.headers["Accept"] = stringify(contentType);
-  request.headers["Content-Type"] = stringify(contentType);
-  request.body = serialize(contentType, call);
-
-  ResourceProviderManager manager;
-
-  Future<http::Response> response = manager.api(request, None());
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body;
-  ASSERT_EQ(http::Response::PIPE, response->type);
-
-  Option<http::Pipe::Reader> reader = response->reader;
-  ASSERT_SOME(reader);
-
-  recordio::Reader<Event> responseDecoder(
-      ::recordio::Decoder<Event>(
-          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
-      reader.get());
-
-  Future<Result<Event>> event = responseDecoder.read();
-  AWAIT_READY(event);
-  ASSERT_SOME(event.get());
-
-  // Check event type is subscribed and the resource provider id is set.
-  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
-  ASSERT_NE("", event->get().subscribed().provider_id().value());
-}
-
-
-// This test starts an agent and connects directly with its resource
-// provider endpoint.
-TEST_P(ResourceProviderHttpApiTest, AgentEndpoint)
-{
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
-  ASSERT_SOME(agent);
-
-  AWAIT_READY(__recover);
-
-  // Wait for recovery to be complete.
-  Clock::pause();
-  Clock::settle();
-
-  Call call;
-  call.set_type(Call::SUBSCRIBE);
-
-  Call::Subscribe* subscribe = call.mutable_subscribe();
-
-  mesos::v1::ResourceProviderInfo* info =
-    subscribe->mutable_resource_provider_info();
-
-  info->set_type("org.apache.mesos.rp.test");
-  info->set_name("test");
-
-  const ContentType contentType = GetParam();
-
-  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
-  headers["Accept"] = stringify(contentType);
-
-  Future<http::Response> response = http::streaming::post(
-      agent.get()->pid,
-      "api/v1/resource_provider",
-      headers,
-      serialize(contentType, call),
-      stringify(contentType));
-
-  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
-  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
-  ASSERT_EQ(http::Response::PIPE, response->type);
-
-  Option<http::Pipe::Reader> reader = response->reader;
-  ASSERT_SOME(reader);
-
-  recordio::Reader<Event> responseDecoder(
-      ::recordio::Decoder<Event>(
-          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
-      reader.get());
-
-  Future<Result<Event>> event = responseDecoder.read();
-  AWAIT_READY(event);
-  ASSERT_SOME(event.get());
-
-  // Check event type is subscribed and the resource provider id is set.
-  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
-  ASSERT_NE("", event->get().subscribed().provider_id().value());
-}
-
-} // namespace tests {
-} // namespace internal {
-} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2c3facbb/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
new file mode 100644
index 0000000..83a1340
--- /dev/null
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <string>
+
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
+#include <process/clock.hpp>
+#include <process/gmock.hpp>
+#include <process/http.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/recordio.hpp>
+#include <stout/stringify.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
+#include "slave/slave.hpp"
+
+#include "resource_provider/manager.hpp"
+
+#include "tests/mesos.hpp"
+
+namespace http = process::http;
+
+using mesos::internal::slave::Slave;
+
+using mesos::master::detector::MasterDetector;
+
+using mesos::v1::resource_provider::Call;
+using mesos::v1::resource_provider::Event;
+
+using process::Clock;
+using process::Future;
+using process::Owned;
+
+using process::http::BadRequest;
+using process::http::OK;
+using process::http::UnsupportedMediaType;
+
+using std::string;
+
+using testing::Values;
+using testing::WithParamInterface;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class ResourceProviderManagerHttpApiTest
+  : public MesosTest,
+    public WithParamInterface<ContentType> {};
+
+
+// The tests are parameterized by the content type of the request.
+INSTANTIATE_TEST_CASE_P(
+    ContentType,
+    ResourceProviderManagerHttpApiTest,
+    Values(ContentType::PROTOBUF, ContentType::JSON));
+
+
+TEST_F(ResourceProviderManagerHttpApiTest, NoContentType)
+{
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      "Expecting 'Content-Type' to be present",
+      response);
+}
+
+
+// This test sends a valid JSON blob that cannot be deserialized
+// into a valid protobuf resulting in a BadRequest.
+TEST_F(ResourceProviderManagerHttpApiTest, ValidJsonButInvalidProtobuf)
+{
+  JSON::Object object;
+  object.values["string"] = "valid_json";
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = APPLICATION_JSON;
+  request.headers["Content-Type"] = APPLICATION_JSON;
+  request.body = stringify(object);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  AWAIT_EXPECT_RESPONSE_BODY_EQ(
+      "Failed to validate resource_provider::Call: "
+      "Expecting 'type' to be present",
+      response);
+}
+
+
+TEST_P(ResourceProviderManagerHttpApiTest, MalformedContent)
+{
+  const ContentType contentType = GetParam();
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = stringify(contentType);
+  request.body = "MALFORMED_CONTENT";
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(BadRequest().status, response);
+  switch (contentType) {
+    case ContentType::PROTOBUF:
+      AWAIT_EXPECT_RESPONSE_BODY_EQ(
+          "Failed to parse body into Call protobuf",
+          response);
+      break;
+    case ContentType::JSON:
+      AWAIT_EXPECT_RESPONSE_BODY_EQ(
+          "Failed to parse body into JSON: "
+          "syntax error at line 1 near: MALFORMED_CONTENT",
+          response);
+      break;
+    case ContentType::RECORDIO:
+      break;
+  }
+}
+
+
+TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+  const string unknownMediaType = "application/unknown-media-type";
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = unknownMediaType;
+  request.body = serialize(contentType, call);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(UnsupportedMediaType().status, response)
+    << response->body;
+}
+
+
+TEST_P(ResourceProviderManagerHttpApiTest, Subscribe)
+{
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+
+  http::Request request;
+  request.method = "POST";
+  request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  request.headers["Accept"] = stringify(contentType);
+  request.headers["Content-Type"] = stringify(contentType);
+  request.body = serialize(contentType, call);
+
+  ResourceProviderManager manager;
+
+  Future<http::Response> response = manager.api(request, None());
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response) << response->body;
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  Option<http::Pipe::Reader> reader = response->reader;
+  ASSERT_SOME(reader);
+
+  recordio::Reader<Event> responseDecoder(
+      ::recordio::Decoder<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the resource provider id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+  ASSERT_NE("", event->get().subscribed().provider_id().value());
+}
+
+
+// This test starts an agent and connects directly with its resource
+// provider endpoint.
+TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
+  ASSERT_SOME(agent);
+
+  AWAIT_READY(__recover);
+
+  // Wait for recovery to be complete.
+  Clock::pause();
+  Clock::settle();
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.rp.test");
+  info->set_name("test");
+
+  const ContentType contentType = GetParam();
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+      agent.get()->pid,
+      "api/v1/resource_provider",
+      headers,
+      serialize(contentType, call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+
+  Option<http::Pipe::Reader> reader = response->reader;
+  ASSERT_SOME(reader);
+
+  recordio::Reader<Event> responseDecoder(
+      ::recordio::Decoder<Event>(
+          lambda::bind(deserialize<Event>, contentType, lambda::_1)),
+      reader.get());
+
+  Future<Result<Event>> event = responseDecoder.read();
+  AWAIT_READY(event);
+  ASSERT_SOME(event.get());
+
+  // Check event type is subscribed and the resource provider id is set.
+  ASSERT_EQ(Event::SUBSCRIBED, event->get().type());
+  ASSERT_NE("", event->get().subscribed().provider_id().value());
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {


[5/9] mesos git commit: Added a MockResourceProvider.

Posted by ji...@apache.org.
Added a MockResourceProvider.

Review: https://reviews.apache.org/r/61272/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7ec6d80d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7ec6d80d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7ec6d80d

Branch: refs/heads/master
Commit: 7ec6d80dda3662ba2d01455aeac8df0b3a6235f3
Parents: 96adbc8
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Aug 18 09:59:56 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 src/tests/mesos.hpp | 81 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7ec6d80d/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 6f06261..f80e5fb 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -28,6 +28,7 @@
 
 #include <mesos/v1/executor.hpp>
 #include <mesos/v1/resources.hpp>
+#include <mesos/v1/resource_provider.hpp>
 #include <mesos/v1/scheduler.hpp>
 
 #include <mesos/v1/executor/executor.hpp>
@@ -70,6 +71,8 @@
 
 #include "sched/constants.hpp"
 
+#include "resource_provider/detector.hpp"
+
 #include "slave/constants.hpp"
 #include "slave/slave.hpp"
 
@@ -2248,6 +2251,84 @@ using MockHTTPExecutor = tests::executor::MockHTTPExecutor<
 } // namespace v1 {
 
 
+namespace resource_provider {
+
+template <typename Event, typename Driver>
+class MockResourceProvider
+{
+public:
+  MOCK_METHOD0_T(connected, void());
+  MOCK_METHOD0_T(disconnected, void());
+  MOCK_METHOD1_T(subscribed, void(const typename Event::Subscribed&));
+  MOCK_METHOD1_T(operation, void(const typename Event::Operation&));
+
+  void events(std::queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED:
+          subscribed(event.subscribed());
+          break;
+        case Event::OPERATION:
+          operation(event.operation());
+          break;
+        case Event::UNKNOWN:
+          LOG(FATAL) << "Received unexpected UNKNOWN event";
+          break;
+      }
+    }
+  }
+
+  template <typename Call>
+  process::Future<Nothing> send(const Call& call)
+  {
+    return driver->send(call);
+  }
+
+  template <typename Credential>
+  void start(
+      process::Owned<mesos::internal::EndpointDetector> detector,
+      ContentType contentType,
+      const Credential& credential)
+  {
+    driver.reset(new Driver(
+            std::move(detector),
+            contentType,
+            lambda::bind(&MockResourceProvider<Event, Driver>::connected, this),
+            lambda::bind(
+                &MockResourceProvider<Event, Driver>::disconnected, this),
+            lambda::bind(
+                &MockResourceProvider<Event, Driver>::events, this, lambda::_1),
+            credential));
+  }
+
+private:
+  std::unique_ptr<Driver> driver;
+};
+
+} // namespace resource_provider {
+
+
+namespace v1 {
+namespace resource_provider {
+
+// Alias existing `mesos::v1::resource_provider` classes so that we can easily
+// write `v1::resource_provider::` in tests.
+using Call = mesos::v1::resource_provider::Call;
+using Event = mesos::v1::resource_provider::Event;
+
+} // namespace resource_provider {
+
+using MockResourceProvider = tests::resource_provider::MockResourceProvider<
+    mesos::v1::resource_provider::Event,
+    mesos::v1::resource_provider::Driver>;
+
+} // namespace v1 {
+
+
 // Definition of a MockAuthorizer that can be used in tests with gmock.
 class MockAuthorizer : public Authorizer
 {


[2/9] mesos git commit: Stored subscribed resources in resource provider manager.

Posted by ji...@apache.org.
Stored subscribed resources in resource provider manager.

In order to be able to always communicate the aggregated total
resources available on all subscribed resource providers, a resource
provider manager needs to keep track of the resources of all
subscribed resource providers. This commit adds a field to the
manager's internal data structures for that purpose.

To make assigned 'ResourceProviderID's opaque to users of managers, the
manager assigns provider ids to all resources added.

Review: https://reviews.apache.org/r/61181/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f9ffc24a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f9ffc24a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f9ffc24a

Branch: refs/heads/master
Commit: f9ffc24aefee64ef91b50ec48258a15bb99600e5
Parents: f19cc63
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Fri Aug 18 10:00:10 2017 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Aug 18 11:43:56 2017 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f9ffc24a/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index df5ddf3..7072c36 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -120,11 +120,15 @@ struct ResourceProvider
 {
   ResourceProvider(
       const ResourceProviderInfo& _info,
-      const HttpConnection& _http)
-    : info(_info), http(_http) {}
+      const HttpConnection& _http,
+      const Resources& _resources)
+    : info(_info),
+      http(_http),
+      resources(_resources) {}
 
   ResourceProviderInfo info;
   HttpConnection http;
+  Resources resources;
 };
 
 
@@ -297,7 +301,14 @@ void ResourceProviderManagerProcess::subscribe(
     subscribe.resource_provider_info();
   resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
 
-  ResourceProvider resourceProvider(resourceProviderInfo, http);
+  // Inject the `ResourceProviderID` for all subscribed resources.
+  Resources resources;
+  foreach (Resource resource, subscribe.resources()) {
+    resource.mutable_provider_id()->CopyFrom(resourceProviderInfo.id());
+    resources += resource;
+  }
+
+  ResourceProvider resourceProvider(resourceProviderInfo, http, resources);
 
   Event event;
   event.set_type(Event::SUBSCRIBED);