You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:47 UTC

[35/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue.h b/be/src/kudu/rpc/service_queue.h
new file mode 100644
index 0000000..2751a30
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue.h
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SERVICE_QUEUE_H
+#define KUDU_UTIL_SERVICE_QUEUE_H
+
+#include <memory>
+#include <string>
+#include <set>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+
+namespace boost {
+template <class T>
+class optional;
+}
+
+namespace kudu {
+namespace rpc {
+
+// Return values for ServiceQueue::Put()
+enum QueueStatus {
+  QUEUE_SUCCESS = 0,
+  QUEUE_SHUTDOWN = 1,
+  QUEUE_FULL = 2
+};
+
+// Blocking queue used for passing inbound RPC calls to the service handler pool.
+// Calls are dequeued in 'earliest-deadline first' order. The queue also maintains a
+// bounded number of calls. If the queue overflows, then calls with deadlines farthest
+// in the future are evicted.
+//
+// When calls do not provide deadlines, the RPC layer considers their deadline to
+// be infinitely in the future. This means that any call that does have a deadline
+// can evict any call that does not have a deadline. This incentivizes clients to
+// provide accurate deadlines for their calls.
+//
+// In order to improve concurrent throughput, this class uses a LIFO design:
+// Each consumer thread has its own lock and condition variable. If a
+// consumer arrives and there is no work available in the queue, it will not
+// wait on the queue lock, but rather push its own 'ConsumerState' object
+// to the 'waiting_consumers_' stack. When work arrives, if there are waiting
+// consumers, the top consumer is popped from the stack and woken up.
+//
+// This design has a few advantages over the basic BlockingQueue:
+// - the worker who was most recently busy is the one which will be selected for
+//   new work. This gives an opportunity for the worker to be scheduled again
+//   without going to sleep, and also keeps CPU cache and allocator caches hot.
+// - in the common case that there are enough workers to fully service the incoming
+//   work rate, the queue implementation itself is never used. Thus, we can
+//   have a priority queue without paying extra for it in the common case.
+//
+// NOTE: because of the use of thread-local consumer records, once a consumer
+// thread accesses one LifoServiceQueue, it becomes "bound" to that queue and
+// must never access any other instance.
+class LifoServiceQueue {
+ public:
+  explicit LifoServiceQueue(int max_size);
+
+  ~LifoServiceQueue();
+
+  // Get an element from the queue.  Returns false if we were shut down prior to
+  // getting the element.
+  bool BlockingGet(std::unique_ptr<InboundCall>* out);
+
+  // Add a new call to the queue.
+  // Returns:
+  // - QUEUE_SHUTDOWN if Shutdown() has already been called.
+  // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any
+  //   RPC already in the queue.
+  // - QUEUE_SUCCESS if 'call' was enqueued.
+  //
+  // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped
+  // another call out of the queue. In that case, *evicted will be set to the
+  // call that was bumped.
+  QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted);
+
+  // Shut down the queue.
+  // When a blocking queue is shut down, no more elements can be added to it,
+  // and Put() will return QUEUE_SHUTDOWN.
+  // Existing elements will drain out of it, and then BlockingGet will start
+  // returning false.
+  void Shutdown();
+
+  bool empty() const;
+
+  int max_size() const;
+
+  std::string ToString() const;
+
+  // Return an estimate of the current queue length.
+  int estimated_queue_length() const {
+    ANNOTATE_IGNORE_READS_BEGIN();
+    // The C++ standard says that std::multiset::size must be constant time,
+    // so this method won't try to traverse any actual nodes of the underlying
+    // RB tree. Investigation of the libstdcxx implementation confirms that
+    // size() is a simple field access of the _Rb_tree structure.
+    int ret = queue_.size();
+    ANNOTATE_IGNORE_READS_END();
+    return ret;
+  }
+
+  // Return an estimate of the number of idle threads currently awaiting work.
+  int estimated_idle_worker_count() const {
+    ANNOTATE_IGNORE_READS_BEGIN();
+    // Size of a vector is a simple field access so this is safe.
+    int ret = waiting_consumers_.size();
+    ANNOTATE_IGNORE_READS_END();
+    return ret;
+  }
+
+ private:
+  // Comparison function which orders calls by their deadlines.
+  static bool DeadlineLess(const InboundCall* a,
+                           const InboundCall* b) {
+    auto time_a = a->GetClientDeadline();
+    auto time_b = b->GetClientDeadline();
+    if (time_a == time_b) {
+      // If two calls have the same deadline (most likely because neither one specified
+      // one) then we should order them by arrival order.
+      time_a = a->GetTimeReceived();
+      time_b = b->GetTimeReceived();
+    }
+    return time_a < time_b;
+  }
+
+  // Struct functor wrapper for DeadlineLess.
+  struct DeadlineLessStruct {
+    bool operator()(const InboundCall* a, const InboundCall* b) const {
+      return DeadlineLess(a, b);
+    }
+  };
+
+  // The thread-local record corresponding to a single consumer thread.
+  // Threads push this record onto the waiting_consumers_ stack when
+  // they are awaiting work. Producers pop the top waiting consumer and
+  // post work using Post().
+  class ConsumerState {
+   public:
+    explicit ConsumerState(LifoServiceQueue* queue) :
+        cond_(&lock_),
+        call_(nullptr),
+        should_wake_(false),
+        bound_queue_(queue) {
+    }
+
+    void Post(InboundCall* call) {
+      DCHECK(call_ == nullptr);
+      MutexLock l(lock_);
+      call_ = call;
+      should_wake_ = true;
+      cond_.Signal();
+    }
+
+    InboundCall* Wait() {
+      MutexLock l(lock_);
+      while (should_wake_ == false) {
+        cond_.Wait();
+      }
+      should_wake_ = false;
+      InboundCall* ret = call_;
+      call_ = nullptr;
+      return ret;
+    }
+
+    void DCheckBoundInstance(LifoServiceQueue* q) {
+      DCHECK_EQ(q, bound_queue_);
+    }
+
+   private:
+    Mutex lock_;
+    ConditionVariable cond_;
+    InboundCall* call_;
+    bool should_wake_;
+
+    // For the purpose of assertions, tracks the LifoServiceQueue instance that
+    // this consumer is reading from.
+    LifoServiceQueue* bound_queue_;
+  };
+
+  static __thread ConsumerState* tl_consumer_;
+
+  mutable simple_spinlock lock_;
+  bool shutdown_;
+  int max_queue_size_;
+
+  // Stack of consumer threads which are currently waiting for work.
+  std::vector<ConsumerState*> waiting_consumers_;
+
+  // The actual queue. Work is only added to the queue when there were no
+  // consumers available for a "direct hand-off".
+  std::multiset<InboundCall*, DeadlineLessStruct> queue_;
+
+  // The total set of consumers who have ever accessed this queue.
+  std::vector<std::unique_ptr<ConsumerState>> consumers_;
+
+  DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
new file mode 100644
index 0000000..bdf5191
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.cc
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/transfer.h"
+
+#include <sys/uio.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <limits>
+#include <set>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/socket.h"
+
+DEFINE_int64(rpc_max_message_size, (50 * 1024 * 1024),
+             "The maximum size of a message that any RPC that the server will accept. "
+             "Must be at least 1MB.");
+TAG_FLAG(rpc_max_message_size, advanced);
+TAG_FLAG(rpc_max_message_size, runtime);
+
+static bool ValidateMaxMessageSize(const char* flagname, int64_t value) {
+  if (value < 1 * 1024 * 1024) {
+    LOG(ERROR) << flagname << " must be at least 1MB.";
+    return false;
+  }
+  if (value > std::numeric_limits<int32_t>::max()) {
+    LOG(ERROR) << flagname << " must be less than "
+               << std::numeric_limits<int32_t>::max() << " bytes.";
+  }
+
+  return true;
+}
+static bool dummy = google::RegisterFlagValidator(
+    &FLAGS_rpc_max_message_size, &ValidateMaxMessageSize);
+
+namespace kudu {
+namespace rpc {
+
+using std::ostringstream;
+using std::set;
+using std::string;
+using strings::Substitute;
+
+#define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status)               \
+  do {                                                            \
+    Status _s = (status);                                         \
+    if (PREDICT_FALSE(!_s.ok())) {                                \
+      if (Socket::IsTemporarySocketError(_s.posix_code())) {      \
+        return Status::OK(); /* EAGAIN, etc. */                   \
+      }                                                           \
+      return _s;                                                  \
+    }                                                             \
+  } while (0)
+
+TransferCallbacks::~TransferCallbacks()
+{}
+
+InboundTransfer::InboundTransfer()
+  : total_length_(kMsgLengthPrefixLength),
+    cur_offset_(0) {
+  buf_.resize(kMsgLengthPrefixLength);
+}
+
+Status InboundTransfer::ReceiveBuffer(Socket &socket) {
+  if (cur_offset_ < kMsgLengthPrefixLength) {
+    // receive uint32 length prefix
+    int32_t rem = kMsgLengthPrefixLength - cur_offset_;
+    int32_t nread;
+    Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+    RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+    if (nread == 0) {
+      return Status::OK();
+    }
+    DCHECK_GE(nread, 0);
+    cur_offset_ += nread;
+    if (cur_offset_ < kMsgLengthPrefixLength) {
+      // If we still don't have the full length prefix, we can't continue
+      // reading yet.
+      return Status::OK();
+    }
+    // Since we only read 'rem' bytes above, we should now have exactly
+    // the length prefix in our buffer and no more.
+    DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
+
+    // The length prefix doesn't include its own 4 bytes, so we have to
+    // add that back in.
+    total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
+    if (total_length_ > FLAGS_rpc_max_message_size) {
+      return Status::NetworkError(Substitute(
+          "RPC frame had a length of $0, but we only support messages up to $1 bytes "
+          "long.", total_length_, FLAGS_rpc_max_message_size));
+    }
+    if (total_length_ <= kMsgLengthPrefixLength) {
+      return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
+                                             total_length_));
+    }
+    buf_.resize(total_length_);
+
+    // Fall through to receive the message body, which is likely to be already
+    // available on the socket.
+  }
+
+  // receive message body
+  int32_t nread;
+
+  // Socket::Recv() handles at most INT_MAX at a time, so cap the remainder at
+  // INT_MAX. The message will be split across multiple Recv() calls.
+  // Note that this is only needed when rpc_max_message_size > INT_MAX, which is
+  // currently only used for unit tests.
+  int32_t rem = std::min(total_length_ - cur_offset_,
+      static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
+  Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+  cur_offset_ += nread;
+
+  return Status::OK();
+}
+
+bool InboundTransfer::TransferStarted() const {
+  return cur_offset_ != 0;
+}
+
+bool InboundTransfer::TransferFinished() const {
+  return cur_offset_ == total_length_;
+}
+
+string InboundTransfer::StatusAsString() const {
+  return Substitute("$0/$1 bytes received", cur_offset_, total_length_);
+}
+
+OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
+                                                         const TransferPayload &payload,
+                                                         size_t n_payload_slices,
+                                                         TransferCallbacks *callbacks) {
+  return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks);
+}
+
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload,
+                                                          size_t n_payload_slices,
+                                                          TransferCallbacks *callbacks) {
+  return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks);
+}
+
+OutboundTransfer::OutboundTransfer(int32_t call_id,
+                                   const TransferPayload &payload,
+                                   size_t n_payload_slices,
+                                   TransferCallbacks *callbacks)
+  : cur_slice_idx_(0),
+    cur_offset_in_slice_(0),
+    callbacks_(callbacks),
+    call_id_(call_id),
+    started_(false),
+    aborted_(false) {
+
+  n_payload_slices_ = n_payload_slices;
+  CHECK_LE(n_payload_slices_, payload_slices_.size());
+  for (int i = 0; i < n_payload_slices; i++) {
+    payload_slices_[i] = payload[i];
+  }
+}
+
+OutboundTransfer::~OutboundTransfer() {
+  if (!TransferFinished() && !aborted_) {
+    callbacks_->NotifyTransferAborted(
+      Status::RuntimeError("RPC transfer destroyed before it finished sending"));
+  }
+}
+
+void OutboundTransfer::Abort(const Status &status) {
+  CHECK(!aborted_) << "Already aborted";
+  CHECK(!TransferFinished()) << "Cannot abort a finished transfer";
+  callbacks_->NotifyTransferAborted(status);
+  aborted_ = true;
+}
+
+Status OutboundTransfer::SendBuffer(Socket &socket) {
+  CHECK_LT(cur_slice_idx_, n_payload_slices_);
+
+  started_ = true;
+  int n_iovecs = n_payload_slices_ - cur_slice_idx_;
+  struct iovec iovec[n_iovecs];
+  {
+    int offset_in_slice = cur_offset_in_slice_;
+    for (int i = 0; i < n_iovecs; i++) {
+      Slice &slice = payload_slices_[cur_slice_idx_ + i];
+      iovec[i].iov_base = slice.mutable_data() + offset_in_slice;
+      iovec[i].iov_len = slice.size() - offset_in_slice;
+
+      offset_in_slice = 0;
+    }
+  }
+
+  int64_t written;
+  Status status = socket.Writev(iovec, n_iovecs, &written);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+
+  // Adjust our accounting of current writer position.
+  for (int i = cur_slice_idx_; i < n_payload_slices_; i++) {
+    Slice &slice = payload_slices_[i];
+    int rem_in_slice = slice.size() - cur_offset_in_slice_;
+    DCHECK_GE(rem_in_slice, 0);
+
+    if (written >= rem_in_slice) {
+      // Used up this entire slice, advance to the next slice.
+      cur_slice_idx_++;
+      cur_offset_in_slice_ = 0;
+      written -= rem_in_slice;
+    } else {
+      // Partially used up this slice, just advance the offset within it.
+      cur_offset_in_slice_ += written;
+      break;
+    }
+  }
+
+  if (cur_slice_idx_ == n_payload_slices_) {
+    callbacks_->NotifyTransferFinished();
+    DCHECK_EQ(0, cur_offset_in_slice_);
+  } else {
+    DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+    DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size());
+  }
+
+  return Status::OK();
+}
+
+bool OutboundTransfer::TransferStarted() const {
+  return started_;
+}
+
+bool OutboundTransfer::TransferFinished() const {
+  if (cur_slice_idx_ == n_payload_slices_) {
+    DCHECK_EQ(0, cur_offset_in_slice_); // sanity check
+    return true;
+  }
+  return false;
+}
+
+string OutboundTransfer::HexDump() const {
+  if (KUDU_SHOULD_REDACT()) {
+    return kRedactionMessage;
+  }
+
+  string ret;
+  for (int i = 0; i < n_payload_slices_; i++) {
+    ret.append(payload_slices_[i].ToDebugString());
+  }
+  return ret;
+}
+
+int32_t OutboundTransfer::TotalLength() const {
+  int32_t ret = 0;
+  for (int i = 0; i < n_payload_slices_; i++) {
+    ret += payload_slices_[i].size();
+  }
+  return ret;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
new file mode 100644
index 0000000..b95d43d
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.h
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_TRANSFER_H
+#define KUDU_RPC_TRANSFER_H
+
+#include <array>
+#include <cstddef>
+#include <cstdint>
+#include <limits.h>
+#include <string>
+
+#include <boost/intrusive/list_hook.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+DECLARE_int64(rpc_max_message_size);
+
+namespace kudu {
+
+class Socket;
+
+namespace rpc {
+
+struct TransferCallbacks;
+
+class TransferLimits {
+ public:
+  enum {
+    kMaxSidecars = 10,
+    kMaxPayloadSlices = kMaxSidecars + 2, // (header + msg)
+    kMaxTotalSidecarBytes = INT_MAX
+  };
+
+  DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
+};
+
+typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;
+
+// This class is used internally by the RPC layer to represent an inbound
+// transfer in progress.
+//
+// Inbound Transfer objects are created by a Connection receiving data. When the
+// message is fully received, it is either parsed as a call, or a call response,
+// and the InboundTransfer object itself is handed off.
+class InboundTransfer {
+ public:
+
+  InboundTransfer();
+
+  // read from the socket into our buffer
+  Status ReceiveBuffer(Socket &socket);
+
+  // Return true if any bytes have yet been sent.
+  bool TransferStarted() const;
+
+  // Return true if the entire transfer has been sent.
+  bool TransferFinished() const;
+
+  Slice data() const {
+    return Slice(buf_);
+  }
+
+  // Return a string indicating the status of this transfer (number of bytes received, etc)
+  // suitable for logging.
+  std::string StatusAsString() const;
+
+ private:
+
+  Status ProcessInboundHeader();
+
+  faststring buf_;
+
+  uint32_t total_length_;
+  uint32_t cur_offset_;
+
+  DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
+};
+
+// When the connection wants to send data, it creates an OutboundTransfer object
+// to encompass it. This sits on a queue within the Connection, so that each time
+// the Connection wakes up with a writable socket, it consumes more bytes off
+// the next pending transfer in the queue.
+//
+// Upon completion of the transfer, a callback is triggered.
+class OutboundTransfer : public boost::intrusive::list_base_hook<> {
+ public:
+  // Factory methods for creating transfers associated with call requests
+  // or responses. The 'payload' slices will be concatenated and
+  // written to the socket. When the transfer completes or errors, the
+  // appropriate method of 'callbacks' is invoked.
+  //
+  // Does not take ownership of the callbacks object or the underlying
+  // memory of the slices. The slices must remain valid until the callback
+  // is triggered.
+  //
+  // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices
+  // slices.
+  // ------------------------------------------------------------
+
+  // Create an outbound transfer for a call request.
+  static OutboundTransfer* CreateForCallRequest(int32_t call_id,
+                                                const TransferPayload &payload,
+                                                size_t n_payload_slices,
+                                                TransferCallbacks *callbacks);
+
+  // Create an outbound transfer for a call response.
+  // See above for details.
+  static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload,
+                                                 size_t n_payload_slices,
+                                                 TransferCallbacks *callbacks);
+
+  // Destruct the transfer. A transfer object should never be deallocated
+  // before it has either (a) finished transferring, or (b) been Abort()ed.
+  ~OutboundTransfer();
+
+  // Abort the current transfer, with the given status.
+  // This triggers TransferCallbacks::NotifyTransferAborted.
+  void Abort(const Status &status);
+
+  // send from our buffers into the sock
+  Status SendBuffer(Socket &socket);
+
+  // Return true if any bytes have yet been sent.
+  bool TransferStarted() const;
+
+  // Return true if the entire transfer has been sent.
+  bool TransferFinished() const;
+
+  // Return the total number of bytes to be sent (including those already sent)
+  int32_t TotalLength() const;
+
+  std::string HexDump() const;
+
+  bool is_for_outbound_call() const {
+    return call_id_ != kInvalidCallId;
+  }
+
+  // Returns the call ID for a transfer associated with an outbound
+  // call. Must not be called for call responses.
+  int32_t call_id() const {
+    DCHECK_NE(call_id_, kInvalidCallId);
+    return call_id_;
+  }
+
+ private:
+  OutboundTransfer(int32_t call_id,
+                   const TransferPayload& payload,
+                   size_t n_payload_slices,
+                   TransferCallbacks *callbacks);
+
+  // Slices to send. Uses an array here instead of a vector to avoid an expensive
+  // vector construction (improved performance a couple percent).
+  TransferPayload payload_slices_;
+  size_t n_payload_slices_;
+
+  // The current slice that is being sent.
+  int32_t cur_slice_idx_;
+  // The number of bytes in the above slice which has already been sent.
+  int32_t cur_offset_in_slice_;
+
+  TransferCallbacks *callbacks_;
+
+  // In the case of outbound calls, the associated call ID.
+  // In the case of call responses, kInvalidCallId
+  int32_t call_id_;
+
+  // True if SendBuffer() has been called at least once. This can be true even if
+  // no bytes were sent successfully. This is needed as SSL_write() is stateful.
+  // Please see KUDU-2334 for details.
+  bool started_;
+
+  bool aborted_;
+
+  DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
+};
+
+// Callbacks made after a transfer completes.
+struct TransferCallbacks {
+ public:
+  virtual ~TransferCallbacks();
+
+  // The transfer finished successfully.
+  virtual void NotifyTransferFinished() = 0;
+
+  // The transfer was aborted (e.g because the connection died or an error occurred).
+  virtual void NotifyTransferAborted(const Status &status) = 0;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/user_credentials.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc
new file mode 100644
index 0000000..7f318fe
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/user_credentials.h"
+
+#include <cstddef>
+#include <string>
+#include <utility>
+
+#include <boost/functional/hash/hash.hpp>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+#include "kudu/util/user.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+bool UserCredentials::has_real_user() const {
+  return !real_user_.empty();
+}
+
+void UserCredentials::set_real_user(string real_user) {
+  real_user_ = std::move(real_user);
+}
+
+Status UserCredentials::SetLoggedInRealUser() {
+  return GetLoggedInUser(&real_user_);
+}
+
+string UserCredentials::ToString() const {
+  return strings::Substitute("{real_user=$0}", real_user_);
+}
+
+size_t UserCredentials::HashCode() const {
+  size_t seed = 0;
+  if (has_real_user()) {
+    boost::hash_combine(seed, real_user());
+  }
+  return seed;
+}
+
+bool UserCredentials::Equals(const UserCredentials& other) const {
+  return real_user() == other.real_user();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/user_credentials.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h
new file mode 100644
index 0000000..5a0434c
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// Client-side user credentials. Currently this is more-or-less a simple wrapper
+// around a username string. However, we anticipate moving more credentials such as
+// tokens into a per-Proxy structure rather than Messenger-wide, and this will
+// be the place to store them.
+class UserCredentials {
+ public:
+  // Real user.
+  bool has_real_user() const;
+  void set_real_user(std::string real_user);
+  const std::string& real_user() const { return real_user_; }
+
+  // Sets the real user to the currently logged in user.
+  Status SetLoggedInRealUser();
+
+  // Returns a string representation of the object.
+  std::string ToString() const;
+
+  std::size_t HashCode() const;
+  bool Equals(const UserCredentials& other) const;
+
+ private:
+  // Remember to update HashCode() and Equals() when new fields are added.
+  std::string real_user_;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
new file mode 100644
index 0000000..b79486e
--- /dev/null
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# See the comment in krb5_realm_override.cc for details on this library's usage.
+# The top-level CMakeLists sets a ${KRB5_REALM_OVERRIDE} variable which should
+# be linked first into all Kudu binaries.
+
+##############################
+# krb5_realm_override
+##############################
+
+add_library(krb5_realm_override STATIC krb5_realm_override.cc)
+target_link_libraries(krb5_realm_override glog)
+if(NOT APPLE)
+  target_link_libraries(krb5_realm_override dl)
+endif()
+
+##############################
+# token_proto
+##############################
+
+PROTOBUF_GENERATE_CPP(
+  TOKEN_PROTO_SRCS TOKEN_PROTO_HDRS TOKEN_PROTO_TGTS
+  SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+  BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+  PROTO_FILES token.proto)
+set(TOKEN_PROTO_LIBS protobuf pb_util_proto)
+ADD_EXPORTABLE_LIBRARY(token_proto
+  SRCS ${TOKEN_PROTO_SRCS}
+  DEPS ${TOKEN_PROTO_LIBS}
+  NONLINK_DEPS ${TOKEN_PROTO_TGTS})
+
+
+##############################
+# security
+##############################
+
+# Check for krb5_get_init_creds_opt_set_out_ccache, which is not available in versions
+# of MIT Kerberos older than krb5-1.6, and is also not present in Heimdal kerberos.
+include(CheckLibraryExists)
+check_library_exists("krb5" krb5_get_init_creds_opt_set_out_ccache
+  ${KERBEROS_LIBRARY} HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE)
+if(HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE)
+  add_definitions(-DHAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE=1)
+endif()
+
+# Fall back to using the ported functionality if we're using an older version of OpenSSL.
+if (${OPENSSL_VERSION} VERSION_LESS "1.0.2")
+  set(PORTED_X509_CHECK_HOST_CC "x509_check_host.cc")
+endif()
+
+set(SECURITY_SRCS
+  ca/cert_management.cc
+  cert.cc
+  crypto.cc
+  kerberos_util.cc
+  init.cc
+  openssl_util.cc
+  ${PORTED_X509_CHECK_HOST_CC}
+  security_flags.cc
+  simple_acl.cc
+  tls_context.cc
+  tls_handshake.cc
+  tls_socket.cc
+  token_verifier.cc
+  token_signer.cc
+  token_signing_key.cc
+  )
+
+set(SECURITY_LIBS
+  gutil
+  kudu_util
+  token_proto
+
+  krb5
+  openssl_crypto
+  openssl_ssl)
+
+ADD_EXPORTABLE_LIBRARY(security
+  SRCS ${SECURITY_SRCS}
+  DEPS ${SECURITY_LIBS})
+
+
+##############################
+# mini_kdc
+##############################
+
+set(MINI_KDC_SRCS test/mini_kdc.cc)
+
+add_library(mini_kdc ${MINI_KDC_SRCS})
+target_link_libraries(mini_kdc
+  gutil
+  kudu_test_util
+  kudu_util)
+
+##############################
+# security_test_util
+##############################
+
+if (NOT NO_TESTS)
+  set(SECURITY_TEST_SRCS
+    security-test-util.cc
+    test/test_certs.cc
+    test/test_pass.cc)
+
+  add_library(security_test_util ${SECURITY_TEST_SRCS})
+  target_link_libraries(security_test_util
+    gutil
+    kudu_test_util
+    kudu_util
+    security)
+
+  # Tests
+  set(KUDU_TEST_LINK_LIBS
+    mini_kdc
+    security
+    security_test_util
+    ${KUDU_MIN_TEST_LIBS})
+
+  ADD_KUDU_TEST(ca/cert_management-test)
+  ADD_KUDU_TEST(cert-test)
+  ADD_KUDU_TEST(crypto-test)
+  ADD_KUDU_TEST(test/mini_kdc-test)
+  ADD_KUDU_TEST(tls_handshake-test)
+  ADD_KUDU_TEST(tls_socket-test PROCESSORS 2)
+  ADD_KUDU_TEST(token-test)
+endif()

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/ca/cert_management-test.cc b/be/src/kudu/security/ca/cert_management-test.cc
new file mode 100644
index 0000000..0c8abc8
--- /dev/null
+++ b/be/src/kudu/security/ca/cert_management-test.cc
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/ca/cert_management.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/security-test-util.h"
+#include "kudu/security/test/test_certs.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace security {
+namespace ca {
+
+class CertManagementTest : public KuduTest {
+ public:
+  void SetUp() override {
+    ASSERT_OK(ca_cert_.FromString(kCaCert, DataFormat::PEM));
+    ASSERT_OK(ca_private_key_.FromString(kCaPrivateKey, DataFormat::PEM));
+    ASSERT_OK(ca_public_key_.FromString(kCaPublicKey, DataFormat::PEM));
+    ASSERT_OK(ca_exp_cert_.FromString(kCaExpiredCert, DataFormat::PEM));
+    ASSERT_OK(ca_exp_private_key_.FromString(kCaExpiredPrivateKey, DataFormat::PEM));
+    // Sanity checks.
+    ASSERT_OK(ca_cert_.CheckKeyMatch(ca_private_key_));
+    ASSERT_OK(ca_exp_cert_.CheckKeyMatch(ca_exp_private_key_));
+  }
+
+ protected:
+  CertRequestGenerator::Config PrepareConfig(
+      const string& hostname = "localhost.localdomain") {
+    return { hostname };
+  }
+
+  CaCertRequestGenerator::Config PrepareCaConfig(const string& cn) {
+    return { cn };
+  }
+
+  // Create a new private key in 'key' and return a CSR associated with that
+  // key.
+  template<class CSRGen = CertRequestGenerator>
+  CertSignRequest PrepareTestCSR(typename CSRGen::Config config,
+                                 PrivateKey* key) {
+    CHECK_OK(GeneratePrivateKey(512, key));
+    CSRGen gen(std::move(config));
+    CHECK_OK(gen.Init());
+    CertSignRequest req;
+    CHECK_OK(gen.GenerateRequest(*key, &req));
+    return req;
+  }
+
+  Cert ca_cert_;
+  PrivateKey ca_private_key_;
+  PublicKey ca_public_key_;
+
+  Cert ca_exp_cert_;
+  PrivateKey ca_exp_private_key_;
+};
+
+// Check for basic constraints while initializing CertRequestGenerator objects.
+TEST_F(CertManagementTest, RequestGeneratorConstraints) {
+  const CertRequestGenerator::Config gen_config = PrepareConfig("");
+  CertRequestGenerator gen(gen_config);
+  const Status s = gen.Init();
+  const string err_msg = s.ToString();
+  ASSERT_TRUE(s.IsInvalidArgument()) << err_msg;
+  ASSERT_STR_CONTAINS(err_msg, "hostname must not be empty");
+}
+
+// Check for the basic functionality of the CertRequestGenerator class:
+// check it's able to generate keys of expected number of bits and that it
+// reports an error if trying to generate a key of unsupported number of bits.
+TEST_F(CertManagementTest, RequestGeneratorBasics) {
+  const CertRequestGenerator::Config gen_config = PrepareConfig();
+
+  PrivateKey key;
+  ASSERT_OK(GeneratePrivateKey(1024, &key));
+  CertRequestGenerator gen(gen_config);
+  ASSERT_OK(gen.Init());
+  string key_str;
+  ASSERT_OK(key.ToString(&key_str, DataFormat::PEM));
+  // Check for non-supported number of bits for the key.
+  Status s = GeneratePrivateKey(7, &key);
+  ASSERT_TRUE(s.IsRuntimeError());
+}
+
+// Check that CertSigner behaves in a predictable way if given non-matching
+// CA private key and certificate.
+TEST_F(CertManagementTest, SignerInitWithMismatchedCertAndKey) {
+  PrivateKey key;
+  const auto& csr = PrepareTestCSR(PrepareConfig(), &key);
+  {
+    Cert cert;
+    Status s = CertSigner(&ca_cert_, &ca_exp_private_key_)
+        .Sign(csr, &cert);
+
+    const string err_msg = s.ToString();
+    ASSERT_TRUE(s.IsRuntimeError()) << err_msg;
+    ASSERT_STR_CONTAINS(err_msg, "certificate does not match private key");
+  }
+  {
+    Cert cert;
+    Status s = CertSigner(&ca_exp_cert_, &ca_private_key_)
+        .Sign(csr, &cert);
+    const string err_msg = s.ToString();
+    ASSERT_TRUE(s.IsRuntimeError()) << err_msg;
+    ASSERT_STR_CONTAINS(err_msg, "certificate does not match private key");
+  }
+}
+
+// Check how CertSigner behaves if given expired CA certificate
+// and corresponding private key.
+TEST_F(CertManagementTest, SignerInitWithExpiredCert) {
+  const CertRequestGenerator::Config gen_config = PrepareConfig();
+  PrivateKey key;
+  CertSignRequest req = PrepareTestCSR(gen_config, &key);
+
+  // Signer works fine even with expired CA certificate.
+  Cert cert;
+  ASSERT_OK(CertSigner(&ca_exp_cert_, &ca_exp_private_key_).Sign(req, &cert));
+  ASSERT_OK(cert.CheckKeyMatch(key));
+}
+
+// Generate X509 CSR and issue corresponding certificate putting the specified
+// hostname into the SAN X509v3 extension field. The fix for KUDU-1981 addresses
+// the issue of enabling Kudu server components on systems with FQDN longer than
+// 64 characters. This test is a regression for KUDU-1981, so let's verify that
+// CSRs and the result X509 cerificates with long hostnames in SAN are handled
+// properly.
+TEST_F(CertManagementTest, SignCertLongHostnameInSan) {
+  for (auto const& hostname :
+      {
+        "foo.bar.com",
+
+        "222222222222222222222222222222222222222222222222222222222222222."
+        "555555555555555555555555555555555555555555555555555555555555555."
+        "555555555555555555555555555555555555555555555555555555555555555."
+        "chaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaars",
+      }) {
+    CertRequestGenerator::Config gen_config;
+    gen_config.hostname = hostname;
+    gen_config.user_id = "test-uid";
+    PrivateKey key;
+    const auto& csr = PrepareTestCSR(gen_config, &key);
+    Cert cert;
+    ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_).Sign(csr, &cert));
+    ASSERT_OK(cert.CheckKeyMatch(key));
+
+    EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = my@email.com",
+              cert.IssuerName());
+    EXPECT_EQ("UID = test-uid", cert.SubjectName());
+    vector<string> hostnames = cert.Hostnames();
+    ASSERT_EQ(1, hostnames.size());
+    EXPECT_EQ(hostname, hostnames[0]);
+  }
+}
+
+// Generate X509 CSR and issues corresponding certificate.
+TEST_F(CertManagementTest, SignCert) {
+  CertRequestGenerator::Config gen_config;
+  gen_config.hostname = "foo.bar.com";
+  gen_config.user_id = "test-uid";
+  gen_config.kerberos_principal = "kudu/foo.bar.com@bar.com";
+  PrivateKey key;
+  const auto& csr = PrepareTestCSR(gen_config, &key);
+  Cert cert;
+  ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_).Sign(csr, &cert));
+  ASSERT_OK(cert.CheckKeyMatch(key));
+
+  EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = my@email.com",
+            cert.IssuerName());
+  EXPECT_EQ("UID = test-uid", cert.SubjectName());
+  EXPECT_EQ(gen_config.user_id, *cert.UserId());
+  EXPECT_EQ(gen_config.kerberos_principal, *cert.KuduKerberosPrincipal());
+  vector<string> hostnames = cert.Hostnames();
+  ASSERT_EQ(1, hostnames.size());
+  EXPECT_EQ("foo.bar.com", hostnames[0]);
+}
+
+// Generate X509 CA CSR and sign the result certificate.
+TEST_F(CertManagementTest, SignCaCert) {
+  const CaCertRequestGenerator::Config gen_config(PrepareCaConfig("self-ca"));
+  PrivateKey key;
+  const auto& csr = PrepareTestCSR<CaCertRequestGenerator>(gen_config, &key);
+  Cert cert;
+  ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_).Sign(csr, &cert));
+  ASSERT_OK(cert.CheckKeyMatch(key));
+}
+
+// Test the creation and use of a CA which uses a self-signed CA cert
+// generated on the fly.
+TEST_F(CertManagementTest, TestSelfSignedCA) {
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  // Create a key and CSR for the tablet server.
+  const auto& config = PrepareConfig();
+  PrivateKey ts_key;
+  CertSignRequest ts_csr = PrepareTestCSR(config, &ts_key);
+
+  // Sign it using the self-signed CA.
+  Cert ts_cert;
+  ASSERT_OK(CertSigner(&ca_cert, &ca_key).Sign(ts_csr, &ts_cert));
+  ASSERT_OK(ts_cert.CheckKeyMatch(ts_key));
+}
+
+// Check the transformation chains for X509 CSRs:
+//   internal -> PEM -> internal -> PEM
+//   internal -> DER -> internal -> DER
+TEST_F(CertManagementTest, X509CsrFromAndToString) {
+  static const DataFormat kFormats[] = { DataFormat::PEM, DataFormat::DER };
+
+  PrivateKey key;
+  ASSERT_OK(GeneratePrivateKey(1024, &key));
+  CertRequestGenerator gen(PrepareConfig());
+  ASSERT_OK(gen.Init());
+  CertSignRequest req_ref;
+  ASSERT_OK(gen.GenerateRequest(key, &req_ref));
+
+  for (auto format : kFormats) {
+    SCOPED_TRACE(Substitute("X509 CSR format: $0", DataFormatToString(format)));
+    string str_req_ref;
+    ASSERT_OK(req_ref.ToString(&str_req_ref, format));
+    CertSignRequest req;
+    ASSERT_OK(req.FromString(str_req_ref, format));
+    string str_req;
+    ASSERT_OK(req.ToString(&str_req, format));
+    ASSERT_EQ(str_req_ref, str_req);
+  }
+}
+
+// Check the transformation chains for X509 certs:
+//   internal -> PEM -> internal -> PEM
+//   internal -> DER -> internal -> DER
+TEST_F(CertManagementTest, X509FromAndToString) {
+  static const DataFormat kFormats[] = { DataFormat::PEM, DataFormat::DER };
+
+  PrivateKey key;
+  ASSERT_OK(GeneratePrivateKey(1024, &key));
+  CertRequestGenerator gen(PrepareConfig());
+  ASSERT_OK(gen.Init());
+  CertSignRequest req;
+  ASSERT_OK(gen.GenerateRequest(key, &req));
+
+  Cert cert_ref;
+  ASSERT_OK(CertSigner(&ca_cert_, &ca_private_key_)
+            .Sign(req, &cert_ref));
+
+  for (auto format : kFormats) {
+    SCOPED_TRACE(Substitute("X509 format: $0", DataFormatToString(format)));
+    string str_cert_ref;
+    ASSERT_OK(cert_ref.ToString(&str_cert_ref, format));
+    Cert cert;
+    ASSERT_OK(cert.FromString(str_cert_ref, format));
+    string str_cert;
+    ASSERT_OK(cert.ToString(&str_cert, format));
+    ASSERT_EQ(str_cert_ref, str_cert);
+  }
+}
+
+} // namespace ca
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/ca/cert_management.cc b/be/src/kudu/security/ca/cert_management.cc
new file mode 100644
index 0000000..7ccc376
--- /dev/null
+++ b/be/src/kudu/security/ca/cert_management.cc
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/ca/cert_management.h"
+
+#include <algorithm>
+#include <cstdio>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include <glog/logging.h>
+#include <openssl/conf.h>
+#ifndef OPENSSL_NO_ENGINE
+#include <openssl/engine.h>
+#endif
+#include <openssl/pem.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+
+using std::lock_guard;
+using std::move;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace security {
+
+template<> struct SslTypeTraits<ASN1_INTEGER> {
+  static constexpr auto kFreeFunc = &ASN1_INTEGER_free;
+};
+template<> struct SslTypeTraits<BIGNUM> {
+  static constexpr auto kFreeFunc = &BN_free;
+};
+
+namespace ca {
+
+namespace {
+
+Status SetSubjectNameField(X509_NAME* name,
+                           const char* field_code,
+                           const string& field_value) {
+  CHECK(name);
+  CHECK(field_code);
+  OPENSSL_RET_NOT_OK(X509_NAME_add_entry_by_txt(
+      name, field_code, MBSTRING_ASC,
+      reinterpret_cast<const unsigned char*>(field_value.c_str()), -1, -1, 0),
+      Substitute("error setting subject field $0", field_code));
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+CertRequestGenerator::~CertRequestGenerator() {
+  sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free);
+}
+
+Status CertRequestGeneratorBase::GenerateRequest(const PrivateKey& key,
+                                                 CertSignRequest* ret) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ret);
+  CHECK(Initialized());
+  auto req = ssl_make_unique(X509_REQ_new());
+  OPENSSL_RET_NOT_OK(X509_REQ_set_pubkey(req.get(), key.GetRawData()),
+      "error setting X509 public key");
+
+  // Populate the subject field of the request.
+  RETURN_NOT_OK(SetSubject(req.get()));
+
+  // Set necessary extensions into the request.
+  RETURN_NOT_OK(SetExtensions(req.get()));
+
+  // And finally sign the result.
+  OPENSSL_RET_NOT_OK(X509_REQ_sign(req.get(), key.GetRawData(), EVP_sha256()),
+      "error signing X509 request");
+  ret->AdoptRawData(req.release());
+
+  return Status::OK();
+}
+
+Status CertRequestGeneratorBase::PushExtension(stack_st_X509_EXTENSION* st,
+                                               int32_t nid, StringPiece value) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  auto ex = ssl_make_unique(
+      X509V3_EXT_conf_nid(nullptr, nullptr, nid, const_cast<char*>(value.data())));
+  OPENSSL_RET_IF_NULL(ex, "error configuring extension");
+  OPENSSL_RET_NOT_OK(sk_X509_EXTENSION_push(st, ex.release()),
+      "error pushing extension into the stack");
+  return Status::OK();
+}
+
+CertRequestGenerator::CertRequestGenerator(Config config)
+    : CertRequestGeneratorBase(),
+      config_(std::move(config)) {
+}
+
+Status CertRequestGenerator::Init() {
+  InitializeOpenSSL();
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+
+  CHECK(!is_initialized_);
+
+  // Build the SAN field using the specified hostname. In general, it might be
+  // multiple DNS hostnames in the field, but in our use-cases it's always one.
+  if (config_.hostname.empty()) {
+    return Status::InvalidArgument("hostname must not be empty");
+  }
+  const string san_hosts = Substitute("DNS.0:$0", config_.hostname);
+
+  extensions_ = sk_X509_EXTENSION_new_null();
+
+  // Permitted usages for the generated keys is set via X509 V3
+  // standard/extended key usage attributes.
+  // See https://www.openssl.org/docs/man1.0.1/apps/x509v3_config.html
+  // for details.
+
+  // The generated certificates are for using as TLS certificates for
+  // both client and server.
+  string usage = "critical,digitalSignature,keyEncipherment";
+  if (for_self_signing_) {
+    // If we are generating a CSR for self-signing, then we need to
+    // add this keyUsage attribute. See https://s.apache.org/BFHk
+    usage += ",keyCertSign";
+  }
+
+  RETURN_NOT_OK(PushExtension(extensions_, NID_key_usage, usage));
+  // The generated certificates should be good for authentication
+  // of a server to a client and vice versa: the intended users of the
+  // certificates are tablet servers which are going to talk to master
+  // and other tablet servers via TLS channels.
+  RETURN_NOT_OK(PushExtension(extensions_, NID_ext_key_usage,
+                              "critical,serverAuth,clientAuth"));
+
+  // The generated certificates are not intended to be used as CA certificates
+  // (i.e. they cannot be used to sign/issue certificates).
+  RETURN_NOT_OK(PushExtension(extensions_, NID_basic_constraints,
+                              "critical,CA:FALSE"));
+
+  if (config_.kerberos_principal) {
+    int nid = GetKuduKerberosPrincipalOidNid();
+    RETURN_NOT_OK(PushExtension(extensions_, nid,
+                                Substitute("ASN1:UTF8:$0", *config_.kerberos_principal)));
+  }
+  RETURN_NOT_OK(PushExtension(extensions_, NID_subject_alt_name, san_hosts));
+
+  is_initialized_ = true;
+
+  return Status::OK();
+}
+
+bool CertRequestGenerator::Initialized() const {
+  return is_initialized_;
+}
+
+Status CertRequestGenerator::SetSubject(X509_REQ* req) const {
+  if (config_.user_id) {
+    RETURN_NOT_OK(SetSubjectNameField(X509_REQ_get_subject_name(req),
+                                      "UID", *config_.user_id));
+  }
+  return Status::OK();
+}
+
+Status CertRequestGenerator::SetExtensions(X509_REQ* req) const {
+  OPENSSL_RET_NOT_OK(X509_REQ_add_extensions(req, extensions_),
+      "error setting X509 request extensions");
+  return Status::OK();
+}
+
+CaCertRequestGenerator::CaCertRequestGenerator(Config config)
+    : config_(std::move(config)),
+      extensions_(nullptr),
+      is_initialized_(false) {
+}
+
+CaCertRequestGenerator::~CaCertRequestGenerator() {
+  sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free);
+}
+
+Status CaCertRequestGenerator::Init() {
+  InitializeOpenSSL();
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+
+  lock_guard<simple_spinlock> guard(lock_);
+  if (is_initialized_) {
+    return Status::OK();
+  }
+  if (config_.cn.empty()) {
+    return Status::InvalidArgument("missing CA service UUID/name");
+  }
+
+  extensions_ = sk_X509_EXTENSION_new_null();
+
+  // Permitted usages for the generated keys is set via X509 V3
+  // standard/extended key usage attributes.
+  // See https://www.openssl.org/docs/man1.0.1/apps/x509v3_config.html
+  // for details.
+
+  // The target ceritifcate is a CA certificate: it's for signing X509 certs.
+  RETURN_NOT_OK(PushExtension(extensions_, NID_key_usage,
+                              "critical,keyCertSign"));
+  // The generated certificates are for the private CA service.
+  RETURN_NOT_OK(PushExtension(extensions_, NID_basic_constraints,
+                              "critical,CA:TRUE"));
+  is_initialized_ = true;
+
+  return Status::OK();
+}
+
+bool CaCertRequestGenerator::Initialized() const {
+  lock_guard<simple_spinlock> guard(lock_);
+  return is_initialized_;
+}
+
+Status CaCertRequestGenerator::SetSubject(X509_REQ* req) const {
+  return SetSubjectNameField(X509_REQ_get_subject_name(req), "CN", config_.cn);
+}
+
+Status CaCertRequestGenerator::SetExtensions(X509_REQ* req) const {
+  OPENSSL_RET_NOT_OK(X509_REQ_add_extensions(req, extensions_),
+      "error setting X509 request extensions");
+  return Status::OK();
+}
+
+Status CertSigner::SelfSignCA(const PrivateKey& key,
+                              CaCertRequestGenerator::Config config,
+                              int64_t cert_expiration_seconds,
+                              Cert* cert) {
+  // Generate a CSR for the CA.
+  CertSignRequest ca_csr;
+  {
+    CaCertRequestGenerator gen(std::move(config));
+    RETURN_NOT_OK(gen.Init());
+    RETURN_NOT_OK(gen.GenerateRequest(key, &ca_csr));
+  }
+
+  // Self-sign the CA's CSR.
+  return CertSigner(nullptr, &key)
+      .set_expiration_interval(MonoDelta::FromSeconds(cert_expiration_seconds))
+      .Sign(ca_csr, cert);
+}
+
+Status CertSigner::SelfSignCert(const PrivateKey& key,
+                                CertRequestGenerator::Config config,
+                                Cert* cert) {
+  // Generate a CSR.
+  CertSignRequest csr;
+  {
+    CertRequestGenerator gen(std::move(config));
+    gen.enable_self_signing();
+    RETURN_NOT_OK(gen.Init());
+    RETURN_NOT_OK(gen.GenerateRequest(key, &csr));
+  }
+
+  // Self-sign the CSR with the key.
+  return CertSigner(nullptr, &key).Sign(csr, cert);
+}
+
+
+CertSigner::CertSigner(const Cert* ca_cert,
+                       const PrivateKey* ca_private_key)
+    : ca_cert_(ca_cert),
+      ca_private_key_(ca_private_key) {
+  // Private key is required.
+  CHECK(ca_private_key_ && ca_private_key_->GetRawData());
+  // The cert is optional, but if we have it, it should be initialized.
+  CHECK(!ca_cert_ || ca_cert_->GetRawData());
+}
+
+Status CertSigner::Sign(const CertSignRequest& req, Cert* ret) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  InitializeOpenSSL();
+  CHECK(ret);
+
+  // If we are not self-signing, then make sure that the provided CA
+  // cert and key match each other. Technically this would be programmer
+  // error since we're always using internally-generated CA certs, but
+  // this isn't a hot path so we'll keep the extra safety.
+  if (ca_cert_) {
+    RETURN_NOT_OK(ca_cert_->CheckKeyMatch(*ca_private_key_));
+  }
+  auto x509 = ssl_make_unique(X509_new());
+  RETURN_NOT_OK(FillCertTemplateFromRequest(req.GetRawData(), x509.get()));
+  RETURN_NOT_OK(DoSign(EVP_sha256(), exp_interval_sec_, x509.get()));
+  ret->AdoptX509(x509.release());
+
+  return Status::OK();
+}
+
+// This is modeled after code in copy_extensions() function from
+// $OPENSSL_ROOT/apps/apps.c with OpenSSL 1.0.2.
+Status CertSigner::CopyExtensions(X509_REQ* req, X509* x) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(req);
+  CHECK(x);
+  STACK_OF(X509_EXTENSION)* exts = X509_REQ_get_extensions(req);
+  SCOPED_CLEANUP({
+    sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free);
+  });
+  for (size_t i = 0; i < sk_X509_EXTENSION_num(exts); ++i) {
+    X509_EXTENSION* ext = sk_X509_EXTENSION_value(exts, i);
+    ASN1_OBJECT* obj = X509_EXTENSION_get_object(ext);
+    int32_t idx = X509_get_ext_by_OBJ(x, obj, -1);
+    if (idx != -1) {
+      // If extension exits, delete all extensions of same type.
+      do {
+        auto tmpext = ssl_make_unique(X509_get_ext(x, idx));
+        X509_delete_ext(x, idx);
+        idx = X509_get_ext_by_OBJ(x, obj, -1);
+      } while (idx != -1);
+    }
+    OPENSSL_RET_NOT_OK(X509_add_ext(x, ext, -1), "error adding extension");
+  }
+
+  return Status::OK();
+}
+
+Status CertSigner::FillCertTemplateFromRequest(X509_REQ* req, X509* tmpl) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(req);
+
+  // As of OpenSSL 1.1, req's internals are hidden.
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  if (!req->req_info ||
+      !req->req_info->pubkey ||
+      !req->req_info->pubkey->public_key ||
+      !req->req_info->pubkey->public_key->data) {
+    return Status::RuntimeError("corrupted CSR: no public key");
+  }
+#endif
+  auto pub_key = ssl_make_unique(X509_REQ_get_pubkey(req));
+  OPENSSL_RET_IF_NULL(pub_key, "error unpacking public key from CSR");
+  const int rc = X509_REQ_verify(req, pub_key.get());
+  if (rc < 0) {
+    return Status::RuntimeError("CSR signature verification error",
+                                GetOpenSSLErrors());
+  }
+  if (rc == 0) {
+    return Status::RuntimeError("CSR signature mismatch",
+                                GetOpenSSLErrors());
+  }
+  OPENSSL_RET_NOT_OK(X509_set_subject_name(tmpl, X509_REQ_get_subject_name(req)),
+      "error setting cert subject name");
+  RETURN_NOT_OK(CopyExtensions(req, tmpl));
+  OPENSSL_RET_NOT_OK(X509_set_pubkey(tmpl, pub_key.get()),
+      "error setting cert public key");
+  return Status::OK();
+}
+
+Status CertSigner::DigestSign(const EVP_MD* md, EVP_PKEY* pkey, X509* x) {
+  OPENSSL_RET_NOT_OK(X509_sign(x, pkey, md), "error signing certificate");
+  return Status::OK();
+}
+
+Status CertSigner::GenerateSerial(c_unique_ptr<ASN1_INTEGER>* ret) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  auto btmp = ssl_make_unique(BN_new());
+  OPENSSL_RET_NOT_OK(BN_pseudo_rand(btmp.get(), 64, 0, 0),
+      "error generating random number");
+  auto serial = ssl_make_unique(ASN1_INTEGER_new());
+  OPENSSL_RET_IF_NULL(BN_to_ASN1_INTEGER(btmp.get(), serial.get()),
+      "error converting number into ASN1 representation");
+  if (ret) {
+    ret->swap(serial);
+  }
+  return Status::OK();
+}
+
+Status CertSigner::DoSign(const EVP_MD* digest, int32_t exp_seconds,
+                          X509* ret) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(ret);
+
+  // Version 3 (v3) of X509 certificates. The integer value is one less
+  // than the version it represents. This is not a typo. :)
+  static const int kX509V3 = 2;
+
+  // If we have a CA cert, then the CA is the issuer.
+  // Otherwise, we are self-signing so the target cert is also the issuer.
+  X509* issuer_cert = ca_cert_ ? ca_cert_->GetTopOfChainX509() : ret;
+  X509_NAME* issuer_name = X509_get_subject_name(issuer_cert);
+  OPENSSL_RET_NOT_OK(X509_set_issuer_name(ret, issuer_name),
+      "error setting issuer name");
+  c_unique_ptr<ASN1_INTEGER> serial;
+  RETURN_NOT_OK(GenerateSerial(&serial));
+  // set version to v3
+  OPENSSL_RET_NOT_OK(X509_set_version(ret, kX509V3),
+      "error setting cert version");
+  OPENSSL_RET_NOT_OK(X509_set_serialNumber(ret, serial.get()),
+      "error setting cert serial");
+  OPENSSL_RET_IF_NULL(X509_gmtime_adj(X509_get_notBefore(ret), 0L),
+      "error setting cert validity time");
+  OPENSSL_RET_IF_NULL(X509_gmtime_adj(X509_get_notAfter(ret), exp_seconds),
+      "error setting cert expiration time");
+  RETURN_NOT_OK(DigestSign(digest, ca_private_key_->GetRawData(), ret));
+
+  return Status::OK();
+}
+
+} // namespace ca
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/ca/cert_management.h b/be/src/kudu/security/ca/cert_management.h
new file mode 100644
index 0000000..fb2bd0e
--- /dev/null
+++ b/be/src/kudu/security/ca/cert_management.h
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+// Forward declarations for the relevant OpenSSL typedefs
+// in addition to openssl_util.h.
+typedef struct asn1_string_st ASN1_INTEGER;
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+typedef struct env_md_st EVP_MD;
+#else
+typedef struct evp_md_st EVP_MD;
+#endif
+typedef struct rsa_st RSA;
+typedef struct x509_st X509;
+typedef struct X509_req_st X509_REQ;
+
+// STACK_OF(X509_EXTENSION)
+struct stack_st_X509_EXTENSION; // IWYU pragma: keep
+
+namespace kudu {
+namespace security {
+
+class Cert;
+class CertSignRequest;
+class PrivateKey;
+
+namespace ca {
+
+// Base utility class for issuing X509 CSRs.
+class CertRequestGeneratorBase {
+ public:
+  CertRequestGeneratorBase() = default;
+  virtual ~CertRequestGeneratorBase() = default;
+
+  virtual Status Init() = 0;
+  virtual bool Initialized() const = 0;
+
+  // Generate X509 CSR using the specified key. To obtain the key,
+  // call the GeneratePrivateKey() function.
+  Status GenerateRequest(const PrivateKey& key, CertSignRequest* ret) const WARN_UNUSED_RESULT;
+
+ protected:
+  // Push the specified extension into the stack provided.
+  static Status PushExtension(stack_st_X509_EXTENSION* st,
+                              int32_t nid,
+                              StringPiece value) WARN_UNUSED_RESULT;
+
+  // Set the certificate-specific subject fields into the specified request.
+  virtual Status SetSubject(X509_REQ* req) const = 0;
+
+  // Set the certificate-specific extensions into the specified request.
+  virtual Status SetExtensions(X509_REQ* req) const = 0;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CertRequestGeneratorBase);
+};
+
+// An utility class that facilitates issuing certificate signing requests
+// (a.k.a. X509 CSRs).
+class CertRequestGenerator : public CertRequestGeneratorBase {
+ public:
+  // Properties for the generated X509 CSR. The 'hostname' is for the name of
+  // the machine the requestor is to use the certificate at. Valid configuration
+  // should contain non-empty 'hostname' field.
+  struct Config {
+    // FQDN name to put into the 'DNS' fields of the subjectAltName extension.
+    std::string hostname;
+    // userId (UID)
+    boost::optional<std::string> user_id;
+    // Our custom extension which stores the full Kerberos principal for IPKI certs.
+    boost::optional<std::string> kerberos_principal;
+  };
+
+  // 'config' contains the properties to fill into the X509 attributes of the
+  // CSR.
+  explicit CertRequestGenerator(Config config);
+  ~CertRequestGenerator();
+
+  Status Init() override WARN_UNUSED_RESULT;
+  bool Initialized() const override;
+
+  CertRequestGenerator& enable_self_signing() {
+    CHECK(!is_initialized_);
+    for_self_signing_ = true;
+    return *this;
+  }
+
+ protected:
+  Status SetSubject(X509_REQ* req) const override WARN_UNUSED_RESULT;
+  Status SetExtensions(X509_REQ* req) const override WARN_UNUSED_RESULT;
+
+ private:
+  const Config config_;
+  stack_st_X509_EXTENSION* extensions_ = nullptr;
+  bool is_initialized_ = false;
+  bool for_self_signing_ = false;
+};
+
+// An utility class that facilitates issuing of root CA self-signed certificate
+// signing requests.
+class CaCertRequestGenerator : public CertRequestGeneratorBase {
+ public:
+  // Properties for the generated X509 CA CSR.
+  struct Config {
+    // Common name (CN); e.g. 'master 239D6D2F-BDD2-4463-8933-78D9559C2124'.
+    // Don't put hostname/FQDN in here: for CA cert it does not make sense and
+    // it might be longer than 64 characters which is the limit specified
+    // by RFC5280. The limit is enforced by the OpenSSL library.
+    std::string cn;
+  };
+
+  explicit CaCertRequestGenerator(Config config);
+  ~CaCertRequestGenerator();
+
+  Status Init() override WARN_UNUSED_RESULT;
+  bool Initialized() const override;
+
+ protected:
+  Status SetSubject(X509_REQ* req) const override WARN_UNUSED_RESULT;
+  Status SetExtensions(X509_REQ* req) const override WARN_UNUSED_RESULT;
+
+ private:
+  const Config config_;
+  stack_st_X509_EXTENSION* extensions_;
+  mutable simple_spinlock lock_;
+  bool is_initialized_; // protected by lock_
+};
+
+// An utility class for issuing and signing certificates.
+//
+// This is used in "fluent" style. For example:
+//
+//    CHECK_OK(CertSigner(&my_ca_cert, &my_ca_key)
+//      .set_expiration_interval(MonoDelta::FromSeconds(3600))
+//      .Sign(csr, &cert));
+//
+// As such, this class is not guaranteed thread-safe.
+class CertSigner {
+ public:
+  // Generate a self-signed certificate authority using the given key
+  // and CSR configuration.
+  static Status SelfSignCA(const PrivateKey& key,
+                           CaCertRequestGenerator::Config config,
+                           int64_t cert_expiration_seconds,
+                           Cert* cert) WARN_UNUSED_RESULT;
+
+  // Generate a self-signed certificate using the given key and CSR
+  // configuration.
+  static Status SelfSignCert(const PrivateKey& key,
+                             CertRequestGenerator::Config config,
+                             Cert* cert) WARN_UNUSED_RESULT;
+
+  // Create a CertSigner.
+  //
+  // The given cert and key must stay valid for the lifetime of the
+  // cert signer. See class documentation above for recommended usage.
+  //
+  // 'ca_cert' may be nullptr in order to perform self-signing (though
+  // the SelfSignCA() static method above is recommended).
+  CertSigner(const Cert* ca_cert, const PrivateKey* ca_private_key);
+  ~CertSigner() = default;
+
+  // Set the expiration interval for certs signed by this signer.
+  // This may be changed at any point.
+  CertSigner& set_expiration_interval(MonoDelta expiration) {
+    exp_interval_sec_ = expiration.ToSeconds();
+    return *this;
+  }
+
+  Status Sign(const CertSignRequest& req, Cert* ret) const WARN_UNUSED_RESULT;
+
+ private:
+
+  static Status CopyExtensions(X509_REQ* req, X509* x) WARN_UNUSED_RESULT;
+  static Status FillCertTemplateFromRequest(X509_REQ* req, X509* tmpl) WARN_UNUSED_RESULT;
+  static Status DigestSign(const EVP_MD* md, EVP_PKEY* pkey, X509* x) WARN_UNUSED_RESULT;
+  static Status GenerateSerial(c_unique_ptr<ASN1_INTEGER>* ret) WARN_UNUSED_RESULT;
+
+  Status DoSign(const EVP_MD* digest, int32_t exp_seconds, X509 *ret) const WARN_UNUSED_RESULT;
+
+  // The expiration interval of certs signed by this signer.
+  int32_t exp_interval_sec_ = 24 * 60 * 60;
+
+  // The CA cert. null if this CertSigner is configured for self-signing.
+  const Cert* const ca_cert_;
+
+  // The CA private key. If configured for self-signing, this is the
+  // private key associated with the target cert.
+  const PrivateKey* const ca_private_key_;
+
+  DISALLOW_COPY_AND_ASSIGN(CertSigner);
+};
+
+} // namespace ca
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/cert-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/cert-test.cc b/be/src/kudu/security/cert-test.cc
new file mode 100644
index 0000000..12205e1
--- /dev/null
+++ b/be/src/kudu/security/cert-test.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/test/test_certs.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::pair;
+using std::string;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+// Test for various certificate-related functionality in the security library.
+// These do not cover CA certificate mananagement part; check
+// cert_management-test.cc for those.
+class CertTest : public KuduTest {
+ public:
+  void SetUp() override {
+    ASSERT_OK(ca_cert_.FromString(kCaCert, DataFormat::PEM));
+    ASSERT_OK(ca_private_key_.FromString(kCaPrivateKey, DataFormat::PEM));
+    ASSERT_OK(ca_public_key_.FromString(kCaPublicKey, DataFormat::PEM));
+    ASSERT_OK(ca_exp_cert_.FromString(kCaExpiredCert, DataFormat::PEM));
+    ASSERT_OK(ca_exp_private_key_.FromString(kCaExpiredPrivateKey,
+                                             DataFormat::PEM));
+    // Sanity checks.
+    ASSERT_OK(ca_cert_.CheckKeyMatch(ca_private_key_));
+    ASSERT_OK(ca_exp_cert_.CheckKeyMatch(ca_exp_private_key_));
+  }
+
+ protected:
+  Cert ca_cert_;
+  PrivateKey ca_private_key_;
+  PublicKey ca_public_key_;
+
+  Cert ca_exp_cert_;
+  PrivateKey ca_exp_private_key_;
+};
+
+// Regression test to make sure that GetKuduKerberosPrincipalOidNid is thread
+// safe. OpenSSL 1.0.0's OBJ_create method is not thread safe.
+TEST_F(CertTest, GetKuduKerberosPrincipalOidNidConcurrent) {
+  int kConcurrency = 16;
+  Barrier barrier(kConcurrency);
+
+  vector<thread> threads;
+  for (int i = 0; i < kConcurrency; i++) {
+    threads.emplace_back([&] () {
+        barrier.Wait();
+        CHECK_NE(NID_undef, GetKuduKerberosPrincipalOidNid());
+    });
+  }
+
+  for (auto& thread : threads) {
+    thread.join();
+  }
+}
+
+// Check input/output of the X509 certificates in PEM format.
+TEST_F(CertTest, CertInputOutputPEM) {
+  const Cert& cert = ca_cert_;
+  string cert_str;
+  ASSERT_OK(cert.ToString(&cert_str, DataFormat::PEM));
+  RemoveExtraWhitespace(&cert_str);
+
+  string ca_input_cert(kCaCert);
+  RemoveExtraWhitespace(&ca_input_cert);
+  EXPECT_EQ(ca_input_cert, cert_str);
+}
+
+// Check that Cert behaves in a predictable way if given invalid PEM data.
+TEST_F(CertTest, CertInvalidInput) {
+  // Providing files which guaranteed to exists, but do not contain valid data.
+  // This is to make sure the init handles that situation correctly and
+  // does not choke on the wrong input data.
+  Cert c;
+  ASSERT_FALSE(c.FromFile("/bin/sh", DataFormat::PEM).ok());
+}
+
+// Check X509 certificate/private key matching: match cases.
+TEST_F(CertTest, CertMatchesRsaPrivateKey) {
+  const pair<const Cert*, const PrivateKey*> cases[] = {
+    { &ca_cert_,      &ca_private_key_      },
+    { &ca_exp_cert_,  &ca_exp_private_key_  },
+  };
+  for (const auto& e : cases) {
+    EXPECT_OK(e.first->CheckKeyMatch(*e.second));
+  }
+}
+
+// Check X509 certificate/private key matching: mismatch cases.
+TEST_F(CertTest, CertMismatchesRsaPrivateKey) {
+  const pair<const Cert*, const PrivateKey*> cases[] = {
+    { &ca_cert_,      &ca_exp_private_key_  },
+    { &ca_exp_cert_,  &ca_private_key_      },
+  };
+  for (const auto& e : cases) {
+    const Status s = e.first->CheckKeyMatch(*e.second);
+    EXPECT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "certificate does not match private key");
+  }
+}
+
+TEST_F(CertTest, TestGetKuduSpecificFieldsWhenMissing) {
+  EXPECT_EQ(boost::none, ca_cert_.UserId());
+  EXPECT_EQ(boost::none, ca_cert_.KuduKerberosPrincipal());
+}
+
+TEST_F(CertTest, DnsHostnameInSanField) {
+  const string hostname_foo_bar = "foo.bar.com";
+  const string hostname_mega_giga = "mega.giga.io";
+  const string hostname_too_long =
+      "toooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo."
+      "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "ng.hostname.io";
+
+  Cert cert;
+  ASSERT_OK(cert.FromString(kCertDnsHostnamesInSan, DataFormat::PEM));
+
+  EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = my@email.com",
+            cert.IssuerName());
+  vector<string> hostnames = cert.Hostnames();
+  ASSERT_EQ(3, hostnames.size());
+  EXPECT_EQ(hostname_mega_giga, hostnames[0]);
+  EXPECT_EQ(hostname_foo_bar, hostnames[1]);
+  EXPECT_EQ(hostname_too_long, hostnames[2]);
+}
+
+} // namespace security
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/cert.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/cert.cc b/be/src/kudu/security/cert.cc
new file mode 100644
index 0000000..b81d263
--- /dev/null
+++ b/be/src/kudu/security/cert.cc
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/cert.h"
+
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <openssl/evp.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/openssl_util_bio.h"
+#include "kudu/util/status.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+template<> struct SslTypeTraits<GENERAL_NAMES> {
+  static constexpr auto kFreeFunc = &GENERAL_NAMES_free;
+};
+
+// This OID is generated via the UUID method.
+static const char* kKuduKerberosPrincipalOidStr = "2.25.243346677289068076843480765133256509912";
+
+string X509NameToString(X509_NAME* name) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(name);
+  auto bio = ssl_make_unique(BIO_new(BIO_s_mem()));
+  OPENSSL_CHECK_OK(X509_NAME_print_ex(bio.get(), name, 0, XN_FLAG_ONELINE));
+
+  BUF_MEM* membuf;
+  OPENSSL_CHECK_OK(BIO_get_mem_ptr(bio.get(), &membuf));
+  return string(membuf->data, membuf->length);
+}
+
+int GetKuduKerberosPrincipalOidNid() {
+  InitializeOpenSSL();
+  static std::once_flag flag;
+  static int nid;
+  std::call_once(flag, [&] () {
+      nid = OBJ_create(kKuduKerberosPrincipalOidStr, "kuduPrinc", "kuduKerberosPrincipal");
+      CHECK_NE(nid, NID_undef) << "failed to create kuduPrinc oid: " << GetOpenSSLErrors();
+  });
+  return nid;
+}
+
+X509* Cert::GetTopOfChainX509() const {
+  CHECK_GT(chain_len(), 0);
+  return sk_X509_value(data_.get(), 0);
+}
+
+Status Cert::FromString(const std::string& data, DataFormat format) {
+  RETURN_NOT_OK(::kudu::security::FromString(data, format, &data_));
+  if (sk_X509_num(data_.get()) < 1) {
+    return Status::RuntimeError("Certificate chain is empty. Expected at least one certificate.");
+  }
+  return Status::OK();
+}
+
+Status Cert::ToString(std::string* data, DataFormat format) const {
+  return ::kudu::security::ToString(data, format, data_.get());
+}
+
+Status Cert::FromFile(const std::string& fpath, DataFormat format) {
+  RETURN_NOT_OK(::kudu::security::FromFile(fpath, format, &data_));
+  if (sk_X509_num(data_.get()) < 1) {
+    return Status::RuntimeError("Certificate chain is empty. Expected at least one certificate.");
+  }
+  return Status::OK();
+}
+
+string Cert::SubjectName() const {
+  return X509NameToString(X509_get_subject_name(GetTopOfChainX509()));
+}
+
+string Cert::IssuerName() const {
+  return X509NameToString(X509_get_issuer_name(GetTopOfChainX509()));
+}
+
+boost::optional<string> Cert::UserId() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  X509_NAME* name = X509_get_subject_name(GetTopOfChainX509());
+  char buf[1024];
+  int len = X509_NAME_get_text_by_NID(name, NID_userId, buf, arraysize(buf));
+  if (len < 0) return boost::none;
+  return string(buf, len);
+}
+
+vector<string> Cert::Hostnames() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  vector<string> result;
+  auto gens = ssl_make_unique(reinterpret_cast<GENERAL_NAMES*>(X509_get_ext_d2i(
+      GetTopOfChainX509(), NID_subject_alt_name, nullptr, nullptr)));
+  if (gens) {
+    for (int i = 0; i < sk_GENERAL_NAME_num(gens.get()); ++i) {
+      GENERAL_NAME* gen = sk_GENERAL_NAME_value(gens.get(), i);
+      if (gen->type != GEN_DNS) {
+        continue;
+      }
+      const ASN1_STRING* cstr = gen->d.dNSName;
+      if (cstr->type != V_ASN1_IA5STRING || cstr->data == nullptr) {
+        LOG(DFATAL) << "invalid DNS name in the SAN field";
+        return {};
+      }
+      result.emplace_back(reinterpret_cast<char*>(cstr->data), cstr->length);
+    }
+  }
+  return result;
+}
+
+boost::optional<string> Cert::KuduKerberosPrincipal() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  int idx = X509_get_ext_by_NID(GetTopOfChainX509(), GetKuduKerberosPrincipalOidNid(), -1);
+  if (idx < 0) return boost::none;
+  X509_EXTENSION* ext = X509_get_ext(GetTopOfChainX509(), idx);
+  ASN1_OCTET_STRING* octet_str = X509_EXTENSION_get_data(ext);
+  const unsigned char* octet_str_data = octet_str->data;
+  long len; // NOLINT
+  int tag, xclass;
+  if (ASN1_get_object(&octet_str_data, &len, &tag, &xclass, octet_str->length) != 0 ||
+      tag != V_ASN1_UTF8STRING) {
+    LOG(DFATAL) << "invalid extension value in cert " << SubjectName();
+    return boost::none;
+  }
+
+  return string(reinterpret_cast<const char*>(octet_str_data), len);
+}
+
+Status Cert::CheckKeyMatch(const PrivateKey& key) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  OPENSSL_RET_NOT_OK(X509_check_private_key(GetTopOfChainX509(), key.GetRawData()),
+                     "certificate does not match private key");
+  return Status::OK();
+}
+
+Status Cert::GetServerEndPointChannelBindings(string* channel_bindings) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Find the signature type of the certificate. This corresponds to the digest
+  // (hash) algorithm, and the public key type which signed the cert.
+
+#if OPENSSL_VERSION_NUMBER >= 0x10002000L
+  int signature_nid = X509_get_signature_nid(GetTopOfChainX509());
+#else
+  // Older version of OpenSSL appear not to have a public way to get the
+  // signature digest method from a certificate. Instead, we reach into the
+  // 'private' internals.
+  int signature_nid = OBJ_obj2nid(GetTopOfChainX509()->sig_alg->algorithm);
+#endif
+
+  // Retrieve the digest algorithm type.
+  int digest_nid;
+  int public_key_nid;
+  OBJ_find_sigid_algs(signature_nid, &digest_nid, &public_key_nid);
+
+  // RFC 5929: if the certificate's signatureAlgorithm uses no hash functions or
+  // uses multiple hash functions, then this channel binding type's channel
+  // bindings are undefined at this time (updates to is channel binding type may
+  // occur to address this issue if it ever arises).
+  //
+  // TODO(dan): can the multiple hash function scenario actually happen? What
+  // does OBJ_find_sigid_algs do in that scenario?
+  if (digest_nid == NID_undef) {
+    return Status::NotSupported("server certificate has no signature digest (hash) algorithm");
+  }
+
+  // RFC 5929: if the certificate's signatureAlgorithm uses a single hash
+  // function, and that hash function is either MD5 [RFC1321] or SHA-1
+  // [RFC3174], then use SHA-256 [FIPS-180-3];
+  if (digest_nid == NID_md5 || digest_nid == NID_sha1) {
+    digest_nid = NID_sha256;
+  }
+
+  const EVP_MD* md = EVP_get_digestbynid(digest_nid);
+  OPENSSL_RET_IF_NULL(md, "digest for nid not found");
+
+  // Create a digest BIO. All data written to the BIO will be sent through the
+  // digest (hash) function. The digest BIO requires a null BIO to writethrough to.
+  auto null_bio = ssl_make_unique(BIO_new(BIO_s_null()));
+  auto md_bio = ssl_make_unique(BIO_new(BIO_f_md()));
+  OPENSSL_RET_NOT_OK(BIO_set_md(md_bio.get(), md), "failed to set digest for BIO");
+  BIO_push(md_bio.get(), null_bio.get());
+
+  // Write the cert to the digest BIO.
+  RETURN_NOT_OK(ToBIO(md_bio.get(), DataFormat::DER, data_.get()));
+
+  // Read the digest from the BIO and append it to 'channel_bindings'.
+  char buf[EVP_MAX_MD_SIZE];
+  int digest_len = BIO_gets(md_bio.get(), buf, sizeof(buf));
+  OPENSSL_RET_NOT_OK(digest_len, "failed to get cert digest from BIO");
+  channel_bindings->assign(buf, digest_len);
+  return Status::OK();
+}
+
+void Cert::AdoptAndAddRefRawData(RawDataType* data) {
+  DCHECK_EQ(sk_X509_num(data), 1);
+  X509* cert = sk_X509_value(data, sk_X509_num(data) - 1);
+
+  DCHECK(cert);
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  CHECK_GT(CRYPTO_add(&cert->references, 1, CRYPTO_LOCK_X509), 1) << "X509 use-after-free detected";
+#else
+  OPENSSL_CHECK_OK(X509_up_ref(cert)) << "X509 use-after-free detected: " << GetOpenSSLErrors();
+#endif
+  // We copy the STACK_OF() object, but the copy and the original both internally point to the
+  // same elements.
+  AdoptRawData(sk_X509_dup(data));
+}
+
+void Cert::AdoptX509(X509* cert) {
+  // Free current STACK_OF(X509).
+  sk_X509_pop_free(data_.get(), X509_free);
+  // Allocate new STACK_OF(X509) and populate with 'cert'.
+  STACK_OF(X509)* sk = sk_X509_new_null();
+  DCHECK(sk);
+  sk_X509_push(sk, cert);
+  AdoptRawData(sk);
+}
+
+void Cert::AdoptAndAddRefX509(X509* cert) {
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  CHECK_GT(CRYPTO_add(&cert->references, 1, CRYPTO_LOCK_X509), 1) << "X509 use-after-free detected";
+#else
+  OPENSSL_CHECK_OK(X509_up_ref(cert)) << "X509 use-after-free detected: " << GetOpenSSLErrors();
+#endif
+  AdoptX509(cert);
+}
+
+Status Cert::GetPublicKey(PublicKey* key) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  EVP_PKEY* raw_key = X509_get_pubkey(GetTopOfChainX509());
+  OPENSSL_RET_IF_NULL(raw_key, "unable to get certificate public key");
+  key->AdoptRawData(raw_key);
+  return Status::OK();
+}
+
+Status CertSignRequest::FromString(const std::string& data, DataFormat format) {
+  return ::kudu::security::FromString(data, format, &data_);
+}
+
+Status CertSignRequest::ToString(std::string* data, DataFormat format) const {
+  return ::kudu::security::ToString(data, format, data_.get());
+}
+
+Status CertSignRequest::FromFile(const std::string& fpath, DataFormat format) {
+  return ::kudu::security::FromFile(fpath, format, &data_);
+}
+
+CertSignRequest CertSignRequest::Clone() const {
+  X509_REQ* cloned_req;
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  CHECK_GT(CRYPTO_add(&data_->references, 1, CRYPTO_LOCK_X509_REQ), 1)
+    << "X509_REQ use-after-free detected";
+  cloned_req = GetRawData();
+#else
+  // With OpenSSL 1.1, data structure internals are hidden, and there doesn't
+  // seem to be a public method that increments data_'s refcount.
+  cloned_req = X509_REQ_dup(GetRawData());
+  CHECK(cloned_req != nullptr)
+    << "X509 allocation failure detected: " << GetOpenSSLErrors();
+#endif
+
+  CertSignRequest clone;
+  clone.AdoptRawData(cloned_req);
+  return clone;
+}
+
+Status CertSignRequest::GetPublicKey(PublicKey* key) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  EVP_PKEY* raw_key = X509_REQ_get_pubkey(data_.get());
+  OPENSSL_RET_IF_NULL(raw_key, "unable to get CSR public key");
+  key->AdoptRawData(raw_key);
+  return Status::OK();
+}
+
+} // namespace security
+} // namespace kudu