Posted to commits@kudu.apache.org by to...@apache.org on 2016/03/15 17:19:01 UTC

[1/2] incubator-kudu git commit: rpc: add negotiation of RPC-layer feature flags

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 4ec85eec3 -> f82919ed5


rpc: add negotiation of RPC-layer feature flags

This extends the RPC system with a simple mechanism for advertising
a set of supported feature flags during connection establishment.
The list of supported features is hard-coded in the build, and is
used in lieu of a version number in order to communicate which
capabilities an RPC client or server supports.

There are several advantages of feature flags over version numbers:
- since we have both a Java and C++ client, this allows us to add
  features in different orders, or decide to not support a feature
  at all in one client or the other. For example, the C++ client
  is likely to gain support for a shared-memory transport long before
  the Java one.
- this allows much more flexibility in backporting RPC system features
  across versions. For example, if we introduce feature 'A' in Kudu
  2.0, and feature 'B' in Kudu 2.1, we are still able to selectively
  backport 'B' without 'A' to Kudu 1.5.
- currently, the set of supported features is determined only by
  code-level support, but we could later decide to conditionally
  enable features based on configuration or machine capability.
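
The backporting case above can be made concrete with a small sketch. The
`Feature` enum, the `Build` struct, and the helper functions are hypothetical
names used only for illustration; none of them exist in the Kudu RPC code.

```cpp
#include <set>

// Hypothetical features, named after 'A' and 'B' in the example above.
enum class Feature { A, B };

// A build described both by a version number and by an explicit feature set.
struct Build {
  int version_major;
  int version_minor;
  std::set<Feature> features;
};

// Version-based check: a feature is assumed to be present in every release
// at or after the one that introduced it.
bool SupportsByVersion(const Build& b, int intro_major, int intro_minor) {
  return b.version_major > intro_major ||
         (b.version_major == intro_major && b.version_minor >= intro_minor);
}

// Feature-flag check: the build states directly what it supports.
bool SupportsByFlag(const Build& b, Feature f) {
  return b.features.count(f) > 0;
}

// Kudu 1.5 with 'B' (introduced in 2.1) backported, but without 'A'
// (introduced in 2.0).
Build BackportedBuild() {
  return Build{1, 5, {Feature::B}};
}
```

With a version comparison, the backported 1.5 build would be reported as
lacking 'B', since 1.5 is older than 2.1. The explicit feature set describes
it correctly as supporting 'B' but not 'A'.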

The most reasonable place to add the exchange of feature flags was
to add them to the existing SASL negotiation messages, where we
already had an unused 'version' field. The naming is not great, given
that the SASL negotiation is now negotiating more than just SASL,
but I didn't want to feature-creep into a wholesale rename or refactor,
and adding a separate negotiation phase would waste an extra round trip
as well as be difficult to add compatibly.
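
As a rough sketch of the exchange: each peer advertises its flags in its
NEGOTIATE message, and the receiver records only the flags that its own build
also supports. The code below is a standalone model of that filtering step.
`Flag`, `IsKnownFlag`, and `RecordPeerFeatures` are illustrative stand-ins for
the protobuf-generated enum and the logic in the SASL client and server, not
the actual Kudu API.

```cpp
#include <set>
#include <vector>

// Stand-in for the protobuf-generated enum; the two values are the ones
// declared in rpc_header.proto by this commit.
enum Flag { UNKNOWN = 0, TMP_TEST_FEATURE_FLAG = 1000 };

// Stand-in for the generated RpcFeatureFlag_IsValid() helper.
bool IsKnownFlag(int value) {
  return value == UNKNOWN || value == TMP_TEST_FEATURE_FLAG;
}

// Keep only the advertised flags that the local build both recognizes and
// supports.
std::set<Flag> RecordPeerFeatures(const std::vector<int>& advertised,
                                  const std::set<Flag>& locally_supported) {
  std::set<Flag> peer_features;
  for (int value : advertised) {
    // A newer peer may send values this build has never heard of; those are
    // mapped to UNKNOWN instead of being cast blindly.
    Flag flag = IsKnownFlag(value) ? static_cast<Flag>(value) : UNKNOWN;
    if (locally_supported.count(flag) > 0) {
      peer_features.insert(flag);
    }
  }
  return peer_features;
}
```

Because unrecognized values collapse to UNKNOWN, which is never in the locally
supported set, a peer that advertises newer flags does not cause the
negotiation to fail. Those flags are simply not recorded.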

Change-Id: Ib9c7fdf03920496c12d92eba23d8d4f7b7cb8fc5
Reviewed-on: http://gerrit.cloudera.org:8080/2238
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: David Ribeiro Alves <da...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/86a510a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/86a510a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/86a510a3

Branch: refs/heads/master
Commit: 86a510a39f290cee59bfecf242d4a804d88cee46
Parents: 4ec85ee
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Feb 18 17:47:33 2016 -0800
Committer: Dan Burkert <da...@cloudera.com>
Committed: Tue Mar 15 06:03:19 2016 +0000

----------------------------------------------------------------------
 docs/design-docs/rpc.md       | 22 ++++++++++++++++++++--
 src/kudu/rpc/constants.cc     |  4 ++++
 src/kudu/rpc/constants.h      | 13 ++++++++++++-
 src/kudu/rpc/rpc_header.proto | 28 +++++++++++++++++++++++++++-
 src/kudu/rpc/sasl_client.cc   | 18 ++++++++++++++++++
 src/kudu/rpc/sasl_client.h    | 22 ++++++++++++++++------
 src/kudu/rpc/sasl_rpc-test.cc |  3 +++
 src/kudu/rpc/sasl_server.cc   | 16 +++++++++++++++-
 src/kudu/rpc/sasl_server.h    | 24 +++++++++++++++++-------
 9 files changed, 132 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/docs/design-docs/rpc.md
----------------------------------------------------------------------
diff --git a/docs/design-docs/rpc.md b/docs/design-docs/rpc.md
index 628aa33..e5435b9 100644
--- a/docs/design-docs/rpc.md
+++ b/docs/design-docs/rpc.md
@@ -341,7 +341,25 @@ token=<challenge response> } -----------------> |
 Each of the SaslMessagePBs above is framed as usual using RequestHeader or ResponseHeader
 protobufs. For each SASL message, the CallId should be set to '-33'.
 
-
+RPC Feature Flags
+-----------------
+
+During connection negotiation the client and server exchange the set of RPC
+feature flags, so that subsequent RPC requests and responses are aware of what
+is supported. There are several advantages of feature flags over version numbers:
+
+* since we have both a Java and C++ client, this allows us to add
+  features in different orders, or decide to not support a feature
+  at all in one client or the other. For example, the C++ client
+  is likely to gain support for a shared-memory transport long before
+  the Java one.
+* this allows much more flexibility in backporting RPC system features
+  across versions. For example, if we introduce feature 'A' in Kudu
+  2.0, and feature 'B' in Kudu 2.1, we are still able to selectively
+  backport 'B' without 'A' to Kudu 1.5.
+* currently, the set of supported features is determined only by
+  code-level support, but we could later decide to conditionally
+  enable features based on configuration or machine capability.
 
 Connection Context:
 ------------------
@@ -359,4 +377,4 @@ protobufs.
 The client must send calls in strictly increasing 'call_id' order. The server
 may reject repeated calls or calls with lower IDs. The server's responses may
 arrive out-of-order, and use the 'call_id' in the response to associate a response
-with the correct call.
\ No newline at end of file
+with the correct call.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/constants.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/constants.cc b/src/kudu/rpc/constants.cc
index fcff30b..68955e8 100644
--- a/src/kudu/rpc/constants.cc
+++ b/src/kudu/rpc/constants.cc
@@ -17,12 +17,16 @@
 
 #include "kudu/rpc/constants.h"
 
+using std::set;
+
 namespace kudu {
 namespace rpc {
 
 const char* const kMagicNumber = "hrpc";
 const char* const kSaslAppName = "Kudu";
 const char* const kSaslProtoName = "kudu";
+set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { TMP_TEST_FEATURE_FLAG };
+set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { TMP_TEST_FEATURE_FLAG };
 
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/constants.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/constants.h b/src/kudu/rpc/constants.h
index 8ffa81f..d050a34 100644
--- a/src/kudu/rpc/constants.h
+++ b/src/kudu/rpc/constants.h
@@ -18,7 +18,10 @@
 #ifndef KUDU_RPC_RPC_CONSTANTS_H
 #define KUDU_RPC_RPC_CONSTANTS_H
 
-#include <stdint.h>
+#include <cstdint>
+#include <set>
+
+#include "kudu/rpc/rpc_header.pb.h"
 
 namespace kudu {
 namespace rpc {
@@ -46,6 +49,14 @@ static const uint8_t kHeaderFlagsLength = 3;
 // There is a 4-byte length prefix before any packet.
 static const uint8_t kMsgLengthPrefixLength = 4;
 
+// The set of RPC features that this server build supports.
+// Non-const for testing.
+extern std::set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags;
+
+// The set of RPC features that this client build supports.
+// Non-const for testing.
+extern std::set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags;
+
 } // namespace rpc
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index a6e2199..100a57d 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -48,6 +48,24 @@ message ConnectionContextPB {
   optional UserInformationPB user_info = 2;
 }
 
+// Features supported by the RPC system itself.
+//
+// Note that this should be used to evolve the RPC _system_, not the semantics
+// or compatibility of individual calls.
+//
+// For example, if we were to add a feature like call or response wire
+// compression in the future, we could add a flag here to indicate that the
+// client or server supports that feature. Optional features which may safely be
+// ignored by the receiver do not need a feature flag; instead, the optional
+// field feature of ProtoBuf may be utilized.
+enum RpcFeatureFlag {
+  UNKNOWN = 0;
+
+  // TODO: as soon as we have any real feature flag, we can get rid of this
+  // feature flag. For now, we just need something to assert on for tests.
+  TMP_TEST_FEATURE_FLAG = 1000;
+};
+
 // Message type passed back & forth for the SASL negotiation.
 message SaslMessagePB {
   enum SaslState {
@@ -69,7 +87,15 @@ message SaslMessagePB {
     optional bytes challenge = 5;
   }
 
-  optional uint32 version  = 1;
+  // When the client sends its first NEGOTIATE message, it sends its set of supported
+  // RPC system features. In the response to this message, the server sends back its own.
+  // This allows the two peers to agree on whether newer extensions of the
+  // RPC system may be used on this connection. We use a list of features rather than
+  // a simple version number to make it easier for the Java and C++ clients to implement
+  // features in different orders while still maintaining compatibility, as well as
+  // to simplify backporting of features out-of-order.
+  repeated RpcFeatureFlag supported_features = 1;
+
   required SaslState state = 2;  // RPC system SASL state.
   optional bytes token     = 3;
   repeated SaslAuth auths  = 4;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/sasl_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_client.cc b/src/kudu/rpc/sasl_client.cc
index 675e1b0..3ac47f8 100644
--- a/src/kudu/rpc/sasl_client.cc
+++ b/src/kudu/rpc/sasl_client.cc
@@ -256,6 +256,12 @@ Status SaslClient::ParseSaslMsgResponse(const ResponseHeader& header, const Slic
 Status SaslClient::SendNegotiateMessage() {
   SaslMessagePB msg;
   msg.set_state(SaslMessagePB::NEGOTIATE);
+
+  // Advertise our supported features.
+  for (RpcFeatureFlag feature : kSupportedClientRpcFeatureFlags) {
+    msg.add_supported_features(feature);
+  }
+
   TRACE("SASL Client: Sending NEGOTIATE request to server.");
   RETURN_NOT_OK(SendSaslMessage(msg));
   nego_response_expected_ = true;
@@ -302,6 +308,18 @@ Status SaslClient::HandleNegotiateResponse(const SaslMessagePB& response) {
   TRACE("SASL Client: Received NEGOTIATE response from server");
   map<string, SaslMessagePB::SaslAuth> mech_auth_map;
 
+  // Fill in the set of features supported by the server.
+  for (int flag : response.supported_features()) {
+    // We only add the features that our local build knows about.
+    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+    if (ContainsKey(kSupportedClientRpcFeatureFlags, feature_flag)) {
+      server_features_.insert(feature_flag);
+    }
+  }
+
+  // Build the list of SASL mechanisms requested by the client, and a map
+  // back to the SaslAuth PBs.
   string mech_list;
   mech_list.reserve(64);  // Avoid resizing the buffer later.
   for (const SaslMessagePB::SaslAuth& auth : response.auths()) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/sasl_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_client.h b/src/kudu/rpc/sasl_client.h
index b9f460b..a01865e 100644
--- a/src/kudu/rpc/sasl_client.h
+++ b/src/kudu/rpc/sasl_client.h
@@ -25,6 +25,7 @@
 #include <sasl/sasl.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/sasl_helper.h"
 #include "kudu/util/monotime.h"
@@ -49,27 +50,33 @@ class SaslClient {
   ~SaslClient();
 
   // Enable ANONYMOUS authentication.
-  // Call after Init().
+  // Must be called after Init().
   Status EnableAnonymous();
 
   // Enable PLAIN authentication.
-  // Call after Init().
+  // Must be called after Init().
   Status EnablePlain(const string& user, const string& pass);
 
   // Returns mechanism negotiated by this connection.
-  // Call after Negotiate().
+  // Must be called after Negotiate().
   SaslMechanism::Type negotiated_mechanism() const;
 
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called after Negotiate().
+  const std::set<RpcFeatureFlag>& server_features() const {
+    return server_features_;
+  }
+
   // Specify IP:port of local side of connection.
-  // Call before Init(). Required for some mechanisms.
+  // Must be called before Init(). Required for some mechanisms.
   void set_local_addr(const Sockaddr& addr);
 
   // Specify IP:port of remote side of connection.
-  // Call before Init(). Required for some mechanisms.
+  // Must be called before Init(). Required for some mechanisms.
   void set_remote_addr(const Sockaddr& addr);
 
   // Specify the fully-qualified domain name of the remote server.
-  // Call before Init(). Required for some mechanisms.
+  // Must be called before Init(). Required for some mechanisms.
   void set_server_fqdn(const string& domain_name);
 
   // Set deadline for connection negotiation.
@@ -146,6 +153,9 @@ class SaslClient {
   string plain_pass_;
   gscoped_ptr<sasl_secret_t, FreeDeleter> psecret_;
 
+  // The set of features supported by the server.
+  std::set<RpcFeatureFlag> server_features_;
+
   SaslNegotiationState::Type client_state_;
 
   // The mechanism we negotiated with the server.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/sasl_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_rpc-test.cc b/src/kudu/rpc/sasl_rpc-test.cc
index 6697a0b..25ea80d 100644
--- a/src/kudu/rpc/sasl_rpc-test.cc
+++ b/src/kudu/rpc/sasl_rpc-test.cc
@@ -24,6 +24,7 @@
 #include <sasl/sasl.h>
 
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/auth_store.h"
 #include "kudu/rpc/sasl_client.h"
@@ -117,6 +118,7 @@ static void RunPlainNegotiationServer(Socket* conn) {
   CHECK_OK(sasl_server.Init(kSaslAppName));
   CHECK_OK(sasl_server.EnablePlain(std::move(authstore)));
   CHECK_OK(sasl_server.Negotiate());
+  CHECK(ContainsKey(sasl_server.client_features(), TMP_TEST_FEATURE_FLAG));
 }
 
 static void RunPlainNegotiationClient(Socket* conn) {
@@ -124,6 +126,7 @@ static void RunPlainNegotiationClient(Socket* conn) {
   CHECK_OK(sasl_client.Init(kSaslAppName));
   CHECK_OK(sasl_client.EnablePlain("danger", "burrito"));
   CHECK_OK(sasl_client.Negotiate());
+  CHECK(ContainsKey(sasl_client.server_features(), TMP_TEST_FEATURE_FLAG));
 }
 
 // Test SASL negotiation using the PLAIN mechanism over a socket.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/sasl_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_server.cc b/src/kudu/rpc/sasl_server.cc
index 308d66b..5cb5699 100644
--- a/src/kudu/rpc/sasl_server.cc
+++ b/src/kudu/rpc/sasl_server.cc
@@ -273,7 +273,16 @@ Status SaslServer::SendSaslError(ErrorStatusPB::RpcErrorCodePB code, const Statu
 Status SaslServer::HandleNegotiateRequest(const SaslMessagePB& request) {
   TRACE("SASL Server: Received NEGOTIATE request from client");
 
-  // Authentication mechanisms this server supports (i.e. plugins).
+  // Fill in the set of features supported by the client.
+  for (int flag : request.supported_features()) {
+    // We only add the features that our local build knows about.
+    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+    if (ContainsKey(kSupportedServerRpcFeatureFlags, feature_flag)) {
+      client_features_.insert(feature_flag);
+    }
+  }
+
   set<string> server_mechs = helper_.LocalMechs();
   if (PREDICT_FALSE(server_mechs.empty())) {
     // This will happen if no mechanisms are enabled before calling Init()
@@ -302,6 +311,11 @@ Status SaslServer::SendNegotiateResponse(const set<string>& server_mechs) {
     auth->set_mechanism(mech);
   }
 
+  // Tell the client which features we support.
+  for (RpcFeatureFlag feature : kSupportedServerRpcFeatureFlags) {
+    response.add_supported_features(feature);
+  }
+
   RETURN_NOT_OK(SendSaslMessage(response));
   TRACE("Sent NEGOTIATE response");
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/86a510a3/src/kudu/rpc/sasl_server.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_server.h b/src/kudu/rpc/sasl_server.h
index e71e958..2f9c194 100644
--- a/src/kudu/rpc/sasl_server.h
+++ b/src/kudu/rpc/sasl_server.h
@@ -50,31 +50,37 @@ class SaslServer {
   ~SaslServer();
 
   // Enable ANONYMOUS authentication.
-  // Call after Init().
+  // Must be called after Init().
   Status EnableAnonymous();
 
   // Enable PLAIN authentication. TODO: Support impersonation.
-  // Call after Init().
+  // Must be called after Init().
   Status EnablePlain(gscoped_ptr<AuthStore> authstore);
 
   // Returns mechanism negotiated by this connection.
-  // Call after Negotiate().
+  // Must be called after Negotiate().
   SaslMechanism::Type negotiated_mechanism() const;
 
+  // Returns the set of RPC system features supported by the remote client.
+  // Must be called after Negotiate().
+  const std::set<RpcFeatureFlag>& client_features() const {
+    return client_features_;
+  }
+
   // Name of the user that authenticated using plain auth.
-  // Call after Negotiate() and only if the negotiated mechanism was PLAIN.
+  // Must be called after Negotiate(), and only if the negotiated mechanism was PLAIN.
   const std::string& plain_auth_user() const;
 
   // Specify IP:port of local side of connection.
-  // Call before Init(). Required for some mechanisms.
+  // Must be called before Init(). Required for some mechanisms.
   void set_local_addr(const Sockaddr& addr);
 
   // Specify IP:port of remote side of connection.
-  // Call before Init(). Required for some mechanisms.
+  // Must be called before Init(). Required for some mechanisms.
   void set_remote_addr(const Sockaddr& addr);
 
   // Specify the fully-qualified domain name of the remote server.
-  // Call before Init(). Required for some mechanisms.
+  // Must be called before Init(). Required for some mechanisms.
   void set_server_fqdn(const string& domain_name);
 
   // Set deadline for connection negotiation.
@@ -144,6 +150,10 @@ class SaslServer {
   // Authentication store used for PLAIN authentication.
   gscoped_ptr<AuthStore> authstore_;
 
+  // The set of features that the client supports. Filled in
+  // after we receive the NEGOTIATE request from the client.
+  std::set<RpcFeatureFlag> client_features_;
+
   // The successfully-authenticated user, if applicable.
   string plain_auth_user_;
 


[2/2] incubator-kudu git commit: rpc: call-level feature flags

Posted by to...@apache.org.
rpc: call-level feature flags

This adds a new API RpcController::RequireServerFeature on the client side, and
ServiceIf::SupportsFeature on the service side. These APIs make it possible for
the client to require application-specific feature support on the server in
order for an individual RPC call to be serviced. If the feature is not
supported, the RPC is rejected and an appropriate error is returned.

This will be used in the future when adding features to existing RPC messages,
so that the server does not silently ignore additional fields.
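
The server-side decision can be modeled roughly as follows. This is a
simplified standalone sketch, not the code in service_pool.cc: the
`ExampleFeature` IDs and the free functions `SupportsFeature` and
`FindUnsupportedFeatures` are hypothetical. In the real change the check goes
through ServiceIf::SupportsFeature and the rejection is sent with
InboundCall::RespondUnsupportedFeature.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical application-level feature IDs for an example service.
enum ExampleFeature : std::uint32_t { FOO = 1, BAR = 2 };

// Hypothetical service that supports FOO only.
bool SupportsFeature(std::uint32_t feature) {
  return feature == FOO;
}

// Returns the required features that the service does not support.
// An empty result means the call may be serviced.
std::vector<std::uint32_t> FindUnsupportedFeatures(
    const std::vector<std::uint32_t>& required) {
  std::vector<std::uint32_t> unsupported;
  for (std::uint32_t feature : required) {
    if (!SupportsFeature(feature)) {
      unsupported.push_back(feature);
    }
  }
  return unsupported;
}
```

A call that requires only FOO would be serviced, while a call that requires
FOO and BAR would be rejected with BAR reported as the unsupported feature.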

These RPC features require a new RPC feature flag, APPLICATION_FEATURE_FLAGS, to
be supported. If the server does not support that RPC feature flag, then the
call will fail on the client side.
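
The client-side check amounts to a subset test between the RPC flags a call
needs and the flags the server advertised during negotiation. The sketch below
is a standalone model under stated assumptions: `RpcFlag`, `ServerHasRequired`,
and `RequiredRpcFlags` are illustrative names, and the numeric value given to
APPLICATION_FEATURE_FLAGS is chosen arbitrarily for this example.

```cpp
#include <algorithm>
#include <set>

// Stand-in for the protobuf-generated enum; the value is an assumption.
enum RpcFlag { APPLICATION_FEATURE_FLAGS = 1 };

// True if every required flag is in the set advertised by the server.
// std::includes needs both ranges sorted, which std::set guarantees.
bool ServerHasRequired(const std::set<RpcFlag>& server,
                       const std::set<RpcFlag>& required) {
  return std::includes(server.begin(), server.end(),
                       required.begin(), required.end());
}

// A call needs the APPLICATION_FEATURE_FLAGS RPC feature only if it requires
// at least one application-level feature.
std::set<RpcFlag> RequiredRpcFlags(bool has_required_app_features) {
  std::set<RpcFlag> flags;
  if (has_required_app_features) {
    flags.insert(APPLICATION_FEATURE_FLAGS);
  }
  return flags;
}
```

A call with no required application features has an empty requirement set, so
it passes the check even against a server that advertised no flags at all.
This keeps ordinary calls working against older servers.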

Change-Id: I46e46bcf80b93fc6cce50f37dd71a6e6539047f8
Reviewed-on: http://gerrit.cloudera.org:8080/2239
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/f82919ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/f82919ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/f82919ed

Branch: refs/heads/master
Commit: f82919ed595a415053b7f4180f70a59674a32730
Parents: 86a510a
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Feb 18 17:55:18 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Mar 15 16:15:50 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.cc           | 36 ++++++++++++---
 src/kudu/rpc/constants.cc            |  4 +-
 src/kudu/rpc/inbound_call.cc         | 23 ++++++++-
 src/kudu/rpc/inbound_call.h          |  6 +++
 src/kudu/rpc/outbound_call.cc        | 24 ++++++++--
 src/kudu/rpc/outbound_call.h         |  3 ++
 src/kudu/rpc/rpc-test-base.h         | 42 ++++++++---------
 src/kudu/rpc/rpc-test.cc             | 77 +++++++++++++++++++++++++++++++
 src/kudu/rpc/rpc_controller.cc       |  5 ++
 src/kudu/rpc/rpc_controller.h        | 51 ++++++++++++++++++++
 src/kudu/rpc/rpc_header.proto        | 25 ++++++++--
 src/kudu/rpc/rtest.proto             |  5 ++
 src/kudu/rpc/sasl_rpc-test.cc        |  4 +-
 src/kudu/rpc/service_if.cc           |  4 ++
 src/kudu/rpc/service_if.h            |  4 ++
 src/kudu/rpc/service_pool.cc         | 12 +++++
 src/kudu/rpc/transfer.cc             |  7 ++-
 src/kudu/rpc/transfer.h              | 26 +++++++++--
 src/kudu/util/CMakeLists.txt         |  3 +-
 src/kudu/util/scoped_cleanup-test.cc | 34 ++++++++++++++
 src/kudu/util/scoped_cleanup.h       | 40 ++++++++++++++++
 21 files changed, 384 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index f1626ea..b811911 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -17,12 +17,13 @@
 
 #include "kudu/rpc/connection.h"
 
+#include <algorithm>
 #include <boost/intrusive/list.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <stdint.h>
-
 #include <iostream>
+#include <set>
+#include <stdint.h>
 #include <string>
 #include <vector>
 
@@ -44,6 +45,9 @@
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
 
+using std::function;
+using std::includes;
+using std::set;
 using std::shared_ptr;
 using std::vector;
 using strings::Substitute;
@@ -214,7 +218,6 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   // already timed out.
 }
 
-
 // Callbacks after sending a call on the wire.
 // This notifies the OutboundCall object to change its state to SENT once it
 // has been fully transmitted.
@@ -291,7 +294,7 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   TransferCallbacks *cb = new CallTransferCallbacks(call);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(gscoped_ptr<OutboundTransfer>(
-                  new OutboundTransfer(slices_tmp_, cb)));
+        new OutboundTransfer(call_id, slices_tmp_, call->RequiredRpcFeatures(), cb)));
 }
 
 // Callbacks for sending an RPC call response from the server.
@@ -367,7 +370,9 @@ void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
 
   TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
   // After the response is sent, can delete the InboundCall object.
-  gscoped_ptr<OutboundTransfer> t(new OutboundTransfer(slices, cb));
+  // We set a dummy call ID and required feature set, since these are not needed
+  // when sending responses.
+  gscoped_ptr<OutboundTransfer> t(new OutboundTransfer(-1, slices, {}, cb));
 
   QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
   reactor_thread_->reactor()->ScheduleReactorTask(task);
@@ -497,6 +502,26 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
   while (!outbound_transfers_.empty()) {
     transfer = &(outbound_transfers_.front());
 
+    if (!transfer->TransferStarted()) {
+      // If this is the start of the transfer, then check if the server has the
+      // required RPC flags. We have to wait until just before the transfer in
+      // order to ensure that the negotiation has taken place, so that the flags
+      // are available.
+      const set<RpcFeatureFlag>& required_features = transfer->required_features();
+      const set<RpcFeatureFlag>& server_features = sasl_client_.server_features();
+      if (!includes(server_features.begin(), server_features.end(),
+                    required_features.begin(), required_features.end())) {
+        outbound_transfers_.pop_front();
+        CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
+        Status s = Status::NotSupported("server does not support the required RPC features");
+        transfer->Abort(s);
+        car->call->SetFailed(s);
+        car->call.reset();
+        delete transfer;
+        continue;
+      }
+    }
+
     last_activity_time_ = reactor_thread_->cur_time();
     Status status = transfer->SendBuffer(socket_);
     if (PREDICT_FALSE(!status.ok())) {
@@ -514,7 +539,6 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
     delete transfer;
   }
 
-
   // If we were able to write all of our outbound transfers,
   // we don't have any more to write.
   write_io_.stop();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/constants.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/constants.cc b/src/kudu/rpc/constants.cc
index 68955e8..c86bc29 100644
--- a/src/kudu/rpc/constants.cc
+++ b/src/kudu/rpc/constants.cc
@@ -25,8 +25,8 @@ namespace rpc {
 const char* const kMagicNumber = "hrpc";
 const char* const kSaslAppName = "Kudu";
 const char* const kSaslProtoName = "kudu";
-set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { TMP_TEST_FEATURE_FLAG };
-set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { TMP_TEST_FEATURE_FLAG };
+set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
+set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
 
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 6ba184d..9fdaadf 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -30,9 +30,9 @@
 #include "kudu/util/trace.h"
 
 using google::protobuf::FieldDescriptor;
+using google::protobuf::io::CodedOutputStream;
 using google::protobuf::Message;
 using google::protobuf::MessageLite;
-using google::protobuf::io::CodedOutputStream;
 using std::shared_ptr;
 using std::vector;
 using strings::Substitute;
@@ -42,7 +42,6 @@ DEFINE_bool(rpc_dump_all_traces, false,
 TAG_FLAG(rpc_dump_all_traces, advanced);
 TAG_FLAG(rpc_dump_all_traces, runtime);
 
-
 namespace kudu {
 namespace rpc {
 
@@ -80,6 +79,18 @@ void InboundCall::RespondSuccess(const MessageLite& response) {
   Respond(response, true);
 }
 
+void InboundCall::RespondUnsupportedFeature(const vector<uint32_t>& unsupported_features) {
+  TRACE_EVENT0("rpc", "InboundCall::RespondUnsupportedFeature");
+  ErrorStatusPB err;
+  err.set_message("unsupported feature flags");
+  err.set_code(ErrorStatusPB::ERROR_INVALID_REQUEST);
+  for (uint32_t feature : unsupported_features) {
+    err.add_unsupported_feature_flags(feature);
+  }
+
+  Respond(err, false);
+}
+
 void InboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
                                  const Status& status) {
   TRACE_EVENT0("rpc", "InboundCall::RespondFailure");
@@ -274,5 +285,13 @@ MonoTime InboundCall::GetClientDeadline() const {
   return deadline;
 }
 
+vector<uint32_t> InboundCall::GetRequiredFeatures() const {
+  vector<uint32_t> features;
+  for (uint32_t feature : header_.required_feature_flags()) {
+    features.push_back(feature);
+  }
+  return features;
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 88706d3..e15d323 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -102,6 +102,8 @@ class InboundCall {
   void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
                       const Status &status);
 
+  void RespondUnsupportedFeature(const std::vector<uint32_t>& unsupported_features);
+
   void RespondApplicationError(int error_ext_id, const std::string& message,
                                const google::protobuf::MessageLite& app_error_pb);
 
@@ -157,6 +159,10 @@ class InboundCall {
   // If the client did not specify a deadline, returns MonoTime::Max().
   MonoTime GetClientDeadline() const;
 
+  // Returns the set of application-specific feature flags required to service
+  // the RPC.
+  std::vector<uint32_t> GetRequiredFeatures() const;
+
  private:
   // Serialize and queue the response.
   void Respond(const google::protobuf::MessageLite& response,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index b63e69a..5b7fbd8 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -16,10 +16,12 @@
 // under the License.
 
 #include <algorithm>
-#include <string>
-#include <vector>
 #include <boost/functional/hash.hpp>
 #include <gflags/gflags.h>
+#include <set>
+#include <string>
+#include <unordered_set>
+#include <vector>
 
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
@@ -35,9 +37,10 @@
 namespace kudu {
 namespace rpc {
 
-using strings::Substitute;
-using google::protobuf::Message;
 using google::protobuf::io::CodedOutputStream;
+using google::protobuf::Message;
+using std::set;
+using strings::Substitute;
 
 static const double kMicrosPerSecond = 1000000.0;
 
@@ -89,6 +92,10 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
     header_.set_timeout_millis(timeout.ToMilliseconds());
   }
 
+  for (uint32_t feature : controller_->required_server_features()) {
+    header_.add_required_feature_flags(feature);
+  }
+
   CHECK_OK(serialization::SerializeHeader(header_, param_len, &header_buf_));
 
   // Return the concatenated packet.
@@ -97,6 +104,14 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
   return Status::OK();
 }
 
+set<RpcFeatureFlag> OutboundCall::RequiredRpcFeatures() const {
+  set<RpcFeatureFlag> s;
+  if (!controller_->required_server_features().empty()) {
+    s.insert(RpcFeatureFlag::APPLICATION_FEATURE_FLAGS);
+  }
+  return s;
+}
+
 Status OutboundCall::SetRequestParam(const Message& message) {
   return serialization::SerializeMessage(message, &request_buf_);
 }
@@ -111,7 +126,6 @@ const ErrorStatusPB* OutboundCall::error_pb() const {
   return error_pb_.get();
 }
 
-
 string OutboundCall::StateName(State state) {
   switch (state) {
     case READY:

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 2155752..3e65e52 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_RPC_CLIENT_CALL_H
 #define KUDU_RPC_CLIENT_CALL_H
 
+#include <set>
 #include <string>
 #include <vector>
 
@@ -205,6 +206,8 @@ class OutboundCall {
   // Fill in the call response.
   void SetResponse(gscoped_ptr<CallResponse> resp);
 
+  std::set<RpcFeatureFlag> RequiredRpcFeatures() const;
+
   std::string ToString() const;
 
   void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 0e0df7d..c629720 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -44,14 +44,15 @@
 
 namespace kudu { namespace rpc {
 
-using kudu::rpc_test::AddRequestPartialPB;
 using kudu::rpc_test::AddRequestPB;
+using kudu::rpc_test::AddRequestPartialPB;
 using kudu::rpc_test::AddResponsePB;
 using kudu::rpc_test::CalculatorError;
 using kudu::rpc_test::CalculatorServiceIf;
 using kudu::rpc_test::CalculatorServiceProxy;
 using kudu::rpc_test::EchoRequestPB;
 using kudu::rpc_test::EchoResponsePB;
+using kudu::rpc_test::FeatureFlags;
 using kudu::rpc_test::PanicRequestPB;
 using kudu::rpc_test::PanicResponsePB;
 using kudu::rpc_test::SendTwoStringsRequestPB;
@@ -83,7 +84,7 @@ class GenericCalculatorService : public ServiceIf {
     // this test doesn't generate metrics, so we ignore the argument.
   }
 
-  virtual void Handle(InboundCall *incoming) OVERRIDE {
+  void Handle(InboundCall *incoming) override {
     if (incoming->remote_method().method_name() == kAddMethodName) {
       DoAdd(incoming);
     } else if (incoming->remote_method().method_name() == kSleepMethodName) {
@@ -96,7 +97,7 @@ class GenericCalculatorService : public ServiceIf {
     }
   }
 
-  std::string service_name() const OVERRIDE { return kFullServiceName; }
+  std::string service_name() const override { return kFullServiceName; }
   static std::string static_service_name() { return kFullServiceName; }
 
  private:
@@ -164,16 +165,12 @@ class CalculatorService : public CalculatorServiceIf {
     : CalculatorServiceIf(entity) {
   }
 
-  virtual void Add(const AddRequestPB *req,
-                   AddResponsePB *resp,
-                   RpcContext *context) OVERRIDE {
+  void Add(const AddRequestPB *req, AddResponsePB *resp, RpcContext *context) override {
     resp->set_result(req->x() + req->y());
     context->RespondSuccess();
   }
 
-  virtual void Sleep(const SleepRequestPB *req,
-                     SleepResponsePB *resp,
-                     RpcContext *context) OVERRIDE {
+  void Sleep(const SleepRequestPB *req, SleepResponsePB *resp, RpcContext *context) override {
     if (req->return_app_error()) {
       CalculatorError my_error;
       my_error.set_extra_error_data("some application-specific error data");
@@ -206,16 +203,12 @@ class CalculatorService : public CalculatorServiceIf {
     DoSleep(req, context);
   }
 
-  virtual void Echo(const EchoRequestPB *req,
-                    EchoResponsePB *resp,
-                    RpcContext *context) OVERRIDE {
+  void Echo(const EchoRequestPB *req, EchoResponsePB *resp, RpcContext *context) override {
     resp->set_data(req->data());
     context->RespondSuccess();
   }
 
-  virtual void WhoAmI(const WhoAmIRequestPB* req,
-                      WhoAmIResponsePB* resp,
-                      RpcContext* context) OVERRIDE {
+  void WhoAmI(const WhoAmIRequestPB* req, WhoAmIResponsePB* resp, RpcContext* context) override {
     const UserCredentials& creds = context->user_credentials();
     if (creds.has_effective_user()) {
       resp->mutable_credentials()->set_effective_user(creds.effective_user());
@@ -225,19 +218,21 @@ class CalculatorService : public CalculatorServiceIf {
     context->RespondSuccess();
   }
 
-  virtual void TestArgumentsInDiffPackage(const ReqDiffPackagePB *req,
-                                          RespDiffPackagePB *resp,
-                                          ::kudu::rpc::RpcContext *context) OVERRIDE {
+  void TestArgumentsInDiffPackage(const ReqDiffPackagePB *req,
+                                  RespDiffPackagePB *resp,
+                                  ::kudu::rpc::RpcContext *context) override {
     context->RespondSuccess();
   }
 
-  virtual void Panic(const PanicRequestPB* req,
-                     PanicResponsePB* resp,
-                     RpcContext* context) OVERRIDE {
+  void Panic(const PanicRequestPB* req, PanicResponsePB* resp, RpcContext* context) override {
     TRACE("Got panic request");
     PANIC_RPC(context, "Test method panicking!");
   }
 
+  bool SupportsFeature(uint32_t feature) const override {
+    return feature == FeatureFlags::FOO;
+  }
+
  private:
   void DoSleep(const SleepRequestPB *req,
                RpcContext *context) {
@@ -266,11 +261,11 @@ class RpcTestBase : public KuduTest {
       metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
   }
 
-  virtual void SetUp() OVERRIDE {
+  void SetUp() override {
     KuduTest::SetUp();
   }
 
-  virtual void TearDown() OVERRIDE {
+  void TearDown() override {
     if (service_pool_) {
       server_messenger_->UnregisterService(service_name_);
       service_pool_->Shutdown();
@@ -421,7 +416,6 @@ class RpcTestBase : public KuduTest {
   scoped_refptr<MetricEntity> metric_entity_;
 };
 
-
 } // namespace rpc
 } // namespace kudu
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 03928af..e1d2c5b 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -26,9 +26,11 @@
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
+#include "kudu/rpc/constants.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_util.h"
 
 METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
@@ -511,5 +513,80 @@ TEST_F(TestRpc, TestRpcContextClientDeadline) {
   ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller));
 }
 
+// Test that setting a call-level application feature flag to an unknown value
+// will make the server reject the call.
+TEST_F(TestRpc, TestApplicationFeatureFlag) {
+  // Set up server.
+  Sockaddr server_addr;
+  StartTestServerWithGeneratedCode(&server_addr);
+
+  // Set up client.
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
+  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+
+  { // Supported flag
+    AddRequestPB req;
+    req.set_x(1);
+    req.set_y(2);
+    AddResponsePB resp;
+    RpcController controller;
+    controller.RequireServerFeature(FeatureFlags::FOO);
+    Status s = p.SyncRequest("Add", req, &resp, &controller);
+    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
+    ASSERT_TRUE(s.ok());
+    ASSERT_EQ(resp.result(), 3);
+  }
+
+  { // Unsupported flag
+    AddRequestPB req;
+    req.set_x(1);
+    req.set_y(2);
+    AddResponsePB resp;
+    RpcController controller;
+    controller.RequireServerFeature(FeatureFlags::FOO);
+    controller.RequireServerFeature(99);
+    Status s = p.SyncRequest("Add", req, &resp, &controller);
+    SCOPED_TRACE(strings::Substitute("unsupported response: $0", s.ToString()));
+    ASSERT_TRUE(s.IsRemoteError());
+  }
+}
+
+TEST_F(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
+  auto savedFlags = kSupportedServerRpcFeatureFlags;
+  auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = savedFlags; });
+  kSupportedServerRpcFeatureFlags = {};
+
+  // Set up server.
+  Sockaddr server_addr;
+  StartTestServerWithGeneratedCode(&server_addr);
+
+  // Set up client.
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
+  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());
+
+  { // Required flag
+    AddRequestPB req;
+    req.set_x(1);
+    req.set_y(2);
+    AddResponsePB resp;
+    RpcController controller;
+    controller.RequireServerFeature(FeatureFlags::FOO);
+    Status s = p.SyncRequest("Add", req, &resp, &controller);
+    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
+    ASSERT_TRUE(s.IsNotSupported());
+  }
+
+  { // No required flag
+    AddRequestPB req;
+    req.set_x(1);
+    req.set_y(2);
+    AddResponsePB resp;
+    RpcController controller;
+    Status s = p.SyncRequest("Add", req, &resp, &controller);
+    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
+    ASSERT_TRUE(s.ok());
+  }
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 53f76d9..7236b0a 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -87,6 +87,11 @@ void RpcController::set_deadline(const MonoTime& deadline) {
   set_timeout(deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE)));
 }
 
+void RpcController::RequireServerFeature(uint32_t feature) {
+  DCHECK(!call_ || call_->state() == OutboundCall::READY);
+  required_server_features_.insert(feature);
+}
+
 MonoDelta RpcController::timeout() const {
   lock_guard<simple_spinlock> l(&lock_);
   return timeout_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index bea703b..5abafc2 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -17,8 +17,10 @@
 #ifndef KUDU_RPC_RPC_CONTROLLER_H
 #define KUDU_RPC_RPC_CONTROLLER_H
 
+#include <functional>
 #include <glog/logging.h>
 #include <memory>
+#include <unordered_set>
 
 #include "kudu/gutil/macros.h"
 #include "kudu/util/locks.h"
@@ -102,6 +104,54 @@ class RpcController {
   // Using an uninitialized deadline means the call won't time out.
   void set_deadline(const MonoTime& deadline);
 
+  // Add a requirement that the server side must support a feature with the
+  // given identifier. The set of required features is sent to the server
+  // with the RPC call, and if any required feature is not supported, the
+  // call will fail with a NotSupported() status.
+  //
+  // This can be used when an RPC call changes in a way that is protobuf-compatible,
+  // but for which it would not be appropriate for the server to simply ignore
+  // an added field. For example, consider an API call like:
+  //
+  //   message DeleteAccount {
+  //     optional string username = 1;
+  //     optional bool dry_run = 2; // ADDED LATER!
+  //   }
+  //
+  // In this case, if a new client which supports the 'dry_run' flag sends the RPC
+  // to an old server, the old server will simply ignore the unrecognized parameter,
+  // with highly problematic results. To solve this problem, the new version can
+  // add a feature flag:
+  //
+  //   In .proto file
+  //   ----------------
+  //   enum MyFeatureFlags {
+  //     UNKNOWN = 0;
+  //     DELETE_ACCOUNT_SUPPORTS_DRY_RUN = 1;
+  //   }
+  //
+  //   In client code:
+  //   ---------------
+  //   if (dry_run) {
+  //     rpc.RequireServerFeature(DELETE_ACCOUNT_SUPPORTS_DRY_RUN);
+  //     req.set_dry_run(true);
+  //   }
+  //
+  // This has the effect of (a) maintaining compatibility when dry_run is not specified
+  // and (b) rejecting the RPC with a "NotSupported" error when it is.
+  //
+  // NOTE: 'feature' is an int rather than an enum type because each service
+  // must define its own enum of supported features, and protobuf doesn't support
+  // any ability to 'extend' enum types. Implementers should define an enum in the
+  // service's protobuf definition as shown above.
+  void RequireServerFeature(uint32_t feature);
+
+  // Returns a reference to the set of application feature flags that the
+  // server is required to support for this call.
+  const std::unordered_set<uint32_t>& required_server_features() const {
+    return required_server_features_;
+  }
+
   // Return the configured timeout.
   MonoDelta timeout() const;
 
@@ -119,6 +169,7 @@ class RpcController {
   friend class Proxy;
 
   MonoDelta timeout_;
+  std::unordered_set<uint32_t> required_server_features_;
 
   mutable simple_spinlock lock_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 100a57d..b1fabc3 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -21,7 +21,6 @@ package kudu.rpc;
 
 option java_package = "org.kududb.rpc";
 
-
 // The Kudu RPC protocol is similar to the RPC protocol of Hadoop and HBase.
 // See the following for reference on those other protocols:
 //  - https://issues.apache.org/jira/browse/HBASE-7898
@@ -61,9 +60,9 @@ message ConnectionContextPB {
 enum RpcFeatureFlag {
   UNKNOWN = 0;
 
-  // TODO: as soon as we have any real feature flag, we can get rid of this
-  // feature flag. For now, we just need something to assert on for tests.
-  TMP_TEST_FEATURE_FLAG = 1000;
+  // The RPC system is required to support application feature flags in the
+  // request and response headers.
+  APPLICATION_FEATURE_FLAGS = 1;
 };
 
 // Message type passed back & forth for the SASL negotiation.
@@ -130,6 +129,17 @@ message RequestHeader {
   // transit time between the client and server, if you wait exactly this amount of
   // time and then respond, you are likely to cause a timeout on the client.
   optional uint32 timeout_millis = 10;
+
+  // Feature flags that the service must support in order to properly interpret this
+  // request. The client can pass any set of flags, and if the server does not
+  // support every one of them, then it will fail the request.
+  //
+  // NOTE: these are for evolving features at the level of the application, not
+  // the RPC framework. Hence, we have to use a generic int type rather than a
+  // particular enum.
+  // NOTE: the server will only interpret this field if it supports the
+  // APPLICATION_FEATURE_FLAGS flag.
+  repeated uint32 required_feature_flags = 11;
 }
 
 message ResponseHeader {
@@ -169,7 +179,8 @@ message ErrorStatusPB {
     // The server is overloaded - the client should try again shortly.
     ERROR_SERVER_TOO_BUSY = 4;
 
-    // The request parameter was not parseable or was missing required fields.
+    // The request parameter was not parseable, was missing required fields,
+    // or the server does not support the required feature flags.
     ERROR_INVALID_REQUEST = 5;
 
     // FATAL_* errors indicate that the client should shut down the connection.
@@ -191,6 +202,10 @@ message ErrorStatusPB {
   // TODO: Make code required?
   optional RpcErrorCodePB code = 2;  // Specific error identifier.
 
+  // If the request fails due to an unsupported feature flag, the particular
+  // flag(s) that were not supported will be sent back to the client.
+  repeated uint32 unsupported_feature_flags = 3;
+
   // Allow extensions. When the RPC returns ERROR_APPLICATION, the server
   // should also fill in exactly one of these extension fields, which contains
   // more details on the service-specific error.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index f1e3e93..2c89877 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -90,6 +90,11 @@ message CalculatorError {
 message PanicRequestPB {}
 message PanicResponsePB {}
 
+enum FeatureFlags {
+  UNKNOWN=0;
+  FOO=1;
+}
+
 service CalculatorService {
   rpc Add(AddRequestPB) returns(AddResponsePB);
   rpc Sleep(SleepRequestPB) returns(SleepResponsePB);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/sasl_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_rpc-test.cc b/src/kudu/rpc/sasl_rpc-test.cc
index 25ea80d..1a2d2e8 100644
--- a/src/kudu/rpc/sasl_rpc-test.cc
+++ b/src/kudu/rpc/sasl_rpc-test.cc
@@ -118,7 +118,7 @@ static void RunPlainNegotiationServer(Socket* conn) {
   CHECK_OK(sasl_server.Init(kSaslAppName));
   CHECK_OK(sasl_server.EnablePlain(std::move(authstore)));
   CHECK_OK(sasl_server.Negotiate());
-  CHECK(ContainsKey(sasl_server.client_features(), TMP_TEST_FEATURE_FLAG));
+  CHECK(ContainsKey(sasl_server.client_features(), APPLICATION_FEATURE_FLAGS));
 }
 
 static void RunPlainNegotiationClient(Socket* conn) {
@@ -126,7 +126,7 @@ static void RunPlainNegotiationClient(Socket* conn) {
   CHECK_OK(sasl_client.Init(kSaslAppName));
   CHECK_OK(sasl_client.EnablePlain("danger", "burrito"));
   CHECK_OK(sasl_client.Negotiate());
-  CHECK(ContainsKey(sasl_client.server_features(), TMP_TEST_FEATURE_FLAG));
+  CHECK(ContainsKey(sasl_client.server_features(), APPLICATION_FEATURE_FLAGS));
 }
 
 // Test SASL negotiation using the PLAIN mechanism over a socket.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index 7377cea..bbc9a2d 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -44,6 +44,10 @@ ServiceIf::~ServiceIf() {
 void ServiceIf::Shutdown() {
 }
 
+bool ServiceIf::SupportsFeature(uint32_t feature) const {
+  return false;
+}
+
 bool ServiceIf::ParseParam(InboundCall *call, google::protobuf::Message *message) {
   Slice param(call->serialized_request());
   if (PREDICT_FALSE(!message->ParseFromArray(param.data(), param.size()))) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/service_if.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.h b/src/kudu/rpc/service_if.h
index 0224420..83f49dd 100644
--- a/src/kudu/rpc/service_if.h
+++ b/src/kudu/rpc/service_if.h
@@ -53,6 +53,10 @@ class ServiceIf {
   virtual void Shutdown();
   virtual std::string service_name() const = 0;
 
+  // The service should return true if it supports the provided
+  // application-specific feature flag.
+  virtual bool SupportsFeature(uint32_t feature) const;
+
  protected:
   bool ParseParam(InboundCall* call, google::protobuf::Message* message);
   void RespondBadMethod(InboundCall* call);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/service_pool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_pool.cc b/src/kudu/rpc/service_pool.cc
index 55a2791..eb7d46a 100644
--- a/src/kudu/rpc/service_pool.cc
+++ b/src/kudu/rpc/service_pool.cc
@@ -106,6 +106,18 @@ void ServicePool::Shutdown() {
 Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
   InboundCall* c = call.release();
 
+  vector<uint32_t> unsupported_features;
+  for (uint32_t feature : c->GetRequiredFeatures()) {
+    if (!service_->SupportsFeature(feature)) {
+      unsupported_features.push_back(feature);
+    }
+  }
+
+  if (!unsupported_features.empty()) {
+    c->RespondUnsupportedFeature(unsupported_features);
+    return Status::NotSupported("call requires unsupported application feature flags");
+  }
+
   TRACE_TO(c->trace(), "Inserting onto call queue");
   // Queue message on service queue
   QueueStatus queue_status = service_queue_.Put(c);
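
The check added to ServicePool::QueueInboundCall above can be illustrated in isolation. What follows is a minimal standalone sketch, not Kudu code: FindUnsupportedFeatures and ExampleSupportsFeature are hypothetical names, and the predicate argument stands in for ServiceIf::SupportsFeature.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical helper (not part of Kudu): returns the subset of 'required'
// for which 'supports' returns false, preserving the input order.
std::vector<uint32_t> FindUnsupportedFeatures(
    const std::vector<uint32_t>& required,
    const std::function<bool(uint32_t)>& supports) {
  assert(supports != nullptr);
  std::vector<uint32_t> unsupported;
  for (uint32_t feature : required) {
    if (!supports(feature)) {
      unsupported.push_back(feature);
    }
  }
  return unsupported;
}

// Stand-in for a service that supports only feature 1, mirroring the test
// CalculatorService in this patch (FeatureFlags::FOO has the value 1).
bool ExampleSupportsFeature(uint32_t feature) {
  return feature == 1;
}
```

An empty result corresponds to the path where the call is queued; a non-empty result corresponds to the path where the call is rejected and the offending flags are reported back to the client.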

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index b1807ae..553de5f 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -42,6 +42,7 @@ namespace kudu {
 namespace rpc {
 
 using std::ostringstream;
+using std::set;
 using std::string;
 
 #define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) \
@@ -122,11 +123,15 @@ string InboundTransfer::StatusAsString() const {
   return strings::Substitute("$0/$1 bytes received", cur_offset_, total_length_);
 }
 
-OutboundTransfer::OutboundTransfer(const std::vector<Slice> &payload,
+OutboundTransfer::OutboundTransfer(int32_t call_id,
+                                   const std::vector<Slice> &payload,
+                                   set<RpcFeatureFlag> required_features,
                                    TransferCallbacks *callbacks)
   : cur_slice_idx_(0),
     cur_offset_in_slice_(0),
+    required_features_(std::move(required_features)),
     callbacks_(callbacks),
+    call_id_(call_id),
     aborted_(false) {
   CHECK(!payload.empty());
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index cc66b4e..662d3e4 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -18,10 +18,11 @@
 #ifndef KUDU_RPC_TRANSFER_H
 #define KUDU_RPC_TRANSFER_H
 
-#include <boost/intrusive/list.hpp>
 #include <boost/function.hpp>
+#include <boost/intrusive/list.hpp>
 #include <boost/utility.hpp>
 #include <gflags/gflags.h>
+#include <set>
 #include <stdint.h>
 #include <string>
 #include <vector>
@@ -87,7 +88,6 @@ class InboundTransfer {
   DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
 };
 
-
 // When the connection wants to send data, it creates an OutboundTransfer object
 // to encompass it. This sits on a queue within the Connection, so that each time
 // the Connection wakes up with a writable socket, it consumes more bytes off
@@ -106,9 +106,17 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // memory of the slices. The slices must remain valid until the callback
   // is triggered.
   //
+  // When the transfer starts, the required features are checked against the set
+  // of features which the server supports. The check is delayed until just
+  // before the transfer starts because it depends on the negotiation with the
+  // server being complete. The call_id allows the call to be canceled if
+  // required features are not supported.
+  //
   // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices
   // slices.
-  OutboundTransfer(const std::vector<Slice> &payload,
+  OutboundTransfer(int32_t call_id,
+                   const std::vector<Slice> &payload,
+                   std::set<RpcFeatureFlag> required_features,
                    TransferCallbacks *callbacks);
 
   // Destruct the transfer. A transfer object should never be deallocated
@@ -128,11 +136,19 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // Return true if the entire transfer has been sent.
   bool TransferFinished() const;
 
+  const std::set<RpcFeatureFlag>& required_features() const {
+    return required_features_;
+  }
+
   // Return the total number of bytes to be sent (including those already sent)
   int32_t TotalLength() const;
 
   std::string HexDump() const;
 
+  int32_t call_id() const {
+    return call_id_;
+  }
+
  private:
   // Slices to send. Uses an array here instead of a vector to avoid an expensive
   // vector construction (improved performance a couple percent).
@@ -144,8 +160,12 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
   // The number of bytes in the above slice which has already been sent.
   int32_t cur_offset_in_slice_;
 
+  std::set<RpcFeatureFlag> required_features_;
+
   TransferCallbacks *callbacks_;
 
+  int32_t call_id_;
+
   bool aborted_;
 
   DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
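
The comment on the OutboundTransfer constructor describes checking a call's required RPC-layer features against the features the server advertised during negotiation. A rough sketch of such a subset check follows. It is an illustration under stated assumptions rather than the code in this patch: FeatureFlag is a hypothetical stand-in for the RpcFeatureFlag protobuf enum, and ServerSupportsAll and ExampleUsage are made-up names.

```cpp
#include <algorithm>
#include <cassert>
#include <set>

// Hypothetical stand-in for the RpcFeatureFlag protobuf enum.
enum class FeatureFlag { kUnknown = 0, kApplicationFeatureFlags = 1 };

// Returns true if every flag in 'required' is also in 'server_supported'.
// std::includes needs sorted ranges, which std::set guarantees.
bool ServerSupportsAll(const std::set<FeatureFlag>& server_supported,
                       const std::set<FeatureFlag>& required) {
  return std::includes(server_supported.begin(), server_supported.end(),
                       required.begin(), required.end());
}

// Usage: a call with no required flags can always be sent, while a call
// that requires a flag the server did not advertise cannot.
void ExampleUsage() {
  std::set<FeatureFlag> server = {FeatureFlag::kApplicationFeatureFlags};
  assert(ServerSupportsAll(server, {}));
  assert(ServerSupportsAll(server, {FeatureFlag::kApplicationFeatureFlags}));
  assert(!ServerSupportsAll({}, {FeatureFlag::kApplicationFeatureFlags}));
}
```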

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index db94115..241f6f6 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -279,10 +279,10 @@ ADD_KUDU_TEST(jsonreader-test)
 ADD_KUDU_TEST(knapsack_solver-test)
 ADD_KUDU_TEST(logging-test)
 ADD_KUDU_TEST(map-util-test)
+ADD_KUDU_TEST(mem_tracker-test)
 ADD_KUDU_TEST(memcmpable_varint-test LABELS no_tsan)
 ADD_KUDU_TEST(memenv/memenv-test)
 ADD_KUDU_TEST(memory/arena-test)
-ADD_KUDU_TEST(mem_tracker-test)
 ADD_KUDU_TEST(metrics-test)
 ADD_KUDU_TEST(monotime-test)
 ADD_KUDU_TEST(mt-hdr_histogram-test RUN_SERIAL true)
@@ -303,6 +303,7 @@ ADD_KUDU_TEST(rolling_log-test)
 ADD_KUDU_TEST(rw_semaphore-test)
 ADD_KUDU_TEST(rwc_lock-test)
 ADD_KUDU_TEST(safe_math-test)
+ADD_KUDU_TEST(scoped_cleanup-test)
 ADD_KUDU_TEST(slice-test)
 ADD_KUDU_TEST(spinlock_profiling-test)
 ADD_KUDU_TEST(stack_watchdog-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/util/scoped_cleanup-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/scoped_cleanup-test.cc b/src/kudu/util/scoped_cleanup-test.cc
new file mode 100644
index 0000000..790df39
--- /dev/null
+++ b/src/kudu/util/scoped_cleanup-test.cc
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/scoped_cleanup.h"
+
+#include <gtest/gtest.h>
+
+namespace kudu {
+
+TEST(ScopedCleanup, TestCleanup) {
+  int var = 0;
+  {
+    auto saved = var;
+    auto cleanup = MakeScopedCleanup([&] () { var = saved; });
+    var = 42;
+  }
+  ASSERT_EQ(var, 0);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/f82919ed/src/kudu/util/scoped_cleanup.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/scoped_cleanup.h b/src/kudu/util/scoped_cleanup.h
new file mode 100644
index 0000000..a74422c
--- /dev/null
+++ b/src/kudu/util/scoped_cleanup.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <utility>
+
+namespace kudu {
+
+// A scoped object which runs a cleanup function when going out of scope. Can
+// be used for scoped resource cleanup.
+template<typename F>
+class ScopedCleanup {
+ public:
+  explicit ScopedCleanup(F f) : f_(std::move(f)) {}
+  ~ScopedCleanup() { f_(); }
+ private:
+  F f_;
+};
+
+// Creates a new scoped cleanup instance with the provided function.
+template<typename F>
+ScopedCleanup<F> MakeScopedCleanup(F f) {
+  return ScopedCleanup<F>(std::move(f));
+}
+} // namespace kudu
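
The save-and-restore pattern that the tests in this patch build on ScopedCleanup can be tried without the Kudu headers. The sketch below is a hypothetical standalone variant, not the class from the patch: ScopeGuard, MakeScopeGuard and SizeWhileCleared are illustrative names, and the move constructor with its active_ flag is an addition that keeps the cleanup from running twice if a pre-C++17 compiler does not elide the returned temporary.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical standalone scope guard (names are illustrative).
template <typename F>
class ScopeGuard {
 public:
  explicit ScopeGuard(F f) : f_(std::move(f)), active_(true) {}
  // Moving transfers responsibility for running the cleanup.
  ScopeGuard(ScopeGuard&& other)
      : f_(std::move(other.f_)), active_(other.active_) {
    other.active_ = false;
  }
  ScopeGuard(const ScopeGuard&) = delete;
  ScopeGuard& operator=(const ScopeGuard&) = delete;
  ~ScopeGuard() {
    if (active_) f_();
  }

 private:
  F f_;
  bool active_;
};

template <typename F>
ScopeGuard<F> MakeScopeGuard(F f) {
  return ScopeGuard<F>(std::move(f));
}

// Temporarily empties '*flags' and returns the size observed while it is
// empty. The guard restores the saved contents when the function exits.
std::size_t SizeWhileCleared(std::vector<int>* flags) {
  assert(flags != nullptr);
  std::vector<int> saved = *flags;
  auto restore = MakeScopeGuard([&] { *flags = saved; });
  flags->clear();
  return flags->size();
}
```

Because 'saved' is declared before 'restore', it outlives the guard, so the lambda's reference to it is still valid when the cleanup runs.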