You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:43 UTC

[31/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/tls_socket.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/tls_socket.h b/be/src/kudu/security/tls_socket.h
new file mode 100644
index 0000000..a0d716c
--- /dev/null
+++ b/be/src/kudu/security/tls_socket.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+
+#include "kudu/gutil/port.h"
+#include "kudu/security/openssl_util.h" // IWYU pragma: keep
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+struct iovec;
+typedef struct ssl_st SSL;
+
+namespace kudu {
+namespace security {
+
+class TlsSocket : public Socket {
+ public:
+
+  ~TlsSocket() override;
+
+  Status Write(const uint8_t *buf, int32_t amt, int32_t *nwritten) override WARN_UNUSED_RESULT;
+
+  Status Writev(const struct ::iovec *iov,
+                int iov_len,
+                int64_t *nwritten) override WARN_UNUSED_RESULT;
+
+  Status Recv(uint8_t *buf, int32_t amt, int32_t *nread) override WARN_UNUSED_RESULT;
+
+  Status Close() override WARN_UNUSED_RESULT;
+
+ private:
+
+  friend class TlsHandshake;
+
+  TlsSocket(int fd, c_unique_ptr<SSL> ssl);
+
+  // Owned SSL handle.
+  c_unique_ptr<SSL> ssl_;
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token-test.cc b/be/src/kudu/security/token-test.cc
new file mode 100644
index 0000000..7172a51
--- /dev/null
+++ b/be/src/kudu/security/token-test.cc
@@ -0,0 +1,677 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/walltime.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int32(tsk_num_rsa_bits);
+
+using std::string;
+using std::make_shared;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+namespace {
+
+SignedTokenPB MakeUnsignedToken(int64_t expiration) {
+  SignedTokenPB ret;
+  TokenPB token;
+  token.set_expire_unix_epoch_seconds(expiration);
+  CHECK(token.SerializeToString(ret.mutable_token_data()));
+  return ret;
+}
+
+SignedTokenPB MakeIncompatibleToken() {
+  SignedTokenPB ret;
+  TokenPB token;
+  token.set_expire_unix_epoch_seconds(WallTime_Now() + 100);
+  token.add_incompatible_features(TokenPB::Feature_MAX + 1);
+  CHECK(token.SerializeToString(ret.mutable_token_data()));
+  return ret;
+}
+
+// Generate public key as a string in DER format for tests.
+Status GeneratePublicKeyStrDer(string* ret) {
+  PrivateKey private_key;
+  RETURN_NOT_OK(GeneratePrivateKey(512, &private_key));
+  PublicKey public_key;
+  RETURN_NOT_OK(private_key.GetPublicKey(&public_key));
+  string public_key_str_der;
+  RETURN_NOT_OK(public_key.ToString(&public_key_str_der, DataFormat::DER));
+  *ret = public_key_str_der;
+  return Status::OK();
+}
+
+// Generate token signing key with the specified parameters.
+Status GenerateTokenSigningKey(int64_t seq_num,
+                               int64_t expire_time_seconds,
+                               unique_ptr<TokenSigningPrivateKey>* tsk) {
+  {
+    unique_ptr<PrivateKey> private_key(new PrivateKey);
+    RETURN_NOT_OK(GeneratePrivateKey(512, private_key.get()));
+    tsk->reset(new TokenSigningPrivateKey(
+        seq_num, expire_time_seconds, std::move(private_key)));
+  }
+  return Status::OK();
+}
+
+void CheckAndAddNextKey(int iter_num,
+                        TokenSigner* signer,
+                        int64_t* key_seq_num) {
+  ASSERT_NE(nullptr, signer);
+  ASSERT_NE(nullptr, key_seq_num);
+  int64_t seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer->CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    seq_num = key->key_seq_num();
+  }
+
+  for (int i = 0; i < iter_num; ++i) {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer->CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_EQ(seq_num, key->key_seq_num());
+    if (i + 1 == iter_num) {
+      // Finally, add the key to the TokenSigner.
+      ASSERT_OK(signer->AddKey(std::move(key)));
+    }
+  }
+  *key_seq_num = seq_num;
+}
+
+} // anonymous namespace
+
+class TokenTest : public KuduTest {
+};
+
+TEST_F(TokenTest, TestInit) {
+  TokenSigner signer(10, 10);
+  const TokenVerifier& verifier(signer.verifier());
+
+  SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+  Status s = signer.SignToken(&token);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  static const int64_t kKeySeqNum = 100;
+  PrivateKey private_key;
+  ASSERT_OK(GeneratePrivateKey(512, &private_key));
+  string private_key_str_der;
+  ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+  TokenSigningPrivateKeyPB pb;
+  pb.set_rsa_key_der(private_key_str_der);
+  pb.set_key_seq_num(kKeySeqNum);
+  pb.set_expire_unix_epoch_seconds(WallTime_Now() + 120);
+
+  ASSERT_OK(signer.ImportKeys({pb}));
+  vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
+  ASSERT_EQ(1, public_keys.size());
+  ASSERT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
+
+  // It should be possible to sign tokens once the signer is initialized.
+  ASSERT_OK(signer.SignToken(&token));
+  ASSERT_TRUE(token.has_signature());
+}
+
+// Verify that TokenSigner does not allow 'holes' in the sequence numbers
+// of the generated keys. The idea is to not allow sequences like '1, 5, 6'.
+// In general, calling the CheckNeedKey() method multiple times and then calling
+// the AddKey() method once should advance the key sequence number only by 1
+// regardless of number CheckNeedKey() calls.
+//
+// This is to make sure that the sequence numbers are not sparse in case if
+// running scenarios CheckNeedKey()-try-to-store-key-AddKey() over and over
+// again, given that the 'try-to-store-key' part can fail sometimes.
+TEST_F(TokenTest, TestTokenSignerNonSparseSequenceNumbers) {
+  static const int kIterNum = 3;
+  static const int64_t kAuthnTokenValiditySeconds = 1;
+  static const int64_t kKeyRotationSeconds = 1;
+
+  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+
+  int64_t seq_num_first_key;
+  NO_FATALS(CheckAndAddNextKey(kIterNum, &signer, &seq_num_first_key));
+
+  SleepFor(MonoDelta::FromSeconds(kKeyRotationSeconds + 1));
+
+  int64_t seq_num_second_key;
+  NO_FATALS(CheckAndAddNextKey(kIterNum, &signer, &seq_num_second_key));
+
+  ASSERT_EQ(seq_num_first_key + 1, seq_num_second_key);
+}
+
+// Verify the behavior of the TokenSigner::ImportKeys() method. In general,
+// it should tolerate mix of expired and non-expired keys, even if their
+// sequence numbers are intermixed: keys with greater sequence numbers could
+// be already expired but keys with lesser sequence numbers could be still
+// valid. The idea is to correctly import TSKs generated with different
+// validity period settings. This is to address scenarios when the system
+// was run with long authn token validity interval and then switched to
+// a shorter one.
+//
+// After importing keys, the TokenSigner should contain only the valid ones.
+// In addition, the sequence number of the very first key generated after the
+// import should be greater than any sequence number the TokenSigner has seen
+// during the import.
+TEST_F(TokenTest, TestTokenSignerAddKeyAfterImport) {
+  static const int64_t kAuthnTokenValiditySeconds = 8;
+  static const int64_t kKeyRotationSeconds = 8;
+  static const int64_t kKeyValiditySeconds =
+      kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
+
+  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  const TokenVerifier& verifier(signer.verifier());
+
+  static const int64_t kExpiredKeySeqNum = 100;
+  static const int64_t kKeySeqNum = kExpiredKeySeqNum - 1;
+  {
+    // First, try to import already expired key to check that internal key
+    // sequence number advances correspondingly.
+    PrivateKey private_key;
+    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    string private_key_str_der;
+    ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+    TokenSigningPrivateKeyPB pb;
+    pb.set_rsa_key_der(private_key_str_der);
+    pb.set_key_seq_num(kExpiredKeySeqNum);
+    pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
+
+    ASSERT_OK(signer.ImportKeys({pb}));
+  }
+
+  {
+    // Check the result of importing keys: there should be no keys because
+    // the only one we tried to import was already expired.
+    vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
+    ASSERT_TRUE(public_keys.empty());
+  }
+
+  {
+    // Now import valid (not yet expired) key, but with sequence number less
+    // than of the expired key.
+    PrivateKey private_key;
+    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    string private_key_str_der;
+    ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+    TokenSigningPrivateKeyPB pb;
+    pb.set_rsa_key_der(private_key_str_der);
+    pb.set_key_seq_num(kKeySeqNum);
+    // Set the TSK's expiration time: make the key valid but past its activity
+    // interval.
+    pb.set_expire_unix_epoch_seconds(
+        WallTime_Now() + (kKeyValiditySeconds - 2 * kKeyRotationSeconds - 1));
+
+    ASSERT_OK(signer.ImportKeys({pb}));
+  }
+
+  {
+    // Check the result of importing keys.
+    vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
+    ASSERT_EQ(1, public_keys.size());
+    ASSERT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
+  }
+
+  {
+    // The newly imported key should be used to sign tokens.
+    SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+    ASSERT_OK(signer.SignToken(&token));
+    ASSERT_TRUE(token.has_signature());
+    ASSERT_TRUE(token.has_signing_key_seq_num());
+    EXPECT_EQ(kKeySeqNum, token.signing_key_seq_num());
+  }
+
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_EQ(kExpiredKeySeqNum + 1, key->key_seq_num());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+    bool has_rotated = false;
+    ASSERT_OK(signer.TryRotateKey(&has_rotated));
+    ASSERT_TRUE(has_rotated);
+  }
+  {
+    // Check the result of generating the new key: the identifier of the new key
+    // should be +1 increment from the identifier of the expired imported key.
+    vector<TokenSigningPublicKeyPB> public_keys(verifier.ExportKeys());
+    ASSERT_EQ(2, public_keys.size());
+    EXPECT_EQ(kKeySeqNum, public_keys[0].key_seq_num());
+    EXPECT_EQ(kExpiredKeySeqNum + 1, public_keys[1].key_seq_num());
+  }
+
+  // At this point the new key should be used to sign tokens.
+  SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+  ASSERT_OK(signer.SignToken(&token));
+  ASSERT_TRUE(token.has_signature());
+  ASSERT_TRUE(token.has_signing_key_seq_num());
+  EXPECT_EQ(kExpiredKeySeqNum + 1, token.signing_key_seq_num());
+}
+
+// The AddKey() method should not allow to add a key with the sequence number
+// less or equal to the sequence number of the most 'recent' key.
+TEST_F(TokenTest, TestAddKeyConstraints) {
+  {
+    TokenSigner signer(1, 1);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+  {
+    TokenSigner signer(1, 1);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    const int64_t key_seq_num = key->key_seq_num();
+    key->key_seq_num_ = key_seq_num - 1;
+    Status s = signer.AddKey(std::move(key));
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        ": invalid key sequence number, should be at least ");
+  }
+  {
+    TokenSigner signer(1, 1);
+    static const int64_t kKeySeqNum = 100;
+    PrivateKey private_key;
+    ASSERT_OK(GeneratePrivateKey(512, &private_key));
+    string private_key_str_der;
+    ASSERT_OK(private_key.ToString(&private_key_str_der, DataFormat::DER));
+    TokenSigningPrivateKeyPB pb;
+    pb.set_rsa_key_der(private_key_str_der);
+    pb.set_key_seq_num(kKeySeqNum);
+    // Make the key already expired.
+    pb.set_expire_unix_epoch_seconds(WallTime_Now() - 1);
+    ASSERT_OK(signer.ImportKeys({pb}));
+
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    const int64_t key_seq_num = key->key_seq_num();
+    ASSERT_GT(key_seq_num, kKeySeqNum);
+    key->key_seq_num_ = kKeySeqNum;
+    Status s = signer.AddKey(std::move(key));
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        ": invalid key sequence number, should be at least ");
+  }
+}
+
+TEST_F(TokenTest, TestGenerateAuthTokenNoUserName) {
+  TokenSigner signer(10, 10);
+  SignedTokenPB signed_token_pb;
+  const Status& s = signer.GenerateAuthnToken("", &signed_token_pb);
+  EXPECT_TRUE(s.IsInvalidArgument()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "no username provided for authn token");
+}
+
+TEST_F(TokenTest, TestIsCurrentKeyValid) {
+  static const int64_t kAuthnTokenValiditySeconds = 1;
+  static const int64_t kKeyRotationSeconds = 1;
+  static const int64_t kKeyValiditySeconds =
+      kAuthnTokenValiditySeconds + 2 * kKeyRotationSeconds;
+
+  TokenSigner signer(kAuthnTokenValiditySeconds, kKeyRotationSeconds);
+  EXPECT_FALSE(signer.IsCurrentKeyValid());
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+  EXPECT_TRUE(signer.IsCurrentKeyValid());
+  SleepFor(MonoDelta::FromSeconds(kKeyValiditySeconds));
+  // The key should expire after its validity interval.
+  EXPECT_FALSE(signer.IsCurrentKeyValid());
+
+  // Anyway, current implementation allows to use an expired key to sign tokens.
+  SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+  EXPECT_OK(signer.SignToken(&token));
+}
+
+TEST_F(TokenTest, TestTokenSignerAddKeys) {
+  {
+    TokenSigner signer(10, 10);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // It's not time to add next key yet.
+    ASSERT_EQ(nullptr, key.get());
+  }
+
+  {
+    // Special configuration for TokenSigner: rotation interval is zero,
+    // so should be able to add two keys right away.
+    TokenSigner signer(10, 0);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Should be able to add next key right away.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Active key and next key are already in place: no need for a new key.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_EQ(nullptr, key.get());
+  }
+
+  if (AllowSlowTests()) {
+    // Special configuration for TokenSigner: short interval for key rotation.
+    // It should not need next key right away, but should need next key after
+    // the rotation interval.
+    static const int64_t kKeyRotationIntervalSeconds = 8;
+    TokenSigner signer(10, kKeyRotationIntervalSeconds);
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Should not need next key right away.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_EQ(nullptr, key.get());
+
+    SleepFor(MonoDelta::FromSeconds(kKeyRotationIntervalSeconds));
+
+    // Should need next key after the rotation interval.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+
+    // Active key and next key are already in place: no need for a new key.
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_EQ(nullptr, key.get());
+  }
+}
+
+// Test how key rotation works.
+TEST_F(TokenTest, TestTokenSignerSignVerifyExport) {
+  // Key rotation interval 0 allows adding 2 keys in a row with no delay.
+  TokenSigner signer(10, 0);
+  const TokenVerifier& verifier(signer.verifier());
+
+  // Should start off with no signing keys.
+  ASSERT_TRUE(verifier.ExportKeys().empty());
+
+  // Trying to sign a token when there is no TSK should give an error.
+  SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+  Status s = signer.SignToken(&token);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+  // Generate and set a new key.
+  int64_t signing_key_seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    signing_key_seq_num = key->key_seq_num();
+    ASSERT_GT(signing_key_seq_num, -1);
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+
+  // We should see the key now if we request TSKs starting at a
+  // lower sequence number.
+  ASSERT_EQ(1, verifier.ExportKeys().size());
+  // We should not see the key if we ask for the sequence number
+  // that it is assigned.
+  ASSERT_EQ(0, verifier.ExportKeys(signing_key_seq_num).size());
+
+  // We should be able to sign a token now.
+  ASSERT_OK(signer.SignToken(&token));
+  ASSERT_TRUE(token.has_signature());
+  ASSERT_EQ(signing_key_seq_num, token.signing_key_seq_num());
+
+  // Set next key and check that we return the right keys.
+  int64_t next_signing_key_seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    next_signing_key_seq_num = key->key_seq_num();
+    ASSERT_GT(next_signing_key_seq_num, signing_key_seq_num);
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+  ASSERT_EQ(2, verifier.ExportKeys().size());
+  ASSERT_EQ(1, verifier.ExportKeys(signing_key_seq_num).size());
+  ASSERT_EQ(0, verifier.ExportKeys(next_signing_key_seq_num).size());
+
+  // The first key should be used for signing: the next one is saved
+  // for the next round.
+  {
+    SignedTokenPB token = MakeUnsignedToken(WallTime_Now());
+    ASSERT_OK(signer.SignToken(&token));
+    ASSERT_TRUE(token.has_signature());
+    ASSERT_EQ(signing_key_seq_num, token.signing_key_seq_num());
+  }
+}
+
+// Test that the TokenSigner can export its public keys in protobuf form
+// via bound TokenVerifier.
+TEST_F(TokenTest, TestExportKeys) {
+  // Test that the exported public keys don't contain private key material,
+  // and have an appropriate expiration.
+  const int64_t key_exp_seconds = 30;
+  const int64_t key_rotation_seconds = 10;
+  TokenSigner signer(key_exp_seconds - 2 * key_rotation_seconds,
+                     key_rotation_seconds);
+  int64_t key_seq_num;
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    key_seq_num = key->key_seq_num();
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+  const TokenVerifier& verifier(signer.verifier());
+  auto keys = verifier.ExportKeys();
+  ASSERT_EQ(1, keys.size());
+  const TokenSigningPublicKeyPB& key = keys[0];
+  ASSERT_TRUE(key.has_rsa_key_der());
+  ASSERT_EQ(key_seq_num, key.key_seq_num());
+  ASSERT_TRUE(key.has_expire_unix_epoch_seconds());
+  const int64_t now = WallTime_Now();
+  ASSERT_GT(key.expire_unix_epoch_seconds(), now);
+  ASSERT_LE(key.expire_unix_epoch_seconds(), now + key_exp_seconds);
+}
+
+// Test that the TokenVerifier can import keys exported by the TokenSigner
+// and then verify tokens signed by it.
+TEST_F(TokenTest, TestEndToEnd_Valid) {
+  TokenSigner signer(10, 10);
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+
+  // Make and sign a token.
+  SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+  ASSERT_OK(signer.SignToken(&signed_token));
+
+  // Try to verify it.
+  TokenVerifier verifier;
+  ASSERT_OK(verifier.ImportKeys(signer.verifier().ExportKeys()));
+  TokenPB token;
+  ASSERT_EQ(VerificationResult::VALID, verifier.VerifyTokenSignature(signed_token, &token));
+}
+
+// Test all of the possible cases covered by token verification.
+// See VerificationResult.
+TEST_F(TokenTest, TestEndToEnd_InvalidCases) {
+  // Key rotation interval 0 allows adding 2 keys in a row with no delay.
+  TokenSigner signer(10, 0);
+  {
+    std::unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(signer.CheckNeedKey(&key));
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(signer.AddKey(std::move(key)));
+  }
+
+  TokenVerifier verifier;
+  ASSERT_OK(verifier.ImportKeys(signer.verifier().ExportKeys()));
+
+  // Make and sign a token, but corrupt the data in it.
+  {
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    signed_token.set_token_data("xyz");
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::INVALID_TOKEN,
+              verifier.VerifyTokenSignature(signed_token, &token));
+  }
+
+  // Make and sign a token, but corrupt the signature.
+  {
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    signed_token.set_signature("xyz");
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::INVALID_SIGNATURE,
+              verifier.VerifyTokenSignature(signed_token, &token));
+  }
+
+  // Make and sign a token, but set it to be already expired.
+  {
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() - 10);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::EXPIRED_TOKEN,
+              verifier.VerifyTokenSignature(signed_token, &token));
+  }
+
+  // Make and sign a token which uses an incompatible feature flag.
+  {
+    SignedTokenPB signed_token = MakeIncompatibleToken();
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::INCOMPATIBLE_FEATURE,
+              verifier.VerifyTokenSignature(signed_token, &token));
+  }
+
+  // Set a new signing key, but don't inform the verifier of it yet. When we
+  // verify, we expect the verifier to complain the key is unknown.
+  {
+    {
+      std::unique_ptr<TokenSigningPrivateKey> key;
+      ASSERT_OK(signer.CheckNeedKey(&key));
+      ASSERT_NE(nullptr, key.get());
+      ASSERT_OK(signer.AddKey(std::move(key)));
+      bool has_rotated = false;
+      ASSERT_OK(signer.TryRotateKey(&has_rotated));
+      ASSERT_TRUE(has_rotated);
+    }
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::UNKNOWN_SIGNING_KEY,
+              verifier.VerifyTokenSignature(signed_token, &token));
+  }
+
+  // Set a new signing key which is already expired, and inform the verifier
+  // of all of the current keys. The verifier should recognize the key but
+  // know that it's expired.
+  {
+    {
+      unique_ptr<TokenSigningPrivateKey> tsk;
+      ASSERT_OK(GenerateTokenSigningKey(100, WallTime_Now() - 1, &tsk));
+      // This direct access is necessary because AddKey() does not allow to add
+      // an expired key.
+      TokenSigningPublicKeyPB tsk_public_pb;
+      tsk->ExportPublicKeyPB(&tsk_public_pb);
+      ASSERT_OK(verifier.ImportKeys({tsk_public_pb}));
+      signer.tsk_deque_.push_front(std::move(tsk));
+    }
+
+    SignedTokenPB signed_token = MakeUnsignedToken(WallTime_Now() + 600);
+    // Current implementation allows to use an expired key to sign tokens.
+    ASSERT_OK(signer.SignToken(&signed_token));
+    TokenPB token;
+    ASSERT_EQ(VerificationResult::EXPIRED_SIGNING_KEY,
+              verifier.VerifyTokenSignature(signed_token, &token));
+  }
+}
+
+// Test functionality of the TokenVerifier::ImportKeys() method.
+TEST_F(TokenTest, TestTokenVerifierImportKeys) {
+  TokenVerifier verifier;
+
+  // An attempt to import no keys is fine.
+  ASSERT_OK(verifier.ImportKeys({}));
+  ASSERT_TRUE(verifier.ExportKeys().empty());
+
+  TokenSigningPublicKeyPB tsk_public_pb;
+  const auto exp_time = WallTime_Now() + 600;
+  tsk_public_pb.set_key_seq_num(100500);
+  tsk_public_pb.set_expire_unix_epoch_seconds(exp_time);
+  string public_key_str_der;
+  ASSERT_OK(GeneratePublicKeyStrDer(&public_key_str_der));
+  tsk_public_pb.set_rsa_key_der(public_key_str_der);
+
+  ASSERT_OK(verifier.ImportKeys({ tsk_public_pb }));
+  {
+    const auto& exported_tsks_public_pb = verifier.ExportKeys();
+    ASSERT_EQ(1, exported_tsks_public_pb.size());
+    EXPECT_EQ(tsk_public_pb.SerializeAsString(),
+              exported_tsks_public_pb[0].SerializeAsString());
+  }
+
+  // Re-importing the same key again is fine, and the total number
+  // of exported keys should not increase.
+  ASSERT_OK(verifier.ImportKeys({ tsk_public_pb }));
+  {
+    const auto& exported_tsks_public_pb = verifier.ExportKeys();
+    ASSERT_EQ(1, exported_tsks_public_pb.size());
+    EXPECT_EQ(tsk_public_pb.SerializeAsString(),
+              exported_tsks_public_pb[0].SerializeAsString());
+  }
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token.proto b/be/src/kudu/security/token.proto
new file mode 100644
index 0000000..e27ccdb
--- /dev/null
+++ b/be/src/kudu/security/token.proto
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu.security;
+
+option java_package = "org.apache.kudu.security";
+
+import "kudu/util/pb_util.proto";
+
+message AuthnTokenPB {
+  optional string username = 1;
+};
+
+message AuthzTokenPB {
+};
+
+message TokenPB {
+  // The time at which this token expires, in seconds since the
+  // unix epoch.
+  optional int64 expire_unix_epoch_seconds = 1;
+
+  enum Feature {
+    // Protobuf doesn't let us define a enum with no values,
+    // so we've got this placeholder in here for now. When we add
+    // the first real feature flag, we can remove this.
+    UNUSED_PLACEHOLDER = 999;
+  };
+
+  // List of incompatible features used by this token. If a feature
+  // is listed in the token and a server verifying/authorizing the token
+  // sees an UNKNOWN value in this list, it should reject the token.
+  //
+  // This allows us to safely add "restrictive" content to tokens
+  // and have a "default deny" policy on servers that may not understand
+  // them.
+  //
+  // We use an int32 here but the values correspond to the 'Feature' enum
+  // above. This is to deal with protobuf's odd handling of unknown enum
+  // values (see KUDU-1850).
+  repeated int32 incompatible_features = 2;
+
+  oneof token {
+    AuthnTokenPB authn = 3;
+    AuthzTokenPB authz = 4;
+  }
+};
+
+message SignedTokenPB {
+  // The actual token data. This is a serialized TokenPB protobuf. However, we use a
+  // 'bytes' field, since protobuf doesn't guarantee that if two implementations serialize
+  // a protobuf, they'll necessary get bytewise identical results, particularly in the
+  // presence of unknown fields.
+  optional bytes token_data = 1;
+
+  // The cryptographic signature of 'token_contents'.
+  optional bytes signature = 2 [ (kudu.REDACT) = true ];
+
+  // The sequence number of the key which produced 'signature'.
+  optional int64 signing_key_seq_num = 3;
+};
+
+// A private key used to sign tokens.
+message TokenSigningPrivateKeyPB {
+  optional int64 key_seq_num = 1;
+
+  // The private key material, in DER format.
+  optional bytes rsa_key_der = 2 [ (kudu.REDACT) = true ];
+
+  // The time at which signatures made by this key should no longer be valid.
+  optional int64 expire_unix_epoch_seconds = 3;
+};
+
+// A public key corresponding to the private key used to sign tokens. Only
+// this part is necessary for token verification.
+message TokenSigningPublicKeyPB {
+  optional int64 key_seq_num = 1;
+
+  // The public key material, in DER format.
+  optional bytes rsa_key_der = 2;
+
+  // The time at which signatures made by this key should no longer be valid.
+  optional int64 expire_unix_epoch_seconds = 3;
+};

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token_signer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token_signer.cc b/be/src/kudu/security/token_signer.cc
new file mode 100644
index 0000000..08c84be
--- /dev/null
+++ b/be/src/kudu/security/token_signer.cc
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/token_signer.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+DEFINE_int32(tsk_num_rsa_bits, 2048,
+             "Number of bits in RSA keys used for token signing.");
+TAG_FLAG(tsk_num_rsa_bits, experimental);
+
+using std::lock_guard;
+using std::map;
+using std::shared_ptr;
+using std::string;
+using std::unique_lock;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace security {
+
+TokenSigner::TokenSigner(int64_t authn_token_validity_seconds,
+                         int64_t key_rotation_seconds,
+                         shared_ptr<TokenVerifier> verifier)
+    : verifier_(verifier ? std::move(verifier)
+                         : std::make_shared<TokenVerifier>()),
+      authn_token_validity_seconds_(authn_token_validity_seconds),
+      key_rotation_seconds_(key_rotation_seconds),
+      // The TSK propagation interval is equal to the rotation interval.
+      key_validity_seconds_(2 * key_rotation_seconds_ + authn_token_validity_seconds_),
+      last_key_seq_num_(-1) {
+  CHECK_GE(key_rotation_seconds_, 0);
+  CHECK_GE(authn_token_validity_seconds_, 0);
+  CHECK(verifier_);
+}
+
+TokenSigner::~TokenSigner() {
+}
+
+Status TokenSigner::ImportKeys(const vector<TokenSigningPrivateKeyPB>& keys) {
+  lock_guard<RWMutex> l(lock_);
+
+  const int64_t now = WallTime_Now();
+  map<int64_t, unique_ptr<TokenSigningPrivateKey>> tsk_by_seq;
+  vector<TokenSigningPublicKeyPB> public_keys_pb;
+  public_keys_pb.reserve(keys.size());
+  for (const auto& key : keys) {
+    // Check the input for consistency.
+    CHECK(key.has_key_seq_num());
+    CHECK(key.has_expire_unix_epoch_seconds());
+    CHECK(key.has_rsa_key_der());
+
+    const int64_t key_seq_num = key.key_seq_num();
+    unique_ptr<TokenSigningPrivateKey> tsk(new TokenSigningPrivateKey(key));
+
+    // Advance the key sequence number, if needed. For the use case when the
+    // history of keys sequence numbers is important, the generated keys are
+    // persisted when TokenSigner is active and then the keys are imported from
+    // the store when TokenSigner is initialized (e.g., on restart). It's
+    // crucial to take into account sequence numbers of all previously persisted
+    // keys even if they have expired at the moment of importing.
+    last_key_seq_num_ = std::max(last_key_seq_num_, key_seq_num);
+    const int64_t key_expire_time = tsk->expire_time();
+    if (key_expire_time <= now) {
+      // Do nothing else with an expired TSK.
+      continue;
+    }
+
+    // Need the public part of the key for the TokenVerifier.
+    {
+      TokenSigningPublicKeyPB public_key_pb;
+      tsk->ExportPublicKeyPB(&public_key_pb);
+      public_keys_pb.emplace_back(std::move(public_key_pb));
+    }
+
+    tsk_by_seq[key_seq_num] = std::move(tsk);
+    if (tsk_by_seq.size() > 2) {
+      tsk_by_seq.erase(tsk_by_seq.begin());
+    }
+  }
+  // Register the public parts of the imported keys with the TokenVerifier.
+  RETURN_NOT_OK(verifier_->ImportKeys(public_keys_pb));
+
+  // Use two most recent keys known so far (in terms of sequence numbers)
+  // for token signing.
+  for (auto& e : tsk_deque_) {
+    const int64_t seq_num = e->key_seq_num();
+    tsk_by_seq[seq_num] = std::move(e);
+  }
+  tsk_deque_.clear();
+  for (auto& e : tsk_by_seq) {
+    tsk_deque_.emplace_back(std::move(e.second));
+  }
+  while (tsk_deque_.size() > 2) {
+    tsk_deque_.pop_front();
+  }
+
+  return Status::OK();
+}
+
+Status TokenSigner::GenerateAuthnToken(string username,
+                                       SignedTokenPB* signed_token) const {
+  if (username.empty()) {
+    return Status::InvalidArgument("no username provided for authn token");
+  }
+  TokenPB token;
+  token.set_expire_unix_epoch_seconds(
+      WallTime_Now() + authn_token_validity_seconds_);
+  AuthnTokenPB* authn = token.mutable_authn();
+  authn->mutable_username()->assign(std::move(username));
+
+  SignedTokenPB ret;
+  if (!token.SerializeToString(ret.mutable_token_data())) {
+    return Status::RuntimeError("could not serialize authn token");
+  }
+
+  RETURN_NOT_OK(SignToken(&ret));
+  signed_token->Swap(&ret);
+  return Status::OK();
+}
+
+Status TokenSigner::SignToken(SignedTokenPB* token) const {
+  CHECK(token);
+  shared_lock<RWMutex> l(lock_);
+  if (tsk_deque_.empty()) {
+    return Status::IllegalState("no token signing key");
+  }
+  const TokenSigningPrivateKey* key = tsk_deque_.front().get();
+  RETURN_NOT_OK_PREPEND(key->Sign(token), "could not sign authn token");
+  return Status::OK();
+}
+
+bool TokenSigner::IsCurrentKeyValid() const {
+  shared_lock<RWMutex> l(lock_);
+  if (tsk_deque_.empty()) {
+    return false;
+  }
+  return (tsk_deque_.front()->expire_time() > WallTime_Now());
+}
+
+Status TokenSigner::CheckNeedKey(unique_ptr<TokenSigningPrivateKey>* tsk) const {
+  CHECK(tsk);
+  const int64_t now = WallTime_Now();
+
+  unique_lock<RWMutex> l(lock_);
+  if (tsk_deque_.empty()) {
+    // No active key: need a new one.
+    const int64_t key_seq_num = last_key_seq_num_ + 1;
+    const int64_t key_expiration = now + key_validity_seconds_;
+    // Generation of cryptographically strong key takes many CPU cycles;
+    // do not want to block other parallel activity.
+    l.unlock();
+    return GenerateSigningKey(key_seq_num, key_expiration, tsk);
+  }
+
+  if (tsk_deque_.size() >= 2) {
+    // It does not make much sense to keep more than two keys in the queue.
+    // It's enough to have just one active key and next key ready to be
+    // activated when it's time to do so.  However, it does not mean the
+    // process of key refreshment is about to stop once there are two keys
+    // in the queue: the TryRotate() method (which should be called periodically
+    // along with CheckNeedKey()/AddKey() pair) will eventually pop the
+    // current key out of the keys queue once the key enters its inactive phase.
+    tsk->reset();
+    return Status::OK();
+  }
+
+  // The currently active key is in the front of the queue.
+  const auto* key = tsk_deque_.front().get();
+
+  // Check if it's time to generate a new token signing key.
+  //
+  //   <-----AAAAA===========>
+  //         ^
+  //        now
+  //
+  const auto key_creation_time = key->expire_time() - key_validity_seconds_;
+  if (key_creation_time + key_rotation_seconds_ <= now) {
+    // It's time to create and start propagating next key.
+    const int64_t key_seq_num = last_key_seq_num_ + 1;
+    const int64_t key_expiration = now + key_validity_seconds_;
+    // Generation of cryptographically strong key takes many CPU cycles:
+    // do not want to block other parallel activity.
+    l.unlock();
+    return GenerateSigningKey(key_seq_num, key_expiration, tsk);
+  }
+
+  // It's not yet time to generate a new key.
+  tsk->reset();
+  return Status::OK();
+}
+
+Status TokenSigner::AddKey(unique_ptr<TokenSigningPrivateKey> tsk) {
+  CHECK(tsk);
+  const int64_t key_seq_num = tsk->key_seq_num();
+  if (tsk->expire_time() <= WallTime_Now()) {
+    return Status::InvalidArgument("key has already expired");
+  }
+
+  lock_guard<RWMutex> l(lock_);
+  if (key_seq_num < last_key_seq_num_ + 1) {
+    // The AddKey() method is designed for adding new keys: that should be done
+    // using CheckNeedKey()/AddKey() sequence. Use the ImportKeys() method
+    // for importing keys in bulk.
+    return Status::InvalidArgument(
+        Substitute("$0: invalid key sequence number, should be at least $1",
+                   key_seq_num, last_key_seq_num_ + 1));
+  }
+  last_key_seq_num_ = std::max(last_key_seq_num_, key_seq_num);
+  // Register the public part of the key in TokenVerifier first.
+  TokenSigningPublicKeyPB public_key_pb;
+  tsk->ExportPublicKeyPB(&public_key_pb);
+  RETURN_NOT_OK(verifier_->ImportKeys({public_key_pb}));
+
+  tsk_deque_.emplace_back(std::move(tsk));
+
+  return Status::OK();
+}
+
+Status TokenSigner::TryRotateKey(bool* has_rotated) {
+  lock_guard<RWMutex> l(lock_);
+  if (has_rotated) {
+    *has_rotated = false;
+  }
+  if (tsk_deque_.size() < 2) {
+    // There isn't next key to rotate to.
+    return Status::OK();
+  }
+
+  const auto* key = tsk_deque_.front().get();
+  // Check if it's time to switch to next key. The key propagation interval
+  // is equal to the key rotation interval.
+  //
+  // current active key   <-----AAAAA===========>
+  //           next key        <-----AAAAA===========>
+  //                                 ^
+  //                                now
+  //
+  const auto key_creation_time = key->expire_time() - key_validity_seconds_;
+  if (key_creation_time + 2 * key_rotation_seconds_ <= WallTime_Now()) {
+    tsk_deque_.pop_front();
+    if (has_rotated) {
+      *has_rotated = true;
+    }
+  }
+  return Status::OK();
+}
+
+Status TokenSigner::GenerateSigningKey(int64_t key_seq_num,
+                                       int64_t key_expiration,
+                                       unique_ptr<TokenSigningPrivateKey>* tsk) {
+  unique_ptr<PrivateKey> key(new PrivateKey());
+  RETURN_NOT_OK_PREPEND(
+      GeneratePrivateKey(FLAGS_tsk_num_rsa_bits, key.get()),
+      "could not generate new RSA token-signing key");
+  tsk->reset(new TokenSigningPrivateKey(key_seq_num,
+                                        key_expiration,
+                                        std::move(key)));
+  return Status::OK();
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token_signer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token_signer.h b/be/src/kudu/security/token_signer.h
new file mode 100644
index 0000000..df1e3eb
--- /dev/null
+++ b/be/src/kudu/security/token_signer.h
@@ -0,0 +1,316 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/rw_mutex.h"
+
+namespace kudu {
+class Status;
+
+namespace security {
+class SignedTokenPB;
+class TokenSigningPrivateKey;
+class TokenSigningPrivateKeyPB;
+class TokenVerifier;
+
+// Class responsible for managing Token Signing Keys (TSKs) and signing tokens.
+//
+// This class manages a set of private TSKs, each identified by a sequence
+// number. Callers can export their public TSK counterparts via the included
+// TokenVerifier, optionally transfer them to another node, and then import
+// them into a TokenVerifier.
+//
+// The class provides the ability to check whether it's time go generate and
+// activate a new key. Every generated private/public key pair is assigned a
+// sequence number. Note that, when signing tokens, the most recent key
+// (a.k.a. next key) is not used. Rather, the second-most-recent key, if exists,
+// is used. This ensures that there is plenty of time to transmit the public
+// part of the new TSK to all TokenVerifiers (e.g. on other servers via
+// heartbeats or by other means), before the new key enters usage.
+//
+// On a fresh instance, with only one key, there is no "second most recent"
+// key. Thus, we fall back to signing tokens with the only available key.
+//
+// Key rotation schedules and validity periods
+// ===========================================
+// The TokenSigner does not automatically handle the rotation of keys.
+// Rotation must be performed by an external caller using the combination of
+// 'CheckNeedKey()/AddKey()' and 'TryRotateKey()' methods. Typically,
+// key rotation is performed more frequently than the validity period
+// of the key, so that at any given point in time there are several valid keys.
+//
+// Below is the life cycle of a TSK (token signing key):
+//
+//      <---AAAAA===============>
+//      ^                       ^
+// creation time          expiration time
+//
+// Prior to the creation time the TSK does not exist in the system.
+//
+// '-' propagation interval
+//       The TSK is already created but not yet used to sign tokens. However,
+//       its public part is already being sent to the components which
+//       may be involved in validation of tokens signed by the key.
+//
+// 'A' activity interval
+//       The TSK is used to sign tokens. It's assumed that the components which
+//       are involved in token verification have already received
+//       the corresponding public part of the TSK.
+//
+// '=' inactivity interval
+//       The TSK is no longer used to sign tokens. However, it's still sent
+//       to other components which validate token signatures.
+//
+// Shortly after the TSK's expiration the token signing components stop
+// propagating its public part.
+//
+// The TSK is considered valid from its creation time until its expiration time.
+//
+// NOTE: The very first key created on the system bootstrap does not have
+//       propagation interval -- it turns active immediately.
+//
+// NOTE: One other result of the above is that the first key (Key 1) is actually
+//       active for longer than the rest. This has some potential security
+//       implications, so it's worth considering rolling twice at startup.
+//
+// For example, consider the following configuration for token signing keys:
+//   validity period:      4 days
+//   rotation interval:    1 days
+//   propagation interval: 1 day
+//
+// Day      1    2    3    4    5    6    7    8
+// ------------------------------------------------
+// Key 1:   <AAAAAAAAA==========>
+// Key 2:        <----AAAAA==========>
+// Key 3:             <----AAAAA==========>
+// Key 4:                  <----AAAAA==========>
+//                              ...............
+// authn token:                     <**********>
+//
+// 'A' indicates the 'Originator Usage Period' (a.k.a. 'Activity Interval'),
+// i.e. the period in which the key is being used to sign tokens.
+//
+// '<...>' indicates the 'Recipient Usage Period': the period in which
+// the verifier may get tokens signed by the TSK and should consider them
+// for verification. The start of the recipient usage period is not crucial
+// in that regard, but the end of that period is -- after the TSK is expired,
+// a verifier should consider tokens signed by that TSK invalid and stop
+// accepting them even if the token signature is correct and the expiration.
+//
+// '<***>' indicates the validity interval for an authn token.
+//
+// When configuring key rotation and authn token validity interval durations,
+// consider the following constraint:
+//
+//   max_token_validity < tsk_validity_period -
+//       (tsk_propagation_interval + tsk_rotation_interval)
+//
+// The idea is that the token validity interval should be contained in the
+// corresponding TSK's validity interval. If the TSK is already expired at the
+// time of token verification, the token is considered invalid and the
+// verification of the token fails. This means that no token may be issued with
+// a validity period longer than or equal to TSK inactivity interval, without
+// risking that the signing/verification key would expire before the token
+// itself. The edge case is demonstrated by the following scenario:
+//
+// * A TSK is issued at 00:00:00 on day 4.
+// * An authn token generated and signed by current/active TSK at 23:59:59 on
+//   day 6. That's at the very end of the TSK's activity interval.
+// * From the diagram above it's clear that if the authn token validity
+//   interval were set to something longer than TSK inactivity interval
+//   (which is 2 days with for the specified parameters), an attempt to verify
+//   the token at 00:00:00 on day 8 or later would fail due to the expiration
+//   the corresponding TSK.
+//
+// NOTE: Current implementation of TokenSigner assumes the propagation
+//       interval is equal to the rotation interval.
+//
+// Typical usage pattern:
+//
+//    TokenSigner ts(...);
+//    // Load existing TSKs from the system table.
+//    ...
+//    RETURN_NOT_OK(ts.ImportKeys(...));
+//
+//    // Check that there is a valid TSK to sign keys.
+//    {
+//      unique_ptr<TokenSigningPrivateKey> key;
+//      RETURN_NOT_OK(ts.CheckNeedKey(&key));
+//      if (key) {
+//        // Store the newly generated key into the system table.
+//        ...
+//
+//        // Add the key into the queue of the TokenSigner.
+//        RETURN_NOT_OK(ts.AddKey(std::move(key)));
+//      }
+//    }
+//    // Check and switch to the next key, if it's time.
+//    RETURN_NOT_OK(ts.TryRotateKey());
+//
+//    ...
+//    // Time to time (but much more often than TSK validity/rotation interval)
+//    // call the 'CheckNeedKey()/AddKey() followed by TryRotateKey()' sequence.
+//    // It's a good idea to dedicate a separate periodic task for that.
+//    ...
+//
+class TokenSigner {
+ public:
+  // The 'key_validity_seconds' and 'key_rotation_seconds' parameters define
+  // the schedule of TSK rotation. See the class comment above for details.
+  //
+  // Any newly imported or generated keys are automatically imported into the
+  // passed 'verifier'. If no verifier passed as a parameter, TokenSigner
+  // creates one on its own. In either case, it's possible to access
+  // the embedded TokenVerifier instance using the verifier() accessor.
+  //
+  // The 'authn_token_validity_seconds' parameter is used to specify validity
+  // interval for the generated authn tokens and with 'key_rotation_seconds'
+  // it defines validity interval of the newly generated TSK:
+  //   key_validity = 2 * key_rotation + authn_token_validity.
+  //
+  // That corresponds to the maximum possible token lifetime for the effective
+  // TSK validity and rotation intervals: see the class comment above for
+  // details.
+  TokenSigner(int64_t authn_token_validity_seconds,
+              int64_t key_rotation_seconds,
+              std::shared_ptr<TokenVerifier> verifier = nullptr);
+  ~TokenSigner();
+
+  // Import token signing keys in PB format, notifying TokenVerifier
+  // and updating internal key sequence number. This method can be called
+  // multiple times. Depending on the input keys and current time, the instance
+  // might not be ready to sign keys right after calling ImportKeys(),
+  // so additional cycle of CheckNeedKey/AddKey might be needed.
+  //
+  // See the class comment above for more information about the intended usage.
+  Status ImportKeys(const std::vector<TokenSigningPrivateKeyPB>& keys)
+      WARN_UNUSED_RESULT;
+
+  // Check whether it's time to generate and add a new key. If so, the new key
+  // is generated and output into the 'tsk' parameter so it's possible to
+  // examine and process the key as needed (e.g. store it). After that, use the
+  // AddKey() method to actually add the key into the TokenSigner's key queue.
+  //
+  // Every non-null key returned by this method has key sequence number.
+  // It's not a problem to call this method multiple times but call the AddKey()
+  // method only once, effectively discarding all the generated keys except for
+  // the key passed to the AddKey() call as a parameter. The key sequence number
+  // always increments with every newly added key (i.e. every successful call of
+  // the AddKey() method). The result key number sequence would not contain
+  // any 'holes'.
+  //
+  // In other words, sequence of calls like
+  //
+  //   CheckNeedKey(k);
+  //   CheckNeedKey(k);
+  //   ...
+  //   CheckNeedKey(k);
+  //   AddKey(k);
+  //
+  // would increase the key sequence number just by 1. Due to that fact, the
+  // following sequence of calls to CheckNeedKey()/AddKey() would work fine:
+  //
+  //   CheckNeedKey(k0);
+  //   AddKey(k0);
+  //   CheckNeedKey(k1);
+  //   AddKey(k1);
+  //
+  // but the sequence below would fail at AddKey(k1):
+  //
+  //   CheckNeedKey(k0);
+  //   CheckNeedKey(k1);
+  //   AddKey(k0);
+  //   AddKey(k1);
+  //
+  // See the class comment above for more information about the intended usage.
+  Status CheckNeedKey(std::unique_ptr<TokenSigningPrivateKey>* tsk) const
+      WARN_UNUSED_RESULT;
+
+  // Add the new key into the token signing keys queue. Call TryRotateKey()
+  // to make the newly added key active when it's time.
+  //
+  // See the class comment above for more information about the intended usage.
+  Status AddKey(std::unique_ptr<TokenSigningPrivateKey> tsk) WARN_UNUSED_RESULT;
+
+  // Check whether it's possible and it's time to switch to next signing key
+  // from the token signing keys queue. A key can be added using the
+  // CheckNeedKey()/AddKey() method pair. If there is next key to switch to
+  // and it's time to do so, the methods switches to the next key and reports
+  // on that via the 'has_rotated' parameter.
+  // The intended use case is to call TryRotateKey() periodically.
+  //
+  // See the class comment above for more information about the intended usage.
+  Status TryRotateKey(bool* has_rotated = nullptr) WARN_UNUSED_RESULT;
+
+  Status GenerateAuthnToken(std::string username,
+                            SignedTokenPB* signed_token) const WARN_UNUSED_RESULT;
+
+  Status SignToken(SignedTokenPB* token) const WARN_UNUSED_RESULT;
+
+  const TokenVerifier& verifier() const { return *verifier_; }
+
+  // Check if the current TSK is valid: return 'true' if current key is present
+  // and it's not yet expired, return 'false' otherwise.
+  bool IsCurrentKeyValid() const;
+
+ private:
+  FRIEND_TEST(TokenTest, TestEndToEnd_InvalidCases);
+
+  static Status GenerateSigningKey(int64_t key_seq_num,
+                                   int64_t key_expiration,
+                                   std::unique_ptr<TokenSigningPrivateKey>* tsk) WARN_UNUSED_RESULT;
+
+  std::shared_ptr<TokenVerifier> verifier_;
+
+  // Validity interval for the generated authn tokens.
+  const int64_t authn_token_validity_seconds_;
+
+  // TSK rotation interval: number of seconds between consecutive activations
+  // of new token signing keys. Note that in current implementation it defines
+  // the propagation interval as well, i.e. the TSK propagation interval is
+  // equal to the TSK rotation interval.
+  const int64_t key_rotation_seconds_;
+
+  // Period of validity for newly created token signing keys. In other words,
+  // the expiration time for a new key is set to (now + key_validity_seconds_).
+  const int64_t key_validity_seconds_;
+
+  // Protects next_seq_num_ and tsk_deque_ members.
+  mutable RWMutex lock_;
+
+  // The sequence number of the last generated/imported key.
+  int64_t last_key_seq_num_;
+
+  // The currently active key is in the front of the queue,
+  // the newly added ones are pushed into back of the queue.
+  std::deque<std::unique_ptr<TokenSigningPrivateKey>> tsk_deque_;
+
+  DISALLOW_COPY_AND_ASSIGN(TokenSigner);
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token_signing_key.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token_signing_key.cc b/be/src/kudu/security/token_signing_key.cc
new file mode 100644
index 0000000..38d49c6
--- /dev/null
+++ b/be/src/kudu/security/token_signing_key.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/token_signing_key.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/status.h"
+
+using std::unique_ptr;
+using std::string;
+
+namespace kudu {
+namespace security {
+
+TokenSigningPublicKey::TokenSigningPublicKey(TokenSigningPublicKeyPB pb)
+    : pb_(std::move(pb)) {
+}
+
+TokenSigningPublicKey::~TokenSigningPublicKey() {
+}
+
+Status TokenSigningPublicKey::Init() {
+  // This should be called only once.
+  CHECK(!key_.GetRawData());
+  if (!pb_.has_rsa_key_der()) {
+    return Status::RuntimeError("no key for token signing helper");
+  }
+  RETURN_NOT_OK(key_.FromString(pb_.rsa_key_der(), DataFormat::DER));
+  return Status::OK();
+}
+
+bool TokenSigningPublicKey::VerifySignature(const SignedTokenPB& token) const {
+  return key_.VerifySignature(DigestType::SHA256,
+      token.token_data(), token.signature()).ok();
+}
+
+TokenSigningPrivateKey::TokenSigningPrivateKey(
+    const TokenSigningPrivateKeyPB& pb)
+    : key_(new PrivateKey) {
+  CHECK_OK(key_->FromString(pb.rsa_key_der(), DataFormat::DER));
+  private_key_der_ = pb.rsa_key_der();
+  key_seq_num_ = pb.key_seq_num();
+  expire_time_ = pb.expire_unix_epoch_seconds();
+
+  PublicKey public_key;
+  CHECK_OK(key_->GetPublicKey(&public_key));
+  CHECK_OK(public_key.ToString(&public_key_der_, DataFormat::DER));
+}
+
+TokenSigningPrivateKey::TokenSigningPrivateKey(
+    int64_t key_seq_num, int64_t expire_time, unique_ptr<PrivateKey> key)
+    : key_(std::move(key)),
+      key_seq_num_(key_seq_num),
+      expire_time_(expire_time) {
+  CHECK_OK(key_->ToString(&private_key_der_, DataFormat::DER));
+  PublicKey public_key;
+  CHECK_OK(key_->GetPublicKey(&public_key));
+  CHECK_OK(public_key.ToString(&public_key_der_, DataFormat::DER));
+}
+
+TokenSigningPrivateKey::~TokenSigningPrivateKey() {
+}
+
+Status TokenSigningPrivateKey::Sign(SignedTokenPB* token) const {
+  string signature;
+  RETURN_NOT_OK(key_->MakeSignature(DigestType::SHA256,
+      token->token_data(), &signature));
+  token->mutable_signature()->assign(std::move(signature));
+  token->set_signing_key_seq_num(key_seq_num_);
+  return Status::OK();
+}
+
+void TokenSigningPrivateKey::ExportPB(TokenSigningPrivateKeyPB* pb) const {
+  pb->Clear();
+  pb->set_key_seq_num(key_seq_num_);
+  pb->set_rsa_key_der(private_key_der_);
+  pb->set_expire_unix_epoch_seconds(expire_time_);
+}
+
+void TokenSigningPrivateKey::ExportPublicKeyPB(TokenSigningPublicKeyPB* pb) const {
+  pb->Clear();
+  pb->set_key_seq_num(key_seq_num_);
+  pb->set_rsa_key_der(public_key_der_);
+  pb->set_expire_unix_epoch_seconds(expire_time_);
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token_signing_key.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token_signing_key.h b/be/src/kudu/security/token_signing_key.h
new file mode 100644
index 0000000..67ecf95
--- /dev/null
+++ b/be/src/kudu/security/token_signing_key.h
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace security {
+
+// Wrapper around a TokenSigningPublicKeyPB that provides useful functionality
+// to verify tokens.
+//
+// This represents a standalone public key useful for token verification.
+class TokenSigningPublicKey {
+ public:
+  explicit TokenSigningPublicKey(TokenSigningPublicKeyPB pb);
+  ~TokenSigningPublicKey();
+
+  const TokenSigningPublicKeyPB& pb() const {
+    return pb_;
+  }
+
+  // Initialize the object. Should be called only once.
+  Status Init() WARN_UNUSED_RESULT;
+
+  // Verify the signature in a given token.
+  // This method is thread-safe.
+  // NOTE: this does _not_ verify the expiration.
+  bool VerifySignature(const SignedTokenPB& token) const;
+
+ private:
+  const TokenSigningPublicKeyPB pb_;
+  // The 'key_' member is a parsed version of rsa_key_der() from pb_.
+  // In essence, the 'key_' is a public key for message signature verification.
+  PublicKey key_;
+
+  DISALLOW_COPY_AND_ASSIGN(TokenSigningPublicKey);
+};
+
+// Contains a private key used to sign tokens, along with its sequence
+// number and expiration date.
+class TokenSigningPrivateKey {
+ public:
+  explicit TokenSigningPrivateKey(const TokenSigningPrivateKeyPB& pb);
+  TokenSigningPrivateKey(int64_t key_seq_num,
+                         int64_t expire_time,
+                         std::unique_ptr<PrivateKey> key);
+  ~TokenSigningPrivateKey();
+
+  // Sign a token, and store the signature and signing key's sequence number.
+  Status Sign(SignedTokenPB* token) const WARN_UNUSED_RESULT;
+
+  // Export data into corresponding PB structure.
+  void ExportPB(TokenSigningPrivateKeyPB* pb) const;
+
+  // Export the public-key portion of this signing key.
+  void ExportPublicKeyPB(TokenSigningPublicKeyPB* pb) const;
+
+  int64_t key_seq_num() const { return key_seq_num_; }
+  int64_t expire_time() const { return expire_time_; }
+
+ private:
+  FRIEND_TEST(TokenTest, TestAddKeyConstraints);
+
+  std::unique_ptr<PrivateKey> key_;
+  // The 'private_key_der_' is a serialized 'key_' in DER format: just a cache.
+  std::string private_key_der_;
+  // The 'public_key_der_' is serialized public part of 'key_' in DER format;
+  // just a cache.
+  std::string public_key_der_;
+
+  int64_t key_seq_num_;
+  int64_t expire_time_;
+
+  DISALLOW_COPY_AND_ASSIGN(TokenSigningPrivateKey);
+};
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token_verifier.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token_verifier.cc b/be/src/kudu/security/token_verifier.cc
new file mode 100644
index 0000000..1ae20db
--- /dev/null
+++ b/be/src/kudu/security/token_verifier.cc
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/token_verifier.h"
+
+#include <algorithm>
+#include <iterator>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/status.h"
+
+using std::lock_guard;
+using std::string;
+using std::transform;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+TokenVerifier::TokenVerifier() {
+}
+
+TokenVerifier::~TokenVerifier() {
+}
+
+int64_t TokenVerifier::GetMaxKnownKeySequenceNumber() const {
+  shared_lock<RWMutex> l(lock_);
+  if (keys_by_seq_.empty()) {
+    return -1;
+  }
+
+  return keys_by_seq_.rbegin()->first;
+}
+
+// Import a set of public keys provided by the token signer (typically
+// running on another node).
+Status TokenVerifier::ImportKeys(const vector<TokenSigningPublicKeyPB>& keys) {
+  // Do the construction outside of the lock, to avoid holding the
+  // lock while doing lots of allocation.
+  vector<unique_ptr<TokenSigningPublicKey>> tsks;
+  for (const auto& pb : keys) {
+    // Sanity check the key.
+    if (!pb.has_rsa_key_der()) {
+      return Status::RuntimeError(
+          "token-signing public key message must include the signing key");
+    }
+    if (!pb.has_key_seq_num()) {
+      return Status::RuntimeError(
+          "token-signing public key message must include the signing key sequence number");
+    }
+    if (!pb.has_expire_unix_epoch_seconds()) {
+      return Status::RuntimeError(
+          "token-signing public key message must include an expiration time");
+    }
+    tsks.emplace_back(new TokenSigningPublicKey { pb });
+    RETURN_NOT_OK(tsks.back()->Init());
+  }
+
+  lock_guard<RWMutex> l(lock_);
+  for (auto&& tsk_ptr : tsks) {
+    keys_by_seq_.emplace(tsk_ptr->pb().key_seq_num(), std::move(tsk_ptr));
+  }
+  return Status::OK();
+}
+
+std::vector<TokenSigningPublicKeyPB> TokenVerifier::ExportKeys(
+    int64_t after_sequence_number) const {
+  vector<TokenSigningPublicKeyPB> ret;
+  shared_lock<RWMutex> l(lock_);
+  ret.reserve(keys_by_seq_.size());
+  transform(keys_by_seq_.upper_bound(after_sequence_number),
+            keys_by_seq_.end(),
+            back_inserter(ret),
+            [](const KeysMap::value_type& e) { return e.second->pb(); });
+  return ret;
+}
+
+// Verify the signature on the given token.
+VerificationResult TokenVerifier::VerifyTokenSignature(const SignedTokenPB& signed_token,
+                                                       TokenPB* token) const {
+  if (!signed_token.has_signature() ||
+      !signed_token.has_signing_key_seq_num() ||
+      !signed_token.has_token_data()) {
+    return VerificationResult::INVALID_TOKEN;
+  }
+
+  if (!token->ParseFromString(signed_token.token_data()) ||
+      !token->has_expire_unix_epoch_seconds()) {
+    return VerificationResult::INVALID_TOKEN;
+  }
+
+  int64_t now = WallTime_Now();
+  if (token->expire_unix_epoch_seconds() < now) {
+    return VerificationResult::EXPIRED_TOKEN;
+  }
+
+  for (auto flag : token->incompatible_features()) {
+    if (!TokenPB::Feature_IsValid(flag)) {
+      KLOG_EVERY_N_SECS(WARNING, 60) << "received authentication token with unknown feature; "
+                                        "server needs to be updated";
+      return VerificationResult::INCOMPATIBLE_FEATURE;
+    }
+  }
+
+  {
+    shared_lock<RWMutex> l(lock_);
+    auto* tsk = FindPointeeOrNull(keys_by_seq_, signed_token.signing_key_seq_num());
+    if (!tsk) {
+      return VerificationResult::UNKNOWN_SIGNING_KEY;
+    }
+    if (tsk->pb().expire_unix_epoch_seconds() < now) {
+      return VerificationResult::EXPIRED_SIGNING_KEY;
+    }
+    if (!tsk->VerifySignature(signed_token)) {
+      return VerificationResult::INVALID_SIGNATURE;
+    }
+  }
+
+  return VerificationResult::VALID;
+}
+
+const char* VerificationResultToString(VerificationResult r) {
+  switch (r) {
+    case security::VerificationResult::VALID:
+      return "valid";
+    case security::VerificationResult::INVALID_TOKEN:
+      return "invalid authentication token";
+    case security::VerificationResult::INVALID_SIGNATURE:
+      return "invalid authentication token signature";
+    case security::VerificationResult::EXPIRED_TOKEN:
+      return "authentication token expired";
+    case security::VerificationResult::EXPIRED_SIGNING_KEY:
+      return "authentication token signing key expired";
+    case security::VerificationResult::UNKNOWN_SIGNING_KEY:
+      return "authentication token signed with unknown key";
+    case security::VerificationResult::INCOMPATIBLE_FEATURE:
+      return "authentication token uses incompatible feature";
+    default:
+      LOG(FATAL) << "unexpected VerificationResult value: "
+                 << static_cast<int>(r);
+  }
+}
+
+} // namespace security
+} // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/token_verifier.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/token_verifier.h b/be/src/kudu/security/token_verifier.h
new file mode 100644
index 0000000..8d5d176
--- /dev/null
+++ b/be/src/kudu/security/token_verifier.h
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/rw_mutex.h"
+
+namespace kudu {
+
+class Status;
+
+namespace security {
+
+class SignedTokenPB;
+class TokenPB;
+class TokenSigningPublicKey;
+class TokenSigningPublicKeyPB;
+enum class VerificationResult;
+
+// Class responsible for verifying tokens provided to a server.
+//
+// This class manages a set of public keys, each identified by a sequence
+// number. It exposes the latest known sequence number, which can be sent
+// to a 'TokenSigner' running on another node. That node can then
+// export public keys, which are transferred back to this node and imported
+// into the 'TokenVerifier'.
+//
+// Each signed token also includes the key sequence number that signed it,
+// so this class can look up the correct key and verify the token's
+// validity and expiration.
+//
+// Note that this class does not perform any "business logic" around the
+// content of a token. It only verifies that the token has a valid signature
+// and is not yet expired. Any business rules around authorization or
+// authentication are left up to callers.
+//
+// NOTE: old tokens are never removed from the underlying storage of this
+// class. The assumption is that tokens rotate so infreqeuently that this
+// slow leak is not worrisome. If this class is adopted for any use cases
+// with frequent rotation, GC of expired tokens will need to be added.
+//
+// This class is thread-safe.
+class TokenVerifier {
+ public:
+  TokenVerifier();
+  ~TokenVerifier();
+
+  // Return the highest key sequence number known by this instance.
+  //
+  // If no keys are known, return -1.
+  int64_t GetMaxKnownKeySequenceNumber() const;
+
+  // Import a set of public keys provided by a TokenSigner instance
+  // (which might be running on a remote node). If any public keys already
+  // exist with matching key sequence numbers, they are replaced by
+  // the new keys.
+  Status ImportKeys(const std::vector<TokenSigningPublicKeyPB>& keys) WARN_UNUSED_RESULT;
+
+  // Export token signing public keys. Specifying the 'after_sequence_number'
+  // allows to get public keys with sequence numbers greater than
+  // 'after_sequence_number'. If the 'after_sequence_number' parameter is
+  // omitted, all known public keys are exported.
+  std::vector<TokenSigningPublicKeyPB> ExportKeys(
+      int64_t after_sequence_number = -1) const;
+
+  // Verify the signature on the given signed token, and deserialize the
+  // contents into 'token'.
+  VerificationResult VerifyTokenSignature(const SignedTokenPB& signed_token,
+                                          TokenPB* token) const;
+
+ private:
+  typedef std::map<int64_t, std::unique_ptr<TokenSigningPublicKey>> KeysMap;
+
+  // Lock protecting keys_by_seq_
+  mutable RWMutex lock_;
+  KeysMap keys_by_seq_;
+
+  DISALLOW_COPY_AND_ASSIGN(TokenVerifier);
+};
+
+// Result of a token verification.
+// Values added to this enum must also be added to VerificationResultToString().
+enum class VerificationResult {
+  // The signature is valid and the token is not expired.
+  VALID,
+  // The token itself is invalid (e.g. missing its signature or data,
+  // can't be deserialized, etc).
+  INVALID_TOKEN,
+  // The signature is invalid (i.e. cryptographically incorrect).
+  INVALID_SIGNATURE,
+  // The signature is valid, but the token has already expired.
+  EXPIRED_TOKEN,
+  // The signature is valid, but the signing key is no longer valid.
+  EXPIRED_SIGNING_KEY,
+  // The signing key used to sign this token is not available.
+  UNKNOWN_SIGNING_KEY,
+  // The token uses an incompatible feature which isn't supported by this
+  // version of the server. We reject the token to give a "default deny"
+  // policy.
+  INCOMPATIBLE_FEATURE
+};
+
+const char* VerificationResultToString(VerificationResult r);
+
+} // namespace security
+} // namespace kudu