You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/21 01:26:21 UTC

[1/3] mesos git commit: Updated the CSI client to use the new gRPC client interface.

Repository: mesos
Updated Branches:
  refs/heads/master 62d117334 -> c49dbf0e2


Updated the CSI client to use the new gRPC client interface.

Review: https://reviews.apache.org/r/63968


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c49dbf0e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c49dbf0e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c49dbf0e

Branch: refs/heads/master
Commit: c49dbf0e2e460d4c9145381234333207c5f46b0c
Parents: 7018f96
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Nov 20 14:41:14 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 20 17:16:23 2017 -0800

----------------------------------------------------------------------
 src/csi/client.cpp | 227 ++++++++++++++++++++++++++++++++++--------------
 1 file changed, 163 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c49dbf0e/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index b6c28d8..5df788f 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -16,168 +16,267 @@
 
 #include "csi/client.hpp"
 
+using process::Failure;
 using process::Future;
 
+using process::grpc::RpcResult;
+
 namespace mesos {
 namespace csi {
 
 Future<GetSupportedVersionsResponse> Client::GetSupportedVersions(
     const GetSupportedVersionsRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Identity, GetSupportedVersions),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Identity, GetSupportedVersions), request)
+    .then([](const RpcResult<GetSupportedVersionsResponse>& result)
+        -> Future<GetSupportedVersionsResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<GetPluginInfoResponse> Client::GetPluginInfo(
     const GetPluginInfoRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Identity, GetPluginInfo),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Identity, GetPluginInfo), request)
+    .then([](const RpcResult<GetPluginInfoResponse>& result)
+        -> Future<GetPluginInfoResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<CreateVolumeResponse> Client::CreateVolume(
     const CreateVolumeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, CreateVolume),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, CreateVolume), request)
+    .then([](const RpcResult<CreateVolumeResponse>& result)
+        -> Future<CreateVolumeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<DeleteVolumeResponse> Client::DeleteVolume(
     const DeleteVolumeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, DeleteVolume),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, DeleteVolume), request)
+    .then([](const RpcResult<DeleteVolumeResponse>& result)
+        -> Future<DeleteVolumeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<ControllerPublishVolumeResponse> Client::ControllerPublishVolume(
     const ControllerPublishVolumeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, ControllerPublishVolume),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, ControllerPublishVolume), request)
+    .then([](const RpcResult<ControllerPublishVolumeResponse>& result)
+        -> Future<ControllerPublishVolumeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<ControllerUnpublishVolumeResponse> Client::ControllerUnpublishVolume(
     const ControllerUnpublishVolumeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, ControllerUnpublishVolume),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, ControllerUnpublishVolume), request)
+    .then([](const RpcResult<ControllerUnpublishVolumeResponse>& result)
+        -> Future<ControllerUnpublishVolumeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<ValidateVolumeCapabilitiesResponse> Client::ValidateVolumeCapabilities(
     const ValidateVolumeCapabilitiesRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, ValidateVolumeCapabilities),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, ValidateVolumeCapabilities), request)
+    .then([](const RpcResult<ValidateVolumeCapabilitiesResponse>& result)
+        -> Future<ValidateVolumeCapabilitiesResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<ListVolumesResponse> Client::ListVolumes(
     const ListVolumesRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, ListVolumes),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, ListVolumes), request)
+    .then([](const RpcResult<ListVolumesResponse>& result)
+        -> Future<ListVolumesResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<GetCapacityResponse> Client::GetCapacity(
     const GetCapacityRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, GetCapacity),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, GetCapacity), request)
+    .then([](const RpcResult<GetCapacityResponse>& result)
+        -> Future<GetCapacityResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<ControllerProbeResponse> Client::ControllerProbe(
     const ControllerProbeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, ControllerProbe),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, ControllerProbe), request)
+    .then([](const RpcResult<ControllerProbeResponse>& result)
+        -> Future<ControllerProbeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
     const ControllerGetCapabilitiesRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Controller, ControllerGetCapabilities),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Controller, ControllerGetCapabilities), request)
+    .then([](const RpcResult<ControllerGetCapabilitiesResponse>& result)
+        -> Future<ControllerGetCapabilitiesResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<NodePublishVolumeResponse> Client::NodePublishVolume(
     const NodePublishVolumeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Node, NodePublishVolume),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Node, NodePublishVolume), request)
+    .then([](const RpcResult<NodePublishVolumeResponse>& result)
+        -> Future<NodePublishVolumeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<NodeUnpublishVolumeResponse> Client::NodeUnpublishVolume(
     const NodeUnpublishVolumeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Node, NodeUnpublishVolume),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Node, NodeUnpublishVolume), request)
+    .then([](const RpcResult<NodeUnpublishVolumeResponse>& result)
+        -> Future<NodeUnpublishVolumeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<GetNodeIDResponse> Client::GetNodeID(
     const GetNodeIDRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Node, GetNodeID),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Node, GetNodeID), request)
+    .then([](const RpcResult<GetNodeIDResponse>& result)
+        -> Future<GetNodeIDResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<NodeProbeResponse> Client::NodeProbe(
     const NodeProbeRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Node, NodeProbe),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Node, NodeProbe), request)
+    .then([](const RpcResult<NodeProbeResponse>& result)
+        -> Future<NodeProbeResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 
 Future<NodeGetCapabilitiesResponse> Client::NodeGetCapabilities(
     const NodeGetCapabilitiesRequest& request)
 {
-  return runtime.call(
-      channel,
-      GRPC_RPC(Node, NodeGetCapabilities),
-      request);
+  return runtime
+    .call(channel, GRPC_RPC(Node, NodeGetCapabilities), request)
+    .then([](const RpcResult<NodeGetCapabilitiesResponse>& result)
+        -> Future<NodeGetCapabilitiesResponse> {
+      if (result.status.ok()) {
+        return result.response;
+      } else {
+        return Failure(result.status.error_message());
+      }
+    });
 }
 
 } // namespace csi {


[3/3] mesos git commit: Bumped the CSI version to ec29890.

Posted by ji...@apache.org.
Bumped the CSI version to ec29890.

Review: https://reviews.apache.org/r/63966


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3f81f7e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3f81f7e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3f81f7e

Branch: refs/heads/master
Commit: b3f81f7e042befae9e9d3124d1fb83d518cad4df
Parents: 62d1173
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Nov 20 11:39:03 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 20 17:16:23 2017 -0800

----------------------------------------------------------------------
 3rdparty/csi-0.1.0.tar.gz      | Bin 80603 -> 66799 bytes
 src/csi/client.cpp             |  16 +++++++++++++---
 src/csi/client.hpp             |   7 +++++--
 src/tests/csi_client_tests.cpp |   3 ++-
 src/tests/mock_csi_plugin.hpp  |   3 ++-
 5 files changed, 22 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3f81f7e/3rdparty/csi-0.1.0.tar.gz
----------------------------------------------------------------------
diff --git a/3rdparty/csi-0.1.0.tar.gz b/3rdparty/csi-0.1.0.tar.gz
index 1d09c70..d2ff83e 100644
Binary files a/3rdparty/csi-0.1.0.tar.gz and b/3rdparty/csi-0.1.0.tar.gz differ

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3f81f7e/src/csi/client.cpp
----------------------------------------------------------------------
diff --git a/src/csi/client.cpp b/src/csi/client.cpp
index e171f03..b6c28d8 100644
--- a/src/csi/client.cpp
+++ b/src/csi/client.cpp
@@ -111,6 +111,16 @@ Future<GetCapacityResponse> Client::GetCapacity(
 }
 
 
+Future<ControllerProbeResponse> Client::ControllerProbe(
+    const ControllerProbeRequest& request)
+{
+  return runtime.call(
+      channel,
+      GRPC_RPC(Controller, ControllerProbe),
+      request);
+}
+
+
 Future<ControllerGetCapabilitiesResponse> Client::ControllerGetCapabilities(
     const ControllerGetCapabilitiesRequest& request)
 {
@@ -151,12 +161,12 @@ Future<GetNodeIDResponse> Client::GetNodeID(
 }
 
 
-Future<ProbeNodeResponse> Client::ProbeNode(
-    const ProbeNodeRequest& request)
+Future<NodeProbeResponse> Client::NodeProbe(
+    const NodeProbeRequest& request)
 {
   return runtime.call(
       channel,
-      GRPC_RPC(Node, ProbeNode),
+      GRPC_RPC(Node, NodeProbe),
       request);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3f81f7e/src/csi/client.hpp
----------------------------------------------------------------------
diff --git a/src/csi/client.hpp b/src/csi/client.hpp
index df674e1..a5571f4 100644
--- a/src/csi/client.hpp
+++ b/src/csi/client.hpp
@@ -66,6 +66,9 @@ public:
   process::Future<GetCapacityResponse>
     GetCapacity(const GetCapacityRequest& request);
 
+  process::Future<ControllerProbeResponse>
+    ControllerProbe(const ControllerProbeRequest& request);
+
   process::Future<ControllerGetCapabilitiesResponse>
     ControllerGetCapabilities(const ControllerGetCapabilitiesRequest& request);
 
@@ -79,8 +82,8 @@ public:
   process::Future<GetNodeIDResponse>
     GetNodeID(const GetNodeIDRequest& request);
 
-  process::Future<ProbeNodeResponse>
-    ProbeNode(const ProbeNodeRequest& request);
+  process::Future<NodeProbeResponse>
+    NodeProbe(const NodeProbeRequest& request);
 
   process::Future<NodeGetCapabilitiesResponse>
     NodeGetCapabilities(const NodeGetCapabilitiesRequest& request);

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3f81f7e/src/tests/csi_client_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/csi_client_tests.cpp b/src/tests/csi_client_tests.cpp
index 725466a..2c5dbb7 100644
--- a/src/tests/csi_client_tests.cpp
+++ b/src/tests/csi_client_tests.cpp
@@ -121,6 +121,7 @@ INSTANTIATE_TEST_CASE_P(
         RPC_PARAM(Client::ValidateVolumeCapabilities),
         RPC_PARAM(Client::ListVolumes),
         RPC_PARAM(Client::GetCapacity),
+        RPC_PARAM(Client::ControllerProbe),
         RPC_PARAM(Client::ControllerGetCapabilities)),
     RPCParam::Printer());
 
@@ -131,7 +132,7 @@ INSTANTIATE_TEST_CASE_P(
         RPC_PARAM(Client::NodePublishVolume),
         RPC_PARAM(Client::NodeUnpublishVolume),
         RPC_PARAM(Client::GetNodeID),
-        RPC_PARAM(Client::ProbeNode),
+        RPC_PARAM(Client::NodeProbe),
         RPC_PARAM(Client::NodeGetCapabilities)),
     RPCParam::Printer());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3f81f7e/src/tests/mock_csi_plugin.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_csi_plugin.hpp b/src/tests/mock_csi_plugin.hpp
index b7926c4..162dc64 100644
--- a/src/tests/mock_csi_plugin.hpp
+++ b/src/tests/mock_csi_plugin.hpp
@@ -43,11 +43,12 @@ namespace tests {
   macro(ValidateVolumeCapabilities)      \
   macro(ListVolumes)                     \
   macro(GetCapacity)                     \
+  macro(ControllerProbe)                 \
   macro(ControllerGetCapabilities)       \
   macro(NodePublishVolume)               \
   macro(NodeUnpublishVolume)             \
   macro(GetNodeID)                       \
-  macro(ProbeNode)                       \
+  macro(NodeProbe)                       \
   macro(NodeGetCapabilities)
 
 #define DECLARE_MOCK_CSI_METHOD(name)    \


[2/3] mesos git commit: Updated the gRPC client to return both status and payload.

Posted by ji...@apache.org.
Updated the gRPC client to return both status and payload.

Some users might want to behave differently based on the error code in
the gRPC Status. This patch exposes the gRPC Status by introducing a
struct called `RpcResult`, which contains both the gRPC Status and the
response body.

Note that gRPC allows the gRPC Status to be non-OK while still carrying
a response body.

Review: https://reviews.apache.org/r/63967


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7018f963
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7018f963
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7018f963

Branch: refs/heads/master
Commit: 7018f963257b55bf07a6afd317b727f50deeee1d
Parents: b3f81f7
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Nov 20 14:38:17 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 20 17:16:23 2017 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/grpc.hpp | 28 +++++++---
 3rdparty/libprocess/src/tests/grpc_tests.cpp | 63 ++++++++++++++++-------
 2 files changed, 67 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7018f963/3rdparty/libprocess/include/process/grpc.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/grpc.hpp b/3rdparty/libprocess/include/process/grpc.hpp
index f5dc8b5..0f63e57 100644
--- a/3rdparty/libprocess/include/process/grpc.hpp
+++ b/3rdparty/libprocess/include/process/grpc.hpp
@@ -71,6 +71,21 @@ private:
 };
 
 
+/**
+ * The response of an RPC call. It includes the gRPC `Status`
+ * (https://grpc.io/grpc/cpp/classgrpc_1_1_status.html), and
+ * the actual protobuf response body.
+ */
+template <typename T>
+struct RpcResult
+{
+  RpcResult(const ::grpc::Status& _status, const T& _response)
+    : status(_status), response(_response) {}
+
+  ::grpc::Status status;
+  T response;
+};
+
 namespace client {
 
 /**
@@ -101,7 +116,7 @@ public:
    * @return a `Future` waiting for a response protobuf.
    */
   template <typename Stub, typename Request, typename Response>
-  Future<Response> call(
+  Future<RpcResult<Response>> call(
       const Channel& channel,
       std::unique_ptr<::grpc::ClientAsyncResponseReader<Response>>(Stub::*rpc)(
           ::grpc::ClientContext*,
@@ -129,7 +144,9 @@ public:
       // an asynchronous gRPC call through the `CompletionQueue`
       // managed by `data`. The `Promise` will be set by the callback
       // upon server response.
-      std::shared_ptr<Promise<Response>> promise(new Promise<Response>);
+      std::shared_ptr<Promise<RpcResult<Response>>> promise(
+          new Promise<RpcResult<Response>>);
+
       promise->future().onDiscard([=] { context->TryCancel(); });
 
       std::shared_ptr<Response> response(new Response());
@@ -150,11 +167,10 @@ public:
                 CHECK(promise->future().isPending());
                 if (promise->future().hasDiscard()) {
                   promise->discard();
-                } else if (status->ok()) {
-                  promise->set(*response);
-                } else {
-                  promise->fail(status->error_message());
+                  return;
                 }
+
+                promise->set(RpcResult<Response>(*status, *response));
               }));
 
       return promise->future();

http://git-wip-us.apache.org/repos/asf/mesos/blob/7018f963/3rdparty/libprocess/src/tests/grpc_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/grpc_tests.cpp b/3rdparty/libprocess/src/tests/grpc_tests.cpp
index 7ac4ef6..8bb517f 100644
--- a/3rdparty/libprocess/src/tests/grpc_tests.cpp
+++ b/3rdparty/libprocess/src/tests/grpc_tests.cpp
@@ -46,6 +46,7 @@ using process::Future;
 using process::Promise;
 
 using process::grpc::Channel;
+using process::grpc::RpcResult;
 
 using testing::_;
 using testing::DoAll;
@@ -115,9 +116,11 @@ TEST_F(GRPCClientTest, Success)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
-  AWAIT_EXPECT_READY(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_TRUE(pong->status.ok());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -160,9 +163,14 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong1 = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
-  Future<Pong> pong2 = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
-  Future<Pong> pong3 = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong1 =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
+  Future<RpcResult<Pong>> pong2 =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
+  Future<RpcResult<Pong>> pong3 =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed1->future());
   AWAIT_READY(processed2->future());
@@ -170,9 +178,14 @@ TEST_F(GRPCClientTest, ConcurrentRPCs)
 
   pinged->set(Nothing());
 
-  AWAIT_EXPECT_READY(pong1);
-  AWAIT_EXPECT_READY(pong2);
-  AWAIT_EXPECT_READY(pong3);
+  AWAIT_ASSERT_READY(pong1);
+  EXPECT_TRUE(pong1->status.ok());
+
+  AWAIT_ASSERT_READY(pong2);
+  EXPECT_TRUE(pong2->status.ok());
+
+  AWAIT_ASSERT_READY(pong3);
+  EXPECT_TRUE(pong3->status.ok());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -195,9 +208,11 @@ TEST_F(GRPCClientTest, Failed)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
-  AWAIT_EXPECT_FAILED(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
@@ -218,7 +233,9 @@ TEST_F(GRPCClientTest, DiscardedBeforeServerStarted)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
   pong.discard();
 
   ASSERT_SOME(server.Startup(server_address()));
@@ -254,7 +271,9 @@ TEST_F(GRPCClientTest, DiscardedWhenServerProcessing)
 
   ASSERT_SOME(server.Startup(server_address()));
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+
   AWAIT_READY(processed->future());
 
   pong.discard();
@@ -291,7 +310,8 @@ TEST_F(GRPCClientTest, ClientShutdown)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   AWAIT_READY(processed->future());
 
@@ -299,7 +319,9 @@ TEST_F(GRPCClientTest, ClientShutdown)
   AWAIT_ASSERT_READY(runtime.wait());
 
   shutdown->set(Nothing());
-  AWAIT_EXPECT_FAILED(pong);
+
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
 
   ASSERT_SOME(server.Shutdown());
 }
@@ -312,12 +334,14 @@ TEST_F(GRPCClientTest, ServerUnreachable)
   client::Runtime runtime;
   Channel channel("nosuchhost");
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   runtime.terminate();
   AWAIT_ASSERT_READY(runtime.wait());
 
-  AWAIT_EXPECT_FAILED(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
 }
 
 
@@ -341,12 +365,15 @@ TEST_F(GRPCClientTest, ServerTimeout)
   client::Runtime runtime;
   Channel channel(server_address());
 
-  Future<Pong> pong = runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
+  Future<RpcResult<Pong>> pong =
+    runtime.call(channel, GRPC_RPC(PingPong, Send), Ping());
 
   // TODO(chhsiao): The gRPC library returns a failure after the default
   // timeout (5 seconds) is passed, no matter when the `CompletionQueue`
   // is shut down. The timeout should be lowered once we support it.
-  AWAIT_EXPECT_FAILED(pong);
+  AWAIT_ASSERT_READY(pong);
+  EXPECT_FALSE(pong->status.ok());
+
   done->set(Nothing());
 
   runtime.terminate();