You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/01/29 21:14:37 UTC

[mesos] 05/08: Added a forwarding mode to the test CSI plugin.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f8d9a9334fe9c838310319a3bcd35be9d3b3fb5f
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Thu Jan 17 21:40:45 2019 -0800

    Added a forwarding mode to the test CSI plugin.
    
    If the `--forward` flag is set, the test CSI plugin would forward all gRPC
    requests to the specified gRPC server URI, and return the response to
    the caller. This can be used to forward all CSI calls to a mock CSI
    plugin object in the test process.
    
    Review: https://reviews.apache.org/r/69787
---
 src/examples/test_csi_plugin.cpp | 279 ++++++++++++++++++++++++++++++++++-----
 1 file changed, 249 insertions(+), 30 deletions(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index af18303..73a6c43 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -14,17 +14,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <memory>
+#include <thread>
+#include <utility>
+
 #include <csi/spec.hpp>
 
-#include <grpcpp/server.h>
-#include <grpcpp/server_builder.h>
-#include <grpcpp/server_context.h>
-#include <grpcpp/security/server_credentials.h>
+#include <grpcpp/grpcpp.h>
+
+#include <grpcpp/generic/async_generic_service.h>
+#include <grpcpp/generic/generic_stub.h>
 
 #include <mesos/type_utils.hpp>
 
 #include <stout/bytes.hpp>
 #include <stout/flags.hpp>
+#include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
 #include <stout/option.hpp>
 #include <stout/path.hpp>
@@ -54,11 +59,19 @@ using std::string;
 using std::unique_ptr;
 using std::vector;
 
-using grpc::InsecureServerCredentials;
+using grpc::AsyncGenericService;
+using grpc::ByteBuffer;
+using grpc::ClientContext;
+using grpc::GenericClientAsyncResponseReader;
+using grpc::GenericServerAsyncReaderWriter;
+using grpc::GenericServerContext;
+using grpc::GenericStub;
 using grpc::Server;
 using grpc::ServerBuilder;
+using grpc::ServerCompletionQueue;
 using grpc::ServerContext;
 using grpc::Status;
+using grpc::WriteOptions;
 
 constexpr char PLUGIN_NAME[] = "org.apache.mesos.csi.test";
 constexpr char NODE_ID[] = "localhost";
@@ -95,6 +108,11 @@ public:
         "specified as a semicolon-delimited list of name:capacity pairs.\n"
         "If a volume with the same name already exists, the pair will be\n"
         "ignored. (Example: 'volume1:1GB;volume2:2GB')");
+
+    add(&Flags::forward,
+        "forward",
+        "If set, the plugin forwards all requests to the specified Unix\n"
+        "domain socket. (Example: 'unix:///path/to/socket')");
   }
 
   string endpoint;
@@ -102,6 +120,7 @@ public:
   Bytes available_capacity;
   Option<string> create_parameters;
   Option<string> volumes;
+  Option<string> forward;
 };
 
 
@@ -162,21 +181,9 @@ public:
 
       volumes.put(volumeInfo.id, volumeInfo);
     }
-
-    ServerBuilder builder;
-    builder.AddListeningPort(endpoint, InsecureServerCredentials());
-    builder.RegisterService(static_cast<csi::v0::Identity::Service*>(this));
-    builder.RegisterService(static_cast<csi::v0::Controller::Service*>(this));
-    builder.RegisterService(static_cast<csi::v0::Node::Service*>(this));
-    server = builder.BuildAndStart();
   }
 
-  void wait()
-  {
-    if (server) {
-      server->Wait();
-    }
-  }
+  void run();
 
   Status GetPluginInfo(
       ServerContext* context,
@@ -280,11 +287,22 @@ private:
   csi::v0::VolumeCapability defaultVolumeCapability;
   google::protobuf::Map<string, string> createParameters;
   hashmap<string, VolumeInfo> volumes;
-
-  unique_ptr<Server> server;
 };
 
 
+void TestCSIPlugin::run()
+{
+  ServerBuilder builder;
+  builder.AddListeningPort(endpoint, grpc::InsecureServerCredentials());
+  builder.RegisterService(static_cast<csi::v0::Identity::Service*>(this));
+  builder.RegisterService(static_cast<csi::v0::Controller::Service*>(this));
+  builder.RegisterService(static_cast<csi::v0::Node::Service*>(this));
+
+  std::unique_ptr<Server> server = builder.BuildAndStart();
+  server->Wait();
+}
+
+
 Status TestCSIPlugin::GetPluginInfo(
     ServerContext* context,
     const csi::v0::GetPluginInfoRequest* request,
@@ -922,6 +940,200 @@ Try<TestCSIPlugin::VolumeInfo> TestCSIPlugin::parseVolumePath(
 }
 
 
+// Serves CSI calls from the given endpoint through forwarding the calls to
+// another CSI endpoint and returning back the results.
+class CSIProxy
+{
+public:
+  CSIProxy(const string& _endpoint, const string& forward)
+    : endpoint(_endpoint),
+      stub(grpc::CreateChannel(forward, grpc::InsecureChannelCredentials())),
+      service(new AsyncGenericService()) {}
+
+  void run();
+
+private:
+  struct Call
+  {
+    Call() : state(State::INITIALIZED), serverReaderWriter(&serverContext) {}
+
+    enum class State {
+      INITIALIZED,
+      REQUESTED,
+      FORWARDING,
+      FINISHING,
+    } state;
+
+    GenericServerContext serverContext;
+    GenericServerAsyncReaderWriter serverReaderWriter;
+    ClientContext clientContext;
+    std::unique_ptr<GenericClientAsyncResponseReader> clientReader;
+
+    ByteBuffer request;
+    ByteBuffer response;
+    Status status;
+  };
+
+  void serve(ServerCompletionQueue* completionQueue);
+
+  const string endpoint;
+
+  GenericStub stub;
+  std::unique_ptr<AsyncGenericService> service;
+};
+
+
+void CSIProxy::run()
+{
+  ServerBuilder builder;
+  builder.AddListeningPort(endpoint, grpc::InsecureServerCredentials());
+  builder.RegisterAsyncGenericService(service.get());
+
+  std::unique_ptr<ServerCompletionQueue> completionQueue =
+    builder.AddCompletionQueue();
+
+  std::unique_ptr<Server> server = builder.BuildAndStart();
+  std::thread looper(std::bind(&CSIProxy::serve, this, completionQueue.get()));
+
+  server->Wait();
+
+  // Shutdown the completion queue after the server has been shut down. The
+  // looper thread will then drain the queue before finishing. See:
+  // https://grpc.io/grpc/cpp/classgrpc_1_1_server_builder.html#a960d55977e1aef56fd7b582037a01bbd // NOLINT
+  completionQueue->Shutdown();
+  looper.join();
+}
+
+
+// The lifecycle of a forwarded CSI call is shown as follows. The transitions
+// happen after the completions of the API calls.
+//
+//                                                     Server-side
+//        +-------------+             +-------------+ WriteAndFinish +---+
+//        | INITIALIZED |             |  FINISHING  +----------------> X |
+//        +------+------+             +------^------+                +---+
+//   Server-side |                           | Client-side
+//   RequestCall |        Server-side        | Finish (unary call)
+//        +------v------+    Read     +------+------+
+//        |  REQUESTED  +-------------> FORWARDING  |
+//        +-------------+             +-------------+
+//
+void CSIProxy::serve(ServerCompletionQueue* completionQueue)
+{
+  // Serve the first call. The ownership of the `Call` object is passed to the
+  // completion queue, and will be retrieved later in the loop below.
+  Call* first = new Call();
+  service->RequestCall(
+      &first->serverContext,
+      &first->serverReaderWriter,
+      completionQueue,
+      completionQueue,
+      first);
+
+  void* tag;
+  bool ok = false;
+
+  // See the link below for details of `ok`:
+  // https://grpc.io/grpc/cpp/classgrpc_1_1_completion_queue.html#a86d9810ced694e50f7987ac90b9f8c1a // NOLINT
+  while (completionQueue->Next(&tag, &ok)) {
+    // Retrieve the ownership of the next ready `Call` object from the
+    // completion queue.
+    Call* call = reinterpret_cast<Call*>(tag);
+
+    switch (call->state) {
+      case Call::State::INITIALIZED: {
+        if (!ok) {
+          // Server-side `RequestCall`: the server has been shutdown so continue
+          // to drain the queue.
+          continue;
+        }
+
+        call->state = Call::State::REQUESTED;
+
+        // Make a server-side `Read` call and return the ownership back to the
+        // completion queue.
+        call->serverReaderWriter.Read(&call->request, call);
+
+        // Serve the next call while processing this one. The ownership of the
+        // new `Call` object is passed to the completion queue, and will be
+        // retrieved in a later iteration.
+        Call* next = new Call();
+        service->RequestCall(
+            &next->serverContext,
+            &next->serverReaderWriter,
+            completionQueue,
+            completionQueue,
+            next);
+
+        break;
+      }
+      case Call::State::REQUESTED: {
+        if (!ok) {
+          // Server-side `Read`: the client has done a `WritesDone` already, so
+          // clean up the call and move on to the next one.
+          delete call;
+          continue;
+        }
+
+        LOG(INFO) << "Forwarding " << call->serverContext.method() << " call";
+
+        call->state = Call::State::FORWARDING;
+
+        call->clientContext.set_wait_for_ready(true);
+        call->clientContext.set_deadline(call->serverContext.deadline());
+
+        foreachpair (const grpc::string_ref& key,
+                     const grpc::string_ref& value,
+                     call->serverContext.client_metadata()) {
+          call->clientContext.AddMetadata(
+              grpc::string(key.data(), key.size()),
+              grpc::string(value.data(), value.size()));
+        }
+
+        call->clientReader = stub.PrepareUnaryCall(
+            &call->clientContext,
+            call->serverContext.method(),
+            call->request,
+            completionQueue);
+
+        call->clientReader->StartCall();
+
+        // Make a client-side `Finish` call for the unary RPC and return the
+        // ownership back to the completion queue.
+        call->clientReader->Finish(&call->response, &call->status, call);
+
+        break;
+      }
+      case Call::State::FORWARDING: {
+        // Client-side `Finish` for unary RPCs: `ok` should always be true.
+        CHECK(ok);
+
+        call->state = Call::State::FINISHING;
+
+        // Make a server-side `WriteAndFinish` call and return the ownership
+        // back to the completion queue.
+        call->serverReaderWriter.WriteAndFinish(
+            call->response, WriteOptions(), call->status, call);
+
+        break;
+      }
+      case Call::State::FINISHING: {
+        if (!ok) {
+          // Server-side `WriteAndFinish`: the response hasn't gone to the wire.
+          LOG(ERROR) << "Failed to forward response for "
+                     << call->serverContext.method() << " call";
+        }
+
+        // The call is completed so clean it up.
+        delete call;
+
+        break;
+      }
+    }
+  }
+}
+
+
 int main(int argc, char** argv)
 {
   Flags flags;
@@ -1006,21 +1218,28 @@ int main(int argc, char** argv)
 
   // Terminate the plugin if the endpoint socket file already exists to simulate
   // an `EADDRINUSE` bind error.
-  const string endpointPath = strings::remove("unix://", flags.endpoint);
+  const string endpointPath =
+    strings::remove(flags.endpoint, "unix://", strings::PREFIX);
+
   if (os::exists(endpointPath)) {
     cerr << "Failed to create endpoint '" << endpointPath << "': already exists"
          << endl;
+
     return EXIT_FAILURE;
   }
 
-  unique_ptr<TestCSIPlugin> plugin(new TestCSIPlugin(
-      flags.work_dir,
-      flags.endpoint,
-      flags.available_capacity,
-      createParameters,
-      volumes));
-
-  plugin->wait();
+  if (flags.forward.isSome()) {
+    CSIProxy proxy(flags.endpoint, flags.forward.get());
 
-  return EXIT_SUCCESS;
+    proxy.run();
+  } else {
+    TestCSIPlugin plugin(
+        flags.work_dir,
+        flags.endpoint,
+        flags.available_capacity,
+        createParameters,
+        volumes);
+
+    plugin.run();
+  }
 }