You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:47 UTC
[35/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue.h b/be/src/kudu/rpc/service_queue.h
new file mode 100644
index 0000000..2751a30
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue.h
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SERVICE_QUEUE_H
+#define KUDU_UTIL_SERVICE_QUEUE_H
+
+#include <memory>
+#include <string>
+#include <set>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+
+namespace boost {
+template <class T>
+class optional;
+}
+
+namespace kudu {
+namespace rpc {
+
+// Return values for LifoServiceQueue::Put().
+enum QueueStatus {
+ // The call was enqueued.
+ QUEUE_SUCCESS = 0,
+ // Shutdown() had already been called; the call was not accepted.
+ QUEUE_SHUTDOWN = 1,
+ // The queue is full and the call has a later deadline than any RPC already
+ // in the queue.
+ QUEUE_FULL = 2
+};
+
+// Blocking queue used for passing inbound RPC calls to the service handler pool.
+// Calls are dequeued in 'earliest-deadline first' order. The queue also maintains a
+// bounded number of calls. If the queue overflows, then calls with deadlines farthest
+// in the future are evicted.
+//
+// When calls do not provide deadlines, the RPC layer considers their deadline to
+// be infinitely in the future. This means that any call that does have a deadline
+// can evict any call that does not have a deadline. This incentivizes clients to
+// provide accurate deadlines for their calls.
+//
+// In order to improve concurrent throughput, this class uses a LIFO design:
+// Each consumer thread has its own lock and condition variable. If a
+// consumer arrives and there is no work available in the queue, it will not
+// wait on the queue lock, but rather push its own 'ConsumerState' object
+// to the 'waiting_consumers_' stack. When work arrives, if there are waiting
+// consumers, the top consumer is popped from the stack and woken up.
+//
+// This design has a few advantages over the basic BlockingQueue:
+// - the worker who was most recently busy is the one which will be selected for
+// new work. This gives an opportunity for the worker to be scheduled again
+// without going to sleep, and also keeps CPU cache and allocator caches hot.
+// - in the common case that there are enough workers to fully service the incoming
+// work rate, the queue implementation itself is never used. Thus, we can
+// have a priority queue without paying extra for it in the common case.
+//
+// NOTE: because of the use of thread-local consumer records, once a consumer
+// thread accesses one LifoServiceQueue, it becomes "bound" to that queue and
+// must never access any other instance.
+class LifoServiceQueue {
+ public:
+ // NOTE(review): 'max_size' presumably initializes max_queue_size_, the bound
+ // on the number of queued calls -- the definition is not in this header;
+ // confirm.
+ explicit LifoServiceQueue(int max_size);
+
+ ~LifoServiceQueue();
+
+ // Get an element from the queue. Returns false if we were shut down prior to
+ // getting the element.
+ bool BlockingGet(std::unique_ptr<InboundCall>* out);
+
+ // Add a new call to the queue.
+ // Returns:
+ // - QUEUE_SHUTDOWN if Shutdown() has already been called.
+ // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any
+ // RPC already in the queue.
+ // - QUEUE_SUCCESS if 'call' was enqueued.
+ //
+ // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped
+ // another call out of the queue. In that case, *evicted will be set to the
+ // call that was bumped.
+ QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted);
+
+ // Shut down the queue.
+ // When a blocking queue is shut down, no more elements can be added to it,
+ // and Put() will return QUEUE_SHUTDOWN.
+ // Existing elements will drain out of it, and then BlockingGet will start
+ // returning false.
+ void Shutdown();
+
+ bool empty() const;
+
+ int max_size() const;
+
+ std::string ToString() const;
+
+ // Return an estimate of the current queue length.
+ // This reads queue_ without taking lock_ (the read is wrapped in
+ // ANNOTATE_IGNORE_READS_*), hence "estimate".
+ int estimated_queue_length() const {
+ ANNOTATE_IGNORE_READS_BEGIN();
+ // The C++ standard says that std::multiset::size must be constant time,
+ // so this method won't try to traverse any actual nodes of the underlying
+ // RB tree. Investigation of the libstdcxx implementation confirms that
+ // size() is a simple field access of the _Rb_tree structure.
+ int ret = queue_.size();
+ ANNOTATE_IGNORE_READS_END();
+ return ret;
+ }
+
+ // Return an estimate of the number of idle threads currently awaiting work.
+ // Like estimated_queue_length(), this reads without taking lock_.
+ int estimated_idle_worker_count() const {
+ ANNOTATE_IGNORE_READS_BEGIN();
+ // Size of a vector is a simple field access so this is safe.
+ int ret = waiting_consumers_.size();
+ ANNOTATE_IGNORE_READS_END();
+ return ret;
+ }
+
+ private:
+ // Comparison function which orders calls by their deadlines.
+ // Returns true if 'a' should be dequeued before 'b'.
+ static bool DeadlineLess(const InboundCall* a,
+ const InboundCall* b) {
+ auto time_a = a->GetClientDeadline();
+ auto time_b = b->GetClientDeadline();
+ if (time_a == time_b) {
+ // If two calls have the same deadline (most likely because neither one specified
+ // one) then we should order them by arrival order.
+ time_a = a->GetTimeReceived();
+ time_b = b->GetTimeReceived();
+ }
+ return time_a < time_b;
+ }
+
+ // Struct functor wrapper for DeadlineLess.
+ // Used as the ordering predicate of the 'queue_' multiset.
+ struct DeadlineLessStruct {
+ bool operator()(const InboundCall* a, const InboundCall* b) const {
+ return DeadlineLess(a, b);
+ }
+ };
+
+ // The thread-local record corresponding to a single consumer thread.
+ // Threads push this record onto the waiting_consumers_ stack when
+ // they are awaiting work. Producers pop the top waiting consumer and
+ // post work using Post().
+ class ConsumerState {
+ public:
+ explicit ConsumerState(LifoServiceQueue* queue) :
+ cond_(&lock_),
+ call_(nullptr),
+ should_wake_(false),
+ bound_queue_(queue) {
+ }
+
+ // Hands 'call' to this consumer and signals its condition variable.
+ // 'should_wake_' is set as well, so Wait() returns even when 'call' is
+ // nullptr.
+ void Post(InboundCall* call) {
+ DCHECK(call_ == nullptr);
+ MutexLock l(lock_);
+ call_ = call;
+ should_wake_ = true;
+ cond_.Signal();
+ }
+
+ // Blocks until Post() has been called, then returns the posted call and
+ // resets the state so this record can be waited on again.
+ InboundCall* Wait() {
+ MutexLock l(lock_);
+ // Loop on the flag so that a wakeup without a preceding Post() does not
+ // end the wait.
+ while (should_wake_ == false) {
+ cond_.Wait();
+ }
+ should_wake_ = false;
+ InboundCall* ret = call_;
+ call_ = nullptr;
+ return ret;
+ }
+
+ // Debug-build assertion that this consumer record belongs to queue 'q'.
+ void DCheckBoundInstance(LifoServiceQueue* q) {
+ DCHECK_EQ(q, bound_queue_);
+ }
+
+ private:
+ // 'lock_' guards 'call_' and 'should_wake_'; 'cond_' is bound to it.
+ Mutex lock_;
+ ConditionVariable cond_;
+ InboundCall* call_;
+ bool should_wake_;
+
+ // For the purpose of assertions, tracks the LifoServiceQueue instance that
+ // this consumer is reading from.
+ LifoServiceQueue* bound_queue_;
+ };
+
+ // The calling thread's consumer record (thread-local via __thread).
+ static __thread ConsumerState* tl_consumer_;
+
+ mutable simple_spinlock lock_;
+ bool shutdown_;
+ int max_queue_size_;
+
+ // Stack of consumer threads which are currently waiting for work.
+ std::vector<ConsumerState*> waiting_consumers_;
+
+ // The actual queue. Work is only added to the queue when there were no
+ // consumers available for a "direct hand-off".
+ std::multiset<InboundCall*, DeadlineLessStruct> queue_;
+
+ // The total set of consumers who have ever accessed this queue.
+ std::vector<std::unique_ptr<ConsumerState>> consumers_;
+
+ DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
new file mode 100644
index 0000000..bdf5191
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.cc
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/transfer.h"
+
+#include <sys/uio.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <iostream>
+#include <limits>
+#include <set>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/socket.h"
+
+DEFINE_int64(rpc_max_message_size, (50 * 1024 * 1024),
+ "The maximum size of a message that any RPC that the server will accept. "
+ "Must be at least 1MB.");
+TAG_FLAG(rpc_max_message_size, advanced);
+TAG_FLAG(rpc_max_message_size, runtime);
+
+// Flag validator for --rpc_max_message_size.
+//
+// Rejects values below 1MB. Values above the int32_t maximum are only logged
+// at ERROR level and are still accepted.
+// NOTE(review): accepting such values looks intentional -- the comment in
+// InboundTransfer::ReceiveBuffer() mentions rpc_max_message_size > INT_MAX
+// being used by unit tests -- confirm.
+static bool ValidateMaxMessageSize(const char* flagname, int64_t value) {
+  const int64_t min_allowed = 1 * 1024 * 1024;
+  const bool too_small = value < min_allowed;
+  if (too_small) {
+    LOG(ERROR) << flagname << " must be at least 1MB.";
+    return false;
+  }
+
+  const bool exceeds_int32 = value > std::numeric_limits<int32_t>::max();
+  if (exceeds_int32) {
+    LOG(ERROR) << flagname << " must be less than "
+               << std::numeric_limits<int32_t>::max() << " bytes.";
+  }
+  return true;
+}
+// Registers ValidateMaxMessageSize for --rpc_max_message_size. The result is
+// stored in an otherwise unused static so that the registration call runs as
+// part of this file's static initialization.
+static bool dummy = google::RegisterFlagValidator(
+ &FLAGS_rpc_max_message_size, &ValidateMaxMessageSize);
+
+namespace kudu {
+namespace rpc {
+
+using std::ostringstream;
+using std::set;
+using std::string;
+using strings::Substitute;
+
+// Evaluates 'status' exactly once. If it is not OK, returns from the enclosing
+// function: with Status::OK() when Socket::IsTemporarySocketError() accepts the
+// status's posix code (the socket is simply not ready), otherwise with the
+// error itself. Falls through when 'status' is OK.
+#define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) \
+ do { \
+ Status _s = (status); \
+ if (PREDICT_FALSE(!_s.ok())) { \
+ if (Socket::IsTemporarySocketError(_s.posix_code())) { \
+ return Status::OK(); /* EAGAIN, etc. */ \
+ } \
+ return _s; \
+ } \
+ } while (0)
+
+// Out-of-line definition of the virtual destructor; there is nothing to
+// release.
+TransferCallbacks::~TransferCallbacks() = default;
+
+// A new transfer initially expects only the fixed-size length prefix; the
+// buffer and 'total_length_' grow once ReceiveBuffer() has decoded it.
+InboundTransfer::InboundTransfer()
+    : total_length_(kMsgLengthPrefixLength),
+      cur_offset_(0) {
+  // 'total_length_' equals kMsgLengthPrefixLength at this point.
+  buf_.resize(total_length_);
+}
+
+// Reads as much of the current frame as the socket supplies right now.
+//
+// A frame starts with a kMsgLengthPrefixLength-byte length in network byte
+// order. The prefix is read first and the resulting total length is validated
+// against --rpc_max_message_size; the body is then received into 'buf_'.
+// Temporary socket errors are reported as Status::OK() with no progress made.
+// TransferFinished() reports whether the whole frame has arrived.
+Status InboundTransfer::ReceiveBuffer(Socket &socket) {
+  const bool need_prefix = cur_offset_ < kMsgLengthPrefixLength;
+  if (need_prefix) {
+    // Receive whatever part of the uint32 length prefix is still missing.
+    int32_t prefix_rem = kMsgLengthPrefixLength - cur_offset_;
+    int32_t nread;
+    Status prefix_status = socket.Recv(&buf_[cur_offset_], prefix_rem, &nread);
+    RETURN_ON_ERROR_OR_SOCKET_NOT_READY(prefix_status);
+    if (nread == 0) {
+      return Status::OK();
+    }
+    DCHECK_GE(nread, 0);
+    cur_offset_ += nread;
+    if (cur_offset_ < kMsgLengthPrefixLength) {
+      // The prefix is still incomplete, so the frame length is unknown and
+      // nothing more can be read yet.
+      return Status::OK();
+    }
+    // At most 'prefix_rem' bytes were requested, so the buffer now holds
+    // exactly the prefix.
+    DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
+
+    // The encoded length excludes the prefix's own bytes; add them back so
+    // 'total_length_' describes the whole frame.
+    total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
+    if (total_length_ > FLAGS_rpc_max_message_size) {
+      return Status::NetworkError(Substitute(
+          "RPC frame had a length of $0, but we only support messages up to $1 bytes "
+          "long.", total_length_, FLAGS_rpc_max_message_size));
+    }
+    if (total_length_ <= kMsgLengthPrefixLength) {
+      return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
+                                             total_length_));
+    }
+    buf_.resize(total_length_);
+
+    // Continue straight into the body read below: the body is likely to be
+    // available on the socket already.
+  }
+
+  // Receive the message body.
+  //
+  // Socket::Recv() handles at most INT_MAX bytes per call, so the request is
+  // capped and a larger message is spread over several Recv() calls. The cap
+  // only matters when rpc_max_message_size > INT_MAX, which is currently only
+  // used for unit tests.
+  const uint32_t recv_cap = static_cast<uint32_t>(std::numeric_limits<int32_t>::max());
+  int32_t body_rem = std::min(total_length_ - cur_offset_, recv_cap);
+  int32_t body_read;
+  Status body_status = socket.Recv(&buf_[cur_offset_], body_rem, &body_read);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(body_status);
+  cur_offset_ += body_read;
+
+  return Status::OK();
+}
+
+// True once at least one byte of the frame has been received.
+bool InboundTransfer::TransferStarted() const {
+  return cur_offset_ > 0;
+}
+
+// True once the number of bytes received equals the frame's total length
+// (length prefix included).
+bool InboundTransfer::TransferFinished() const {
+  return total_length_ == cur_offset_;
+}
+
+// Formats progress as "<received>/<total> bytes received" for logging.
+string InboundTransfer::StatusAsString() const {
+  const uint32_t received = cur_offset_;
+  const uint32_t expected = total_length_;
+  return Substitute("$0/$1 bytes received", received, expected);
+}
+
+// Allocates a transfer for an outbound call request identified by 'call_id'.
+// The caller receives the raw pointer returned by 'new'.
+OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
+                                                         const TransferPayload &payload,
+                                                         size_t n_payload_slices,
+                                                         TransferCallbacks *callbacks) {
+  OutboundTransfer* request_transfer =
+      new OutboundTransfer(call_id, payload, n_payload_slices, callbacks);
+  return request_transfer;
+}
+
+// Allocates a transfer for a call response. Responses carry kInvalidCallId as
+// their call ID, which is what is_for_outbound_call() tests against.
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload,
+                                                          size_t n_payload_slices,
+                                                          TransferCallbacks *callbacks) {
+  OutboundTransfer* response_transfer =
+      new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks);
+  return response_transfer;
+}
+
+// Builds a transfer that will send the first 'n_payload_slices' slices of
+// 'payload'. Only the Slice objects are copied; they keep referring to memory
+// owned elsewhere. 'callbacks' is stored as a raw pointer.
+//
+// Crashes (CHECK) if 'n_payload_slices' exceeds the capacity of the slice
+// array.
+OutboundTransfer::OutboundTransfer(int32_t call_id,
+                                   const TransferPayload &payload,
+                                   size_t n_payload_slices,
+                                   TransferCallbacks *callbacks)
+  : n_payload_slices_(n_payload_slices),
+    cur_slice_idx_(0),
+    cur_offset_in_slice_(0),
+    callbacks_(callbacks),
+    call_id_(call_id),
+    started_(false),
+    aborted_(false) {
+  // Validate before copying so an oversized count can never index past the
+  // end of either array.
+  CHECK_LE(n_payload_slices_, payload_slices_.size());
+  // copy_n takes the size_t count directly, avoiding the signed/unsigned
+  // comparison a hand-written 'int' index loop would need.
+  std::copy_n(payload.begin(), n_payload_slices_, payload_slices_.begin());
+}
+
+// If the transfer is destroyed before it finished and without having been
+// aborted, the callbacks are told that it was aborted.
+OutboundTransfer::~OutboundTransfer() {
+  if (TransferFinished() || aborted_) {
+    // Completed or already reported through Abort(); nothing to notify.
+    return;
+  }
+  callbacks_->NotifyTransferAborted(
+      Status::RuntimeError("RPC transfer destroyed before it finished sending"));
+}
+
+// Reports 'status' through TransferCallbacks::NotifyTransferAborted() and marks
+// the transfer as aborted. Crashes (CHECK) if the transfer was already aborted
+// or has already finished.
+void OutboundTransfer::Abort(const Status &status) {
+ CHECK(!aborted_) << "Already aborted";
+ CHECK(!TransferFinished()) << "Cannot abort a finished transfer";
+ callbacks_->NotifyTransferAborted(status);
+ // The destructor checks this flag and skips its own abort notification.
+ aborted_ = true;
+}
+
+// Writes as much of the not-yet-sent payload as the socket accepts in a single
+// Writev() call and advances the (slice index, offset) cursor accordingly.
+//
+// Returns Status::OK() both on progress and on a temporary socket error
+// (e.g. EAGAIN); any other socket error is returned as-is. When the final
+// slice has been fully written, TransferCallbacks::NotifyTransferFinished()
+// is invoked. Must not be called on a finished transfer (CHECK).
+Status OutboundTransfer::SendBuffer(Socket &socket) {
+  CHECK_LT(cur_slice_idx_, n_payload_slices_);
+
+  // Set even if nothing ends up being written; see the comment on 'started_'.
+  started_ = true;
+
+  // Gather the unsent remainder of every remaining slice. The array has the
+  // fixed maximum slice count (the constructor CHECKs n_payload_slices_
+  // against it) rather than a runtime bound: variable-length arrays are a
+  // compiler extension, not standard C++.
+  struct iovec iovec[TransferLimits::kMaxPayloadSlices];
+  const int n_slices = static_cast<int>(n_payload_slices_);
+  const int n_iovecs = n_slices - cur_slice_idx_;
+  DCHECK_LE(n_iovecs, static_cast<int>(TransferLimits::kMaxPayloadSlices));
+  {
+    int offset_in_slice = cur_offset_in_slice_;
+    for (int i = 0; i < n_iovecs; i++) {
+      Slice &slice = payload_slices_[cur_slice_idx_ + i];
+      iovec[i].iov_base = slice.mutable_data() + offset_in_slice;
+      iovec[i].iov_len = slice.size() - offset_in_slice;
+
+      // Only the first remaining slice can be partially sent.
+      offset_in_slice = 0;
+    }
+  }
+
+  int64_t written;
+  Status status = socket.Writev(iovec, n_iovecs, &written);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+
+  // Adjust our accounting of current writer position.
+  for (int i = cur_slice_idx_; i < n_slices; i++) {
+    Slice &slice = payload_slices_[i];
+    int rem_in_slice = slice.size() - cur_offset_in_slice_;
+    DCHECK_GE(rem_in_slice, 0);
+
+    if (written >= rem_in_slice) {
+      // Used up this entire slice, advance to the next slice.
+      cur_slice_idx_++;
+      cur_offset_in_slice_ = 0;
+      written -= rem_in_slice;
+    } else {
+      // Partially used up this slice, just advance the offset within it.
+      cur_offset_in_slice_ += written;
+      break;
+    }
+  }
+
+  if (cur_slice_idx_ == n_payload_slices_) {
+    callbacks_->NotifyTransferFinished();
+    DCHECK_EQ(0, cur_offset_in_slice_);
+  } else {
+    DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+    DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size());
+  }
+
+  return Status::OK();
+}
+
+// Returns 'started_', which SendBuffer() sets on entry -- so this can be true
+// even when no bytes have actually been written yet.
+bool OutboundTransfer::TransferStarted() const {
+ return started_;
+}
+
+// True once the slice cursor has moved past the last payload slice.
+bool OutboundTransfer::TransferFinished() const {
+  if (cur_slice_idx_ != n_payload_slices_) {
+    return false;
+  }
+  DCHECK_EQ(0, cur_offset_in_slice_); // sanity check
+  return true;
+}
+
+// Concatenates the debug-string form of every payload slice. When redaction
+// is enabled, the redaction message is returned instead.
+string OutboundTransfer::HexDump() const {
+  if (KUDU_SHOULD_REDACT()) {
+    return kRedactionMessage;
+  }
+
+  string dump;
+  for (size_t idx = 0; idx < n_payload_slices_; ++idx) {
+    dump += payload_slices_[idx].ToDebugString();
+  }
+  return dump;
+}
+
+// Sums the sizes of all payload slices, including bytes already sent.
+int32_t OutboundTransfer::TotalLength() const {
+  int32_t total_bytes = 0;
+  for (size_t idx = 0; idx < n_payload_slices_; ++idx) {
+    total_bytes += payload_slices_[idx].size();
+  }
+  return total_bytes;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
new file mode 100644
index 0000000..b95d43d
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.h
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_TRANSFER_H
+#define KUDU_RPC_TRANSFER_H
+
+#include <array>
+#include <cstddef>
+#include <cstdint>
+#include <limits.h>
+#include <string>
+
+#include <boost/intrusive/list_hook.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+DECLARE_int64(rpc_max_message_size);
+
+namespace kudu {
+
+class Socket;
+
+namespace rpc {
+
+struct TransferCallbacks;
+
+// Compile-time limits on the payload of a single transfer. Holds constants
+// only; DISALLOW_IMPLICIT_CONSTRUCTORS prevents instantiation.
+class TransferLimits {
+ public:
+ enum {
+ kMaxSidecars = 10,
+ // Capacity of TransferPayload: every sidecar plus the two extra slices
+ // named in the trailing comment.
+ kMaxPayloadSlices = kMaxSidecars + 2, // (header + msg)
+ kMaxTotalSidecarBytes = INT_MAX
+ };
+
+ DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
+};
+
+typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;
+
+// This class is used internally by the RPC layer to represent an inbound
+// transfer in progress.
+//
+// Inbound Transfer objects are created by a Connection receiving data. When the
+// message is fully received, it is either parsed as a call, or a call response,
+// and the InboundTransfer object itself is handed off.
+class InboundTransfer {
+ public:
+
+ InboundTransfer();
+
+ // Read from the socket into our buffer: first the length prefix, then the
+ // message body. May need to be called repeatedly until TransferFinished()
+ // returns true.
+ Status ReceiveBuffer(Socket &socket);
+
+ // Return true if any bytes have yet been received.
+ bool TransferStarted() const;
+
+ // Return true if the entire transfer has been received.
+ bool TransferFinished() const;
+
+ // Return a slice constructed from the internal receive buffer.
+ Slice data() const {
+ return Slice(buf_);
+ }
+
+ // Return a string indicating the status of this transfer (number of bytes received, etc)
+ // suitable for logging.
+ std::string StatusAsString() const;
+
+ private:
+
+ // NOTE(review): declared here, but no definition appears in the transfer.cc
+ // added by this same commit -- possibly a leftover; confirm before use.
+ Status ProcessInboundHeader();
+
+ // Receive buffer; starts at the size of the length prefix and is resized to
+ // 'total_length_' once the prefix has been decoded.
+ faststring buf_;
+
+ // Total frame length in bytes, including the length prefix.
+ uint32_t total_length_;
+ // Number of bytes received so far.
+ uint32_t cur_offset_;
+
+ DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
+};
+
+// When the connection wants to send data, it creates an OutboundTransfer object
+// to encompass it. This sits on a queue within the Connection, so that each time
+// the Connection wakes up with a writable socket, it consumes more bytes off
+// the next pending transfer in the queue.
+//
+// Upon completion of the transfer, a callback is triggered.
+class OutboundTransfer : public boost::intrusive::list_base_hook<> {
+ public:
+ // Factory methods for creating transfers associated with call requests
+ // or responses. The 'payload' slices will be concatenated and
+ // written to the socket. When the transfer completes or errors, the
+ // appropriate method of 'callbacks' is invoked.
+ //
+ // Does not take ownership of the callbacks object or the underlying
+ // memory of the slices. The slices must remain valid until the callback
+ // is triggered.
+ //
+ // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices
+ // slices.
+ // ------------------------------------------------------------
+
+ // Create an outbound transfer for a call request.
+ static OutboundTransfer* CreateForCallRequest(int32_t call_id,
+ const TransferPayload &payload,
+ size_t n_payload_slices,
+ TransferCallbacks *callbacks);
+
+ // Create an outbound transfer for a call response.
+ // See above for details.
+ static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload,
+ size_t n_payload_slices,
+ TransferCallbacks *callbacks);
+
+ // Destruct the transfer. A transfer object should never be deallocated
+ // before it has either (a) finished transferring, or (b) been Abort()ed.
+ ~OutboundTransfer();
+
+ // Abort the current transfer, with the given status.
+ // This triggers TransferCallbacks::NotifyTransferAborted.
+ void Abort(const Status &status);
+
+ // Send as much of the remaining payload as the socket accepts. A temporary
+ // socket error (e.g. EAGAIN) is reported as Status::OK(). Triggers
+ // TransferCallbacks::NotifyTransferFinished once the last slice is written.
+ Status SendBuffer(Socket &socket);
+
+ // Return true if SendBuffer() has been called at least once (see 'started_').
+ bool TransferStarted() const;
+
+ // Return true if the entire transfer has been sent.
+ bool TransferFinished() const;
+
+ // Return the total number of bytes to be sent (including those already sent)
+ int32_t TotalLength() const;
+
+ // Return the concatenated debug strings of the payload slices, or the
+ // redaction message when redaction is enabled.
+ std::string HexDump() const;
+
+ // Return true if this transfer has a valid call ID, i.e. it was created for
+ // a call request rather than a call response.
+ bool is_for_outbound_call() const {
+ return call_id_ != kInvalidCallId;
+ }
+
+ // Returns the call ID for a transfer associated with an outbound
+ // call. Must not be called for call responses.
+ int32_t call_id() const {
+ DCHECK_NE(call_id_, kInvalidCallId);
+ return call_id_;
+ }
+
+ private:
+ // Use the Create* factory methods above instead.
+ OutboundTransfer(int32_t call_id,
+ const TransferPayload& payload,
+ size_t n_payload_slices,
+ TransferCallbacks *callbacks);
+
+ // Slices to send. Uses an array here instead of a vector to avoid an expensive
+ // vector construction (improved performance a couple percent).
+ TransferPayload payload_slices_;
+ // Number of leading entries of 'payload_slices_' that are in use.
+ size_t n_payload_slices_;
+
+ // The current slice that is being sent.
+ int32_t cur_slice_idx_;
+ // The number of bytes in the above slice which has already been sent.
+ int32_t cur_offset_in_slice_;
+
+ // Not owned.
+ TransferCallbacks *callbacks_;
+
+ // In the case of outbound calls, the associated call ID.
+ // In the case of call responses, kInvalidCallId
+ int32_t call_id_;
+
+ // True if SendBuffer() has been called at least once. This can be true even if
+ // no bytes were sent successfully. This is needed as SSL_write() is stateful.
+ // Please see KUDU-2334 for details.
+ bool started_;
+
+ // True once Abort() has been called.
+ bool aborted_;
+
+ DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
+};
+
+// Callbacks made after a transfer completes or is aborted. OutboundTransfer
+// holds a raw pointer to an implementation of this interface.
+struct TransferCallbacks {
+ public:
+ virtual ~TransferCallbacks();
+
+ // The transfer finished successfully.
+ virtual void NotifyTransferFinished() = 0;
+
+ // The transfer was aborted (e.g because the connection died or an error occurred).
+ // 'status' describes the reason for the abort.
+ virtual void NotifyTransferAborted(const Status &status) = 0;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/user_credentials.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc
new file mode 100644
index 0000000..7f318fe
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/user_credentials.h"
+
+#include <cstddef>
+#include <string>
+#include <utility>
+
+#include <boost/functional/hash/hash.hpp>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+#include "kudu/util/user.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+// Returns true when a non-empty real user name is stored.
+bool UserCredentials::has_real_user() const {
+  const bool is_unset = real_user_.empty();
+  return !is_unset;
+}
+
+// Replaces the stored real user. The argument is taken by value and moved into
+// place, so callers passing an rvalue avoid a copy.
+void UserCredentials::set_real_user(string real_user) {
+ real_user_ = std::move(real_user);
+}
+
+// Passes the real-user field to GetLoggedInUser() as its output argument and
+// returns that function's Status.
+Status UserCredentials::SetLoggedInRealUser() {
+ return GetLoggedInUser(&real_user_);
+}
+
+// Renders the credentials as "{real_user=<name>}".
+string UserCredentials::ToString() const {
+  const string& user = real_user();
+  return strings::Substitute("{real_user=$0}", user);
+}
+
+// Hash over the same fields that Equals() compares. Credentials without a real
+// user hash to 0.
+size_t UserCredentials::HashCode() const {
+  size_t hash = 0;
+  if (!has_real_user()) {
+    return hash;
+  }
+  boost::hash_combine(hash, real_user());
+  return hash;
+}
+
+// Two credentials are equal when their real user names are equal.
+bool UserCredentials::Equals(const UserCredentials& other) const {
+  const string& mine = real_user();
+  const string& theirs = other.real_user();
+  return mine == theirs;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/user_credentials.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h
new file mode 100644
index 0000000..5a0434c
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// Client-side user credentials. Currently this is more-or-less a simple wrapper
+// around a username string. However, we anticipate moving more credentials such as
+// tokens into a per-Proxy structure rather than Messenger-wide, and this will
+// be the place to store them.
+class UserCredentials {
+ public:
+ // Real user.
+ // has_real_user() is true when the stored name is non-empty.
+ bool has_real_user() const;
+ void set_real_user(std::string real_user);
+ const std::string& real_user() const { return real_user_; }
+
+ // Sets the real user to the currently logged in user.
+ // Returns the Status of the underlying lookup.
+ Status SetLoggedInRealUser();
+
+ // Returns a string representation of the object.
+ std::string ToString() const;
+
+ // Hash and equality over the credential fields (currently only the real
+ // user), for callers that need to compare or hash credentials.
+ std::size_t HashCode() const;
+ bool Equals(const UserCredentials& other) const;
+
+ private:
+ // Remember to update HashCode() and Equals() when new fields are added.
+ std::string real_user_;
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/CMakeLists.txt b/be/src/kudu/security/CMakeLists.txt
new file mode 100644
index 0000000..b79486e
--- /dev/null
+++ b/be/src/kudu/security/CMakeLists.txt
@@ -0,0 +1,141 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# See the comment in krb5_realm_override.cc for details on this library's usage.
+# The top-level CMakeLists sets a ${KRB5_REALM_OVERRIDE} variable which should
+# be linked first into all Kudu binaries.
+
+##############################
+# krb5_realm_override
+##############################
+
+add_library(krb5_realm_override STATIC krb5_realm_override.cc)
+target_link_libraries(krb5_realm_override glog)
+if(NOT APPLE)
+ target_link_libraries(krb5_realm_override dl)
+endif()
+
+##############################
+# token_proto
+##############################
+
+PROTOBUF_GENERATE_CPP(
+ TOKEN_PROTO_SRCS TOKEN_PROTO_HDRS TOKEN_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES token.proto)
+set(TOKEN_PROTO_LIBS protobuf pb_util_proto)
+ADD_EXPORTABLE_LIBRARY(token_proto
+ SRCS ${TOKEN_PROTO_SRCS}
+ DEPS ${TOKEN_PROTO_LIBS}
+ NONLINK_DEPS ${TOKEN_PROTO_TGTS})
+
+
+##############################
+# security
+##############################
+
+# Check for krb5_get_init_creds_opt_set_out_ccache, which is not available in versions
+# of MIT Kerberos older than krb5-1.6, and is also not present in Heimdal kerberos.
+include(CheckLibraryExists)
+check_library_exists("krb5" krb5_get_init_creds_opt_set_out_ccache
+ ${KERBEROS_LIBRARY} HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE)
+if(HAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE)
+ add_definitions(-DHAVE_KRB5_GET_INIT_CREDS_OPT_SET_OUT_CCACHE=1)
+endif()
+
+# Fall back to using the ported functionality if we're using an older version of OpenSSL.
+# When set, PORTED_X509_CHECK_HOST_CC adds x509_check_host.cc to SECURITY_SRCS below.
+# NOTE(review): ${OPENSSL_VERSION} is expanded unquoted, so this if() presumably
+# becomes malformed when the variable is unset -- confirm it is always defined
+# before this directory is processed.
+if (${OPENSSL_VERSION} VERSION_LESS "1.0.2")
+ set(PORTED_X509_CHECK_HOST_CC "x509_check_host.cc")
+endif()
+
+set(SECURITY_SRCS
+ ca/cert_management.cc
+ cert.cc
+ crypto.cc
+ kerberos_util.cc
+ init.cc
+ openssl_util.cc
+ ${PORTED_X509_CHECK_HOST_CC}
+ security_flags.cc
+ simple_acl.cc
+ tls_context.cc
+ tls_handshake.cc
+ tls_socket.cc
+ token_verifier.cc
+ token_signer.cc
+ token_signing_key.cc
+ )
+
+set(SECURITY_LIBS
+ gutil
+ kudu_util
+ token_proto
+
+ krb5
+ openssl_crypto
+ openssl_ssl)
+
+ADD_EXPORTABLE_LIBRARY(security
+ SRCS ${SECURITY_SRCS}
+ DEPS ${SECURITY_LIBS})
+
+
+##############################
+# mini_kdc
+##############################
+
+# mini_kdc is defined outside the NO_TESTS guard used further down, so it is
+# built even when tests are disabled; it still links against kudu_test_util.
+set(MINI_KDC_SRCS test/mini_kdc.cc)
+
+add_library(mini_kdc ${MINI_KDC_SRCS})
+target_link_libraries(mini_kdc
+ gutil
+ kudu_test_util
+ kudu_util)
+
+##############################
+# security_test_util
+##############################
+
+if (NOT NO_TESTS)
+ set(SECURITY_TEST_SRCS
+ security-test-util.cc
+ test/test_certs.cc
+ test/test_pass.cc)
+
+ add_library(security_test_util ${SECURITY_TEST_SRCS})
+ target_link_libraries(security_test_util
+ gutil
+ kudu_test_util
+ kudu_util
+ security)
+
+ # Tests
+ set(KUDU_TEST_LINK_LIBS
+ mini_kdc
+ security
+ security_test_util
+ ${KUDU_MIN_TEST_LIBS})
+
+ ADD_KUDU_TEST(ca/cert_management-test)
+ ADD_KUDU_TEST(cert-test)
+ ADD_KUDU_TEST(crypto-test)
+ ADD_KUDU_TEST(test/mini_kdc-test)
+ ADD_KUDU_TEST(tls_handshake-test)
+ ADD_KUDU_TEST(tls_socket-test PROCESSORS 2)
+ ADD_KUDU_TEST(token-test)
+endif()
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/ca/cert_management-test.cc b/be/src/kudu/security/ca/cert_management-test.cc
new file mode 100644
index 0000000..0c8abc8
--- /dev/null
+++ b/be/src/kudu/security/ca/cert_management-test.cc
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/ca/cert_management.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/security-test-util.h"
+#include "kudu/security/test/test_certs.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace security {
+namespace ca {
+
+// Fixture for the CA certificate-management tests. It loads the canned CA
+// credentials (a valid set and an expired set) and provides helpers for
+// building CSR configurations and signed-request inputs.
+class CertManagementTest : public KuduTest {
+ public:
+  // Parse the PEM test material, then make sure each CA certificate really
+  // belongs to the private key it is paired with.
+  void SetUp() override {
+    ASSERT_OK(ca_cert_.FromString(kCaCert, DataFormat::PEM));
+    ASSERT_OK(ca_private_key_.FromString(kCaPrivateKey, DataFormat::PEM));
+    ASSERT_OK(ca_public_key_.FromString(kCaPublicKey, DataFormat::PEM));
+    ASSERT_OK(ca_exp_cert_.FromString(kCaExpiredCert, DataFormat::PEM));
+    ASSERT_OK(ca_exp_private_key_.FromString(kCaExpiredPrivateKey, DataFormat::PEM));
+
+    // Sanity checks.
+    ASSERT_OK(ca_cert_.CheckKeyMatch(ca_private_key_));
+    ASSERT_OK(ca_exp_cert_.CheckKeyMatch(ca_exp_private_key_));
+  }
+
+ protected:
+  // CSR configuration for 'hostname'; the optional fields are left unset.
+  CertRequestGenerator::Config PrepareConfig(
+      const string& hostname = "localhost.localdomain") {
+    CertRequestGenerator::Config config;
+    config.hostname = hostname;
+    return config;
+  }
+
+  // CA CSR configuration carrying 'cn' as the common name.
+  CaCertRequestGenerator::Config PrepareCaConfig(const string& cn) {
+    CaCertRequestGenerator::Config config;
+    config.cn = cn;
+    return config;
+  }
+
+  // Generate a fresh private key into 'key' and return a CSR built for it by
+  // a generator of type 'CSRGen'. Any failure aborts via CHECK_OK.
+  template<class CSRGen = CertRequestGenerator>
+  CertSignRequest PrepareTestCSR(typename CSRGen::Config config,
+                                 PrivateKey* key) {
+    CHECK_OK(GeneratePrivateKey(512, key));
+    CSRGen generator(std::move(config));
+    CHECK_OK(generator.Init());
+    CertSignRequest csr;
+    CHECK_OK(generator.GenerateRequest(*key, &csr));
+    return csr;
+  }
+
+  // Valid CA credentials.
+  Cert ca_cert_;
+  PrivateKey ca_private_key_;
+  PublicKey ca_public_key_;
+
+  // Expired CA credentials.
+  Cert ca_exp_cert_;
+  PrivateKey ca_exp_private_key_;
+};
+
+// CertRequestGenerator::Init() must reject a configuration with an empty
+// hostname and report it as an invalid argument.
+TEST_F(CertManagementTest, RequestGeneratorConstraints) {
+  CertRequestGenerator generator(PrepareConfig(""));
+  const Status init_status = generator.Init();
+  ASSERT_TRUE(init_status.IsInvalidArgument()) << init_status.ToString();
+  ASSERT_STR_CONTAINS(init_status.ToString(), "hostname must not be empty");
+}
+
+// Smoke test for the building blocks of CSR generation: a 1024-bit key can be
+// generated and serialized to PEM, a default-configured generator
+// initializes, and asking for a key with an unsupported number of bits is
+// reported as a runtime error.
+TEST_F(CertManagementTest, RequestGeneratorBasics) {
+  PrivateKey key;
+  ASSERT_OK(GeneratePrivateKey(1024, &key));
+
+  CertRequestGenerator generator(PrepareConfig());
+  ASSERT_OK(generator.Init());
+
+  string key_pem;
+  ASSERT_OK(key.ToString(&key_pem, DataFormat::PEM));
+
+  // Check for non-supported number of bits for the key.
+  const Status bad_bits_status = GeneratePrivateKey(7, &key);
+  ASSERT_TRUE(bad_bits_status.IsRuntimeError());
+}
+
+// Check that CertSigner behaves in a predictable way if given non-matching
+// CA private key and certificate.
+TEST_F(CertManagementTest, SignerInitWithMismatchedCertAndKey) {
+ PrivateKey key;
+ const auto& csr = PrepareTestCSR(PrepareConfig(), &key);
+ {
+ Cert cert;
+ Status s = CertSigner(&ca_cert_, &ca_exp_private_key_)
+ .Sign(csr, &cert);
+
+ const string err_msg = s.ToString();
+ ASSERT_TRUE(s.IsRuntimeError()) << err_msg;
+ ASSERT_STR_CONTAINS(err_msg, "certificate does not match private key");
+ }
+ {
+ Cert cert;
+ Status s = CertSigner(&ca_exp_cert_, &ca_private_key_)
+ .Sign(csr, &cert);
+ const string err_msg = s.ToString();
+ ASSERT_TRUE(s.IsRuntimeError()) << err_msg;
+ ASSERT_STR_CONTAINS(err_msg, "certificate does not match private key");
+ }
+}
+
+// An expired CA certificate, used with its own matching private key, is
+// still accepted by CertSigner for signing.
+TEST_F(CertManagementTest, SignerInitWithExpiredCert) {
+  PrivateKey key;
+  const CertSignRequest csr = PrepareTestCSR(PrepareConfig(), &key);
+
+  // Signer works fine even with expired CA certificate.
+  Cert cert;
+  CertSigner expired_ca_signer(&ca_exp_cert_, &ca_exp_private_key_);
+  ASSERT_OK(expired_ca_signer.Sign(csr, &cert));
+  ASSERT_OK(cert.CheckKeyMatch(key));
+}
+
+// Regression test for KUDU-1981: Kudu servers must work on machines whose
+// FQDN is longer than 64 characters. For a short and a very long hostname,
+// generate a CSR that carries the name in the X509v3 SAN extension, issue the
+// certificate and verify that the hostname survives the round trip.
+TEST_F(CertManagementTest, SignCertLongHostnameInSan) {
+  static const char* const kHostnames[] = {
+      "foo.bar.com",
+
+      "222222222222222222222222222222222222222222222222222222222222222."
+      "555555555555555555555555555555555555555555555555555555555555555."
+      "555555555555555555555555555555555555555555555555555555555555555."
+      "chaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaars",
+  };
+
+  for (const char* hostname : kHostnames) {
+    CertRequestGenerator::Config config;
+    config.hostname = hostname;
+    config.user_id = "test-uid";
+
+    PrivateKey key;
+    const CertSignRequest csr = PrepareTestCSR(config, &key);
+
+    Cert cert;
+    CertSigner signer(&ca_cert_, &ca_private_key_);
+    ASSERT_OK(signer.Sign(csr, &cert));
+    ASSERT_OK(cert.CheckKeyMatch(key));
+
+    EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = my@email.com",
+              cert.IssuerName());
+    EXPECT_EQ("UID = test-uid", cert.SubjectName());
+
+    // Exactly one SAN entry is expected: the requested hostname.
+    const vector<string> san_hostnames = cert.Hostnames();
+    ASSERT_EQ(1, san_hostnames.size());
+    EXPECT_EQ(hostname, san_hostnames[0]);
+  }
+}
+
+// Generate an X509 CSR with hostname, user id and Kerberos principal set,
+// issue the corresponding certificate and check that every field can be read
+// back from it.
+TEST_F(CertManagementTest, SignCert) {
+  CertRequestGenerator::Config config;
+  config.hostname = "foo.bar.com";
+  config.user_id = "test-uid";
+  config.kerberos_principal = "kudu/foo.bar.com@bar.com";
+
+  PrivateKey key;
+  const CertSignRequest csr = PrepareTestCSR(config, &key);
+
+  Cert cert;
+  CertSigner signer(&ca_cert_, &ca_private_key_);
+  ASSERT_OK(signer.Sign(csr, &cert));
+  ASSERT_OK(cert.CheckKeyMatch(key));
+
+  EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = my@email.com",
+            cert.IssuerName());
+  EXPECT_EQ("UID = test-uid", cert.SubjectName());
+  EXPECT_EQ(config.user_id, *cert.UserId());
+  EXPECT_EQ(config.kerberos_principal, *cert.KuduKerberosPrincipal());
+
+  const vector<string> san_hostnames = cert.Hostnames();
+  ASSERT_EQ(1, san_hostnames.size());
+  EXPECT_EQ("foo.bar.com", san_hostnames[0]);
+}
+
+// Generate a CA-type X509 CSR and have it signed by the test CA; the
+// resulting certificate must match the key the CSR was created for.
+TEST_F(CertManagementTest, SignCaCert) {
+  PrivateKey key;
+  const CertSignRequest csr =
+      PrepareTestCSR<CaCertRequestGenerator>(PrepareCaConfig("self-ca"), &key);
+
+  Cert cert;
+  CertSigner signer(&ca_cert_, &ca_private_key_);
+  ASSERT_OK(signer.Sign(csr, &cert));
+  ASSERT_OK(cert.CheckKeyMatch(key));
+}
+
+// Create a CA backed by a self-signed certificate generated on the fly and
+// use it to sign a regular certificate request.
+TEST_F(CertManagementTest, TestSelfSignedCA) {
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  // Create a key and CSR for the tablet server.
+  PrivateKey ts_key;
+  const CertSignRequest ts_csr = PrepareTestCSR(PrepareConfig(), &ts_key);
+
+  // Sign it using the self-signed CA.
+  Cert ts_cert;
+  CertSigner self_signed_ca(&ca_cert, &ca_key);
+  ASSERT_OK(self_signed_ca.Sign(ts_csr, &ts_cert));
+  ASSERT_OK(ts_cert.CheckKeyMatch(ts_key));
+}
+
+// Round-trip an X509 CSR through its serialized form, once per data format:
+//   internal -> PEM -> internal -> PEM
+//   internal -> DER -> internal -> DER
+// Serializing the re-parsed CSR must reproduce the first serialization.
+TEST_F(CertManagementTest, X509CsrFromAndToString) {
+  static const DataFormat kFormats[] = { DataFormat::PEM, DataFormat::DER };
+
+  // Build the reference CSR once; every format is round-tripped from it.
+  PrivateKey key;
+  ASSERT_OK(GeneratePrivateKey(1024, &key));
+  CertRequestGenerator generator(PrepareConfig());
+  ASSERT_OK(generator.Init());
+  CertSignRequest reference_csr;
+  ASSERT_OK(generator.GenerateRequest(key, &reference_csr));
+
+  for (const DataFormat format : kFormats) {
+    SCOPED_TRACE(Substitute("X509 CSR format: $0", DataFormatToString(format)));
+
+    string serialized;
+    ASSERT_OK(reference_csr.ToString(&serialized, format));
+
+    CertSignRequest parsed_csr;
+    ASSERT_OK(parsed_csr.FromString(serialized, format));
+
+    string reserialized;
+    ASSERT_OK(parsed_csr.ToString(&reserialized, format));
+    ASSERT_EQ(serialized, reserialized);
+  }
+}
+
+// Round-trip an X509 certificate through its serialized form, once per data
+// format:
+//   internal -> PEM -> internal -> PEM
+//   internal -> DER -> internal -> DER
+// Serializing the re-parsed certificate must reproduce the first
+// serialization.
+TEST_F(CertManagementTest, X509FromAndToString) {
+  static const DataFormat kFormats[] = { DataFormat::PEM, DataFormat::DER };
+
+  // Produce the reference certificate: key -> CSR -> cert signed by the CA.
+  PrivateKey key;
+  ASSERT_OK(GeneratePrivateKey(1024, &key));
+  CertRequestGenerator generator(PrepareConfig());
+  ASSERT_OK(generator.Init());
+  CertSignRequest csr;
+  ASSERT_OK(generator.GenerateRequest(key, &csr));
+
+  Cert reference_cert;
+  CertSigner signer(&ca_cert_, &ca_private_key_);
+  ASSERT_OK(signer.Sign(csr, &reference_cert));
+
+  for (const DataFormat format : kFormats) {
+    SCOPED_TRACE(Substitute("X509 format: $0", DataFormatToString(format)));
+
+    string serialized;
+    ASSERT_OK(reference_cert.ToString(&serialized, format));
+
+    Cert parsed_cert;
+    ASSERT_OK(parsed_cert.FromString(serialized, format));
+
+    string reserialized;
+    ASSERT_OK(parsed_cert.ToString(&reserialized, format));
+    ASSERT_EQ(serialized, reserialized);
+  }
+}
+
+} // namespace ca
+} // namespace security
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/ca/cert_management.cc b/be/src/kudu/security/ca/cert_management.cc
new file mode 100644
index 0000000..7ccc376
--- /dev/null
+++ b/be/src/kudu/security/ca/cert_management.cc
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/ca/cert_management.h"
+
+#include <algorithm>
+#include <cstdio>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include <glog/logging.h>
+#include <openssl/conf.h>
+#ifndef OPENSSL_NO_ENGINE
+#include <openssl/engine.h>
+#endif
+#include <openssl/pem.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+
+using std::lock_guard;
+using std::move;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+namespace security {
+
+// Specializations that name the OpenSSL function used to free each type.
+// NOTE(review): kFreeFunc is presumably what ssl_make_unique()/c_unique_ptr
+// (declared in openssl_util.h) use as the deleter -- confirm there.
+
+// ASN1_INTEGER objects are released with ASN1_INTEGER_free().
+template<> struct SslTypeTraits<ASN1_INTEGER> {
+ static constexpr auto kFreeFunc = &ASN1_INTEGER_free;
+};
+// BIGNUM objects are released with BN_free().
+template<> struct SslTypeTraits<BIGNUM> {
+ static constexpr auto kFreeFunc = &BN_free;
+};
+
+namespace ca {
+
+namespace {
+
+// Add an entry to the X509 name 'name': 'field_code' is the textual field
+// name (callers in this file use "CN" and "UID") and 'field_value' its value,
+// passed to OpenSSL as an MBSTRING_ASC string.
+//
+// Both pointers must be non-null (enforced with CHECK). Returns a non-OK
+// status, including the field code in its message, if OpenSSL rejects the
+// entry.
+Status SetSubjectNameField(X509_NAME* name,
+                           const char* field_code,
+                           const string& field_value) {
+  CHECK(name);
+  CHECK(field_code);
+
+  const unsigned char* value_bytes =
+      reinterpret_cast<const unsigned char*>(field_value.c_str());
+  // The trailing arguments are (len, loc, set) = (-1, -1, 0), as in the
+  // original call; see the OpenSSL manual for their meaning.
+  const int rc = X509_NAME_add_entry_by_txt(
+      name, field_code, MBSTRING_ASC, value_bytes, -1, -1, 0);
+  OPENSSL_RET_NOT_OK(rc, Substitute("error setting subject field $0", field_code));
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+// Free the extension stack built by Init() together with every extension it
+// holds. 'extensions_' is still nullptr when Init() never reached the point
+// where the stack is allocated.
+CertRequestGenerator::~CertRequestGenerator() {
+ sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free);
+}
+
+// Build an X509 CSR for 'key' and hand it over to 'ret'.
+//
+// The request is assembled in a fixed order: public key, subject (through the
+// subclass's SetSubject()), extensions (through SetExtensions()), and finally
+// a SHA-256 signature made with 'key'. 'ret' adopts the raw request only
+// after every step has succeeded; on any failure the partially built request
+// is released by the smart pointer and 'ret' is left untouched.
+//
+// 'ret' must be non-null and the generator must already be initialized; both
+// are enforced with CHECK.
+Status CertRequestGeneratorBase::GenerateRequest(const PrivateKey& key,
+ CertSignRequest* ret) const {
+ SCOPED_OPENSSL_NO_PENDING_ERRORS;
+ CHECK(ret);
+ CHECK(Initialized());
+ auto req = ssl_make_unique(X509_REQ_new());
+ OPENSSL_RET_NOT_OK(X509_REQ_set_pubkey(req.get(), key.GetRawData()),
+ "error setting X509 public key");
+
+ // Populate the subject field of the request.
+ RETURN_NOT_OK(SetSubject(req.get()));
+
+ // Set necessary extensions into the request.
+ RETURN_NOT_OK(SetExtensions(req.get()));
+
+ // And finally sign the result.
+ OPENSSL_RET_NOT_OK(X509_REQ_sign(req.get(), key.GetRawData(), EVP_sha256()),
+ "error signing X509 request");
+ // Ownership of the raw X509_REQ moves to 'ret' here.
+ ret->AdoptRawData(req.release());
+
+ return Status::OK();
+}
+
+// Create the X509v3 extension identified by 'nid' from its textual
+// configuration 'value' (e.g. "critical,CA:FALSE") and append it to 'st'.
+//
+// On success the stack owns the new extension. On failure nothing is added
+// to 'st', the extension (if it was created) is freed, and a non-OK status
+// is returned.
+Status CertRequestGeneratorBase::PushExtension(stack_st_X509_EXTENSION* st,
+                                               int32_t nid, StringPiece value) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // X509V3_EXT_conf_nid() reads 'value' as a NUL-terminated C string, which a
+  // StringPiece does not guarantee: copy it into a std::string first.
+  const string value_str = value.ToString();
+  auto ex = ssl_make_unique(
+      X509V3_EXT_conf_nid(nullptr, nullptr, nid,
+                          const_cast<char*>(value_str.c_str())));
+  OPENSSL_RET_IF_NULL(ex, "error configuring extension");
+  // Give up ownership only once the stack has accepted the extension;
+  // otherwise a failed push would leak it.
+  OPENSSL_RET_NOT_OK(sk_X509_EXTENSION_push(st, ex.get()),
+                     "error pushing extension into the stack");
+  ex.release();
+  return Status::OK();
+}
+
+// Take ownership of the CSR properties in 'config'. Nothing is validated and
+// no OpenSSL object is created here; that happens in Init().
+CertRequestGenerator::CertRequestGenerator(Config config)
+ : CertRequestGeneratorBase(),
+ config_(std::move(config)) {
+}
+
+// Validate the configuration and pre-build the stack of X509v3 extensions
+// that is later attached to every generated CSR.
+//
+// Returns InvalidArgument if the configured hostname is empty, or a non-OK
+// status if the extension stack or one of the extensions cannot be created.
+// Init() may be called again after such a failure, but must not be called
+// once it has succeeded (enforced with CHECK).
+Status CertRequestGenerator::Init() {
+  InitializeOpenSSL();
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+
+  CHECK(!is_initialized_);
+
+  // Build the SAN field using the specified hostname. In general, it might be
+  // multiple DNS hostnames in the field, but in our use-cases it's always one.
+  if (config_.hostname.empty()) {
+    return Status::InvalidArgument("hostname must not be empty");
+  }
+  const string san_hosts = Substitute("DNS.0:$0", config_.hostname);
+
+  // An earlier Init() attempt that failed part-way leaves its stack behind.
+  // Release it before allocating a new one so it is not leaked; the call is
+  // harmless when 'extensions_' is still nullptr.
+  sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free);
+  extensions_ = sk_X509_EXTENSION_new_null();
+  OPENSSL_RET_IF_NULL(extensions_, "error allocating X509 extension stack");
+
+  // Permitted usages for the generated keys is set via X509 V3
+  // standard/extended key usage attributes.
+  // See https://www.openssl.org/docs/man1.0.1/apps/x509v3_config.html
+  // for details.
+
+  // The generated certificates are for using as TLS certificates for
+  // both client and server.
+  string usage = "critical,digitalSignature,keyEncipherment";
+  if (for_self_signing_) {
+    // If we are generating a CSR for self-signing, then we need to
+    // add this keyUsage attribute. See https://s.apache.org/BFHk
+    usage += ",keyCertSign";
+  }
+
+  RETURN_NOT_OK(PushExtension(extensions_, NID_key_usage, usage));
+  // The generated certificates should be good for authentication
+  // of a server to a client and vice versa: the intended users of the
+  // certificates are tablet servers which are going to talk to master
+  // and other tablet servers via TLS channels.
+  RETURN_NOT_OK(PushExtension(extensions_, NID_ext_key_usage,
+                              "critical,serverAuth,clientAuth"));
+
+  // The generated certificates are not intended to be used as CA certificates
+  // (i.e. they cannot be used to sign/issue certificates).
+  RETURN_NOT_OK(PushExtension(extensions_, NID_basic_constraints,
+                              "critical,CA:FALSE"));
+
+  // Optional custom extension carrying the Kerberos principal.
+  if (config_.kerberos_principal) {
+    int nid = GetKuduKerberosPrincipalOidNid();
+    RETURN_NOT_OK(PushExtension(extensions_, nid,
+                                Substitute("ASN1:UTF8:$0", *config_.kerberos_principal)));
+  }
+  RETURN_NOT_OK(PushExtension(extensions_, NID_subject_alt_name, san_hosts));
+
+  is_initialized_ = true;
+
+  return Status::OK();
+}
+
+// True once Init() has completed successfully. Unlike the CA generator's
+// counterpart, this reads the flag without taking any lock.
+bool CertRequestGenerator::Initialized() const {
+ return is_initialized_;
+}
+
+// Put the configured user id, if any, into the "UID" field of the request's
+// subject name. Without a user id the subject is left as it is.
+Status CertRequestGenerator::SetSubject(X509_REQ* req) const {
+  if (!config_.user_id) {
+    return Status::OK();
+  }
+  return SetSubjectNameField(X509_REQ_get_subject_name(req),
+                             "UID", *config_.user_id);
+}
+
+// Attach the extension stack prepared by Init() to 'req'. Returns a non-OK
+// status if OpenSSL reports a failure.
+Status CertRequestGenerator::SetExtensions(X509_REQ* req) const {
+ OPENSSL_RET_NOT_OK(X509_REQ_add_extensions(req, extensions_),
+ "error setting X509 request extensions");
+ return Status::OK();
+}
+
+// Take ownership of the CA CSR properties in 'config' and start out
+// uninitialized, with no extension stack; Init() does the real work.
+CaCertRequestGenerator::CaCertRequestGenerator(Config config)
+ : config_(std::move(config)),
+ extensions_(nullptr),
+ is_initialized_(false) {
+}
+
+// Free the extension stack built by Init() together with every extension it
+// holds. 'extensions_' is nullptr if Init() never allocated the stack.
+CaCertRequestGenerator::~CaCertRequestGenerator() {
+ sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free);
+}
+
+// Validate the configuration and pre-build the X509v3 extensions of a CA
+// certificate request. The whole body runs under 'lock_'.
+//
+// Calling Init() again after it has succeeded is a no-op that returns OK.
+// Returns InvalidArgument if the common name is empty, or a non-OK status if
+// the extension stack or one of the extensions cannot be created; Init() may
+// be retried after such a failure.
+Status CaCertRequestGenerator::Init() {
+  InitializeOpenSSL();
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+
+  lock_guard<simple_spinlock> guard(lock_);
+  if (is_initialized_) {
+    return Status::OK();
+  }
+  if (config_.cn.empty()) {
+    return Status::InvalidArgument("missing CA service UUID/name");
+  }
+
+  // An earlier Init() attempt that failed part-way leaves its stack behind.
+  // Release it before allocating a new one so it is not leaked; the call is
+  // harmless when 'extensions_' is still nullptr.
+  sk_X509_EXTENSION_pop_free(extensions_, X509_EXTENSION_free);
+  extensions_ = sk_X509_EXTENSION_new_null();
+  OPENSSL_RET_IF_NULL(extensions_, "error allocating X509 extension stack");
+
+  // Permitted usages for the generated keys is set via X509 V3
+  // standard/extended key usage attributes.
+  // See https://www.openssl.org/docs/man1.0.1/apps/x509v3_config.html
+  // for details.
+
+  // The target certificate is a CA certificate: it's for signing X509 certs.
+  RETURN_NOT_OK(PushExtension(extensions_, NID_key_usage,
+                              "critical,keyCertSign"));
+  // The generated certificates are for the private CA service.
+  RETURN_NOT_OK(PushExtension(extensions_, NID_basic_constraints,
+                              "critical,CA:TRUE"));
+  is_initialized_ = true;
+
+  return Status::OK();
+}
+
+// True once Init() has completed successfully. The flag is read under
+// 'lock_', the same lock Init() holds while setting it.
+bool CaCertRequestGenerator::Initialized() const {
+ lock_guard<simple_spinlock> guard(lock_);
+ return is_initialized_;
+}
+
+// Put the configured common name into the "CN" field of the request's
+// subject name.
+Status CaCertRequestGenerator::SetSubject(X509_REQ* req) const {
+ return SetSubjectNameField(X509_REQ_get_subject_name(req), "CN", config_.cn);
+}
+
+// Attach the extension stack prepared by Init() to 'req'. Returns a non-OK
+// status if OpenSSL reports a failure.
+// NOTE(review): 'extensions_' is read here without taking 'lock_' -- confirm
+// that callers only reach this after Init() has returned.
+Status CaCertRequestGenerator::SetExtensions(X509_REQ* req) const {
+ OPENSSL_RET_NOT_OK(X509_REQ_add_extensions(req, extensions_),
+ "error setting X509 request extensions");
+ return Status::OK();
+}
+
+// Produce a self-signed CA certificate in 'cert': build a CA CSR for 'key'
+// from 'config', then sign that CSR with the very same key, making the
+// result valid for 'cert_expiration_seconds'.
+Status CertSigner::SelfSignCA(const PrivateKey& key,
+                              CaCertRequestGenerator::Config config,
+                              int64_t cert_expiration_seconds,
+                              Cert* cert) {
+  // Generate a CSR for the CA. The generator lives only inside this scope.
+  CertSignRequest ca_csr;
+  {
+    CaCertRequestGenerator generator(std::move(config));
+    RETURN_NOT_OK(generator.Init());
+    RETURN_NOT_OK(generator.GenerateRequest(key, &ca_csr));
+  }
+
+  // Self-sign the CA's CSR: no CA certificate, only the key.
+  CertSigner self_signer(nullptr, &key);
+  self_signer.set_expiration_interval(
+      MonoDelta::FromSeconds(cert_expiration_seconds));
+  return self_signer.Sign(ca_csr, cert);
+}
+
+// Produce a self-signed (non-CA) certificate in 'cert': build a CSR for
+// 'key' from 'config' with self-signing enabled, then sign it with the same
+// key using the signer's default expiration interval.
+Status CertSigner::SelfSignCert(const PrivateKey& key,
+                                CertRequestGenerator::Config config,
+                                Cert* cert) {
+  // Generate a CSR. The generator lives only inside this scope.
+  CertSignRequest csr;
+  {
+    CertRequestGenerator generator(std::move(config));
+    generator.enable_self_signing();
+    RETURN_NOT_OK(generator.Init());
+    RETURN_NOT_OK(generator.GenerateRequest(key, &csr));
+  }
+
+  // Self-sign the CSR with the key: no CA certificate is involved.
+  CertSigner self_signer(nullptr, &key);
+  return self_signer.Sign(csr, cert);
+}
+
+
+// Remember the CA certificate and private key to sign with. Both are stored
+// as plain pointers, not copied.
+//
+// 'ca_private_key' must be non-null and hold key data. 'ca_cert' may be
+// nullptr (self-signing); when given, it must hold certificate data. Both
+// conditions are enforced with CHECK.
+CertSigner::CertSigner(const Cert* ca_cert,
+ const PrivateKey* ca_private_key)
+ : ca_cert_(ca_cert),
+ ca_private_key_(ca_private_key) {
+ // Private key is required.
+ CHECK(ca_private_key_ && ca_private_key_->GetRawData());
+ // The cert is optional, but if we have it, it should be initialized.
+ CHECK(!ca_cert_ || ca_cert_->GetRawData());
+}
+
+// Issue a certificate for the request 'req' and store it in 'ret'.
+//
+// Steps: verify that the CA certificate (if any) matches the CA private key,
+// fill a new X509 object from the request, then complete and sign it via
+// DoSign() with SHA-256 and the configured expiration interval. 'ret' adopts
+// the certificate only after every step has succeeded. 'ret' must be
+// non-null (enforced with CHECK).
+Status CertSigner::Sign(const CertSignRequest& req, Cert* ret) const {
+ SCOPED_OPENSSL_NO_PENDING_ERRORS;
+ InitializeOpenSSL();
+ CHECK(ret);
+
+ // If we are not self-signing, then make sure that the provided CA
+ // cert and key match each other. Technically this would be programmer
+ // error since we're always using internally-generated CA certs, but
+ // this isn't a hot path so we'll keep the extra safety.
+ if (ca_cert_) {
+ RETURN_NOT_OK(ca_cert_->CheckKeyMatch(*ca_private_key_));
+ }
+ auto x509 = ssl_make_unique(X509_new());
+ RETURN_NOT_OK(FillCertTemplateFromRequest(req.GetRawData(), x509.get()));
+ RETURN_NOT_OK(DoSign(EVP_sha256(), exp_interval_sec_, x509.get()));
+ // Ownership of the raw X509 moves to 'ret' here.
+ ret->AdoptX509(x509.release());
+
+ return Status::OK();
+}
+
+// Copy every X509v3 extension of the request 'req' into the certificate 'x'.
+// Extensions already present in 'x' with the same object type are removed
+// first, so the request's extensions replace them. A request without
+// extensions leaves 'x' unchanged.
+//
+// This is modeled after code in copy_extensions() function from
+// $OPENSSL_ROOT/apps/apps.c with OpenSSL 1.0.2.
+Status CertSigner::CopyExtensions(X509_REQ* req, X509* x) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(req);
+  CHECK(x);
+  STACK_OF(X509_EXTENSION)* exts = X509_REQ_get_extensions(req);
+  SCOPED_CLEANUP({
+    sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free);
+  });
+  // sk_X509_EXTENSION_num() returns -1 for a null stack, which is what a
+  // request without extensions may yield. The count must therefore be kept
+  // signed: compared against a size_t index, -1 would turn into SIZE_MAX and
+  // the loop would run over a null stack.
+  const int num_exts = sk_X509_EXTENSION_num(exts);
+  for (int i = 0; i < num_exts; ++i) {
+    X509_EXTENSION* ext = sk_X509_EXTENSION_value(exts, i);
+    ASN1_OBJECT* obj = X509_EXTENSION_get_object(ext);
+    // If the extension exists, delete all extensions of the same type.
+    int32_t idx = X509_get_ext_by_OBJ(x, obj, -1);
+    while (idx != -1) {
+      // X509_delete_ext() detaches the extension and returns it to the
+      // caller; the smart pointer frees it at the end of the iteration.
+      auto removed = ssl_make_unique(X509_delete_ext(x, idx));
+      idx = X509_get_ext_by_OBJ(x, obj, -1);
+    }
+    OPENSSL_RET_NOT_OK(X509_add_ext(x, ext, -1), "error adding extension");
+  }
+
+  return Status::OK();
+}
+
+// Transfer the contents of the CSR 'req' into the certificate template
+// 'tmpl': subject name, extensions and public key.
+//
+// Before anything is copied, the CSR's signature is verified against the
+// public key embedded in the CSR itself. Returns RuntimeError when the CSR
+// has no public key, when verification fails or errors out, or when one of
+// the copy steps fails. 'req' must be non-null (enforced with CHECK).
+Status CertSigner::FillCertTemplateFromRequest(X509_REQ* req, X509* tmpl) {
+ SCOPED_OPENSSL_NO_PENDING_ERRORS;
+ CHECK(req);
+
+ // As of OpenSSL 1.1, req's internals are hidden.
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+ // With older OpenSSL the fields can be inspected directly, so a missing
+ // public key is reported with a dedicated message.
+ if (!req->req_info ||
+ !req->req_info->pubkey ||
+ !req->req_info->pubkey->public_key ||
+ !req->req_info->pubkey->public_key->data) {
+ return Status::RuntimeError("corrupted CSR: no public key");
+ }
+#endif
+ auto pub_key = ssl_make_unique(X509_REQ_get_pubkey(req));
+ OPENSSL_RET_IF_NULL(pub_key, "error unpacking public key from CSR");
+ // A negative result is reported as a verification error, zero as a
+ // signature mismatch.
+ const int rc = X509_REQ_verify(req, pub_key.get());
+ if (rc < 0) {
+ return Status::RuntimeError("CSR signature verification error",
+ GetOpenSSLErrors());
+ }
+ if (rc == 0) {
+ return Status::RuntimeError("CSR signature mismatch",
+ GetOpenSSLErrors());
+ }
+ OPENSSL_RET_NOT_OK(X509_set_subject_name(tmpl, X509_REQ_get_subject_name(req)),
+ "error setting cert subject name");
+ RETURN_NOT_OK(CopyExtensions(req, tmpl));
+ OPENSSL_RET_NOT_OK(X509_set_pubkey(tmpl, pub_key.get()),
+ "error setting cert public key");
+ return Status::OK();
+}
+
+// Sign the certificate 'x' with the private key 'pkey' using the digest
+// 'md'. Returns a non-OK status if OpenSSL reports a failure.
+Status CertSigner::DigestSign(const EVP_MD* md, EVP_PKEY* pkey, X509* x) {
+ OPENSSL_RET_NOT_OK(X509_sign(x, pkey, md), "error signing certificate");
+ return Status::OK();
+}
+
+// Generate a random 64-bit certificate serial number and, if 'ret' is
+// non-null, store it there as an ASN1_INTEGER. Returns a non-OK status if an
+// OpenSSL object cannot be allocated, or if generating or converting the
+// number fails.
+Status CertSigner::GenerateSerial(c_unique_ptr<ASN1_INTEGER>* ret) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  auto btmp = ssl_make_unique(BN_new());
+  OPENSSL_RET_IF_NULL(btmp, "error allocating big number");
+  OPENSSL_RET_NOT_OK(BN_pseudo_rand(btmp.get(), 64, 0, 0),
+                     "error generating random number");
+  // The target integer must exist before the conversion: given a null
+  // target, BN_to_ASN1_INTEGER() would allocate a result of its own, which
+  // this code never captures.
+  auto serial = ssl_make_unique(ASN1_INTEGER_new());
+  OPENSSL_RET_IF_NULL(serial, "error allocating ASN1 integer");
+  OPENSSL_RET_IF_NULL(BN_to_ASN1_INTEGER(btmp.get(), serial.get()),
+                      "error converting number into ASN1 representation");
+  if (ret) {
+    ret->swap(serial);
+  }
+  return Status::OK();
+}
+
+// Complete the certificate 'ret' and sign it with the CA private key.
+//
+// Sets, in this order: issuer name, X509 version 3, a freshly generated
+// serial number, the notBefore time (now) and the notAfter time (now plus
+// 'exp_seconds'); then signs the certificate using 'digest'. 'ret' must be
+// non-null (enforced with CHECK).
+Status CertSigner::DoSign(const EVP_MD* digest, int32_t exp_seconds,
+ X509* ret) const {
+ SCOPED_OPENSSL_NO_PENDING_ERRORS;
+ CHECK(ret);
+
+ // Version 3 (v3) of X509 certificates. The integer value is one less
+ // than the version it represents. This is not a typo. :)
+ static const int kX509V3 = 2;
+
+ // If we have a CA cert, then the CA is the issuer.
+ // Otherwise, we are self-signing so the target cert is also the issuer.
+ X509* issuer_cert = ca_cert_ ? ca_cert_->GetTopOfChainX509() : ret;
+ X509_NAME* issuer_name = X509_get_subject_name(issuer_cert);
+ OPENSSL_RET_NOT_OK(X509_set_issuer_name(ret, issuer_name),
+ "error setting issuer name");
+ c_unique_ptr<ASN1_INTEGER> serial;
+ RETURN_NOT_OK(GenerateSerial(&serial));
+ // set version to v3
+ OPENSSL_RET_NOT_OK(X509_set_version(ret, kX509V3),
+ "error setting cert version");
+ OPENSSL_RET_NOT_OK(X509_set_serialNumber(ret, serial.get()),
+ "error setting cert serial");
+ // Validity starts now (offset 0) ...
+ OPENSSL_RET_IF_NULL(X509_gmtime_adj(X509_get_notBefore(ret), 0L),
+ "error setting cert validity time");
+ // ... and ends 'exp_seconds' from now.
+ OPENSSL_RET_IF_NULL(X509_gmtime_adj(X509_get_notAfter(ret), exp_seconds),
+ "error setting cert expiration time");
+ RETURN_NOT_OK(DigestSign(digest, ca_private_key_->GetRawData(), ret));
+
+ return Status::OK();
+}
+
+} // namespace ca
+} // namespace security
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/ca/cert_management.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/ca/cert_management.h b/be/src/kudu/security/ca/cert_management.h
new file mode 100644
index 0000000..fb2bd0e
--- /dev/null
+++ b/be/src/kudu/security/ca/cert_management.h
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+
+// Forward declarations for the relevant OpenSSL typedefs
+// in addition to openssl_util.h.
+typedef struct asn1_string_st ASN1_INTEGER;
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+typedef struct env_md_st EVP_MD;
+#else
+typedef struct evp_md_st EVP_MD;
+#endif
+typedef struct rsa_st RSA;
+typedef struct x509_st X509;
+typedef struct X509_req_st X509_REQ;
+
+// STACK_OF(X509_EXTENSION)
+struct stack_st_X509_EXTENSION; // IWYU pragma: keep
+
+namespace kudu {
+namespace security {
+
+class Cert;
+class CertSignRequest;
+class PrivateKey;
+
+namespace ca {
+
+// Base utility class for issuing X509 CSRs.
+//
+// GenerateRequest() implements the common flow and relies on subclasses for
+// the certificate-specific parts: SetSubject() and SetExtensions().
+// Instances are not copyable.
+class CertRequestGeneratorBase {
+ public:
+ CertRequestGeneratorBase() = default;
+ virtual ~CertRequestGeneratorBase() = default;
+
+ // Prepare the generator for use; must succeed before GenerateRequest().
+ virtual Status Init() = 0;
+ // Whether Init() has completed successfully.
+ virtual bool Initialized() const = 0;
+
+ // Generate X509 CSR using the specified key. To obtain the key,
+ // call the GeneratePrivateKey() function.
+ Status GenerateRequest(const PrivateKey& key, CertSignRequest* ret) const WARN_UNUSED_RESULT;
+
+ protected:
+ // Push the specified extension into the stack provided.
+ // 'nid' identifies the extension and 'value' is its textual configuration.
+ static Status PushExtension(stack_st_X509_EXTENSION* st,
+ int32_t nid,
+ StringPiece value) WARN_UNUSED_RESULT;
+
+ // Set the certificate-specific subject fields into the specified request.
+ virtual Status SetSubject(X509_REQ* req) const = 0;
+
+ // Set the certificate-specific extensions into the specified request.
+ virtual Status SetExtensions(X509_REQ* req) const = 0;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(CertRequestGeneratorBase);
+};
+
+// A utility class that facilitates issuing certificate signing requests
+// (a.k.a. X509 CSRs).
+class CertRequestGenerator : public CertRequestGeneratorBase {
+ public:
+ // Properties for the generated X509 CSR. The 'hostname' is for the name of
+ // the machine the requestor is to use the certificate at. Valid configuration
+ // should contain non-empty 'hostname' field.
+ struct Config {
+ // FQDN name to put into the 'DNS' fields of the subjectAltName extension.
+ std::string hostname;
+ // userId (UID)
+ boost::optional<std::string> user_id;
+ // Our custom extension which stores the full Kerberos principal for IPKI certs.
+ boost::optional<std::string> kerberos_principal;
+ };
+
+ // 'config' contains the properties to fill into the X509 attributes of the
+ // CSR.
+ explicit CertRequestGenerator(Config config);
+ ~CertRequestGenerator();
+
+ Status Init() override WARN_UNUSED_RESULT;
+ bool Initialized() const override;
+
+ // Mark the generated CSRs as intended for self-signing. Must be called
+ // before Init() (enforced with CHECK). Returns *this to allow chaining.
+ CertRequestGenerator& enable_self_signing() {
+ CHECK(!is_initialized_);
+ for_self_signing_ = true;
+ return *this;
+ }
+
+ protected:
+ Status SetSubject(X509_REQ* req) const override WARN_UNUSED_RESULT;
+ Status SetExtensions(X509_REQ* req) const override WARN_UNUSED_RESULT;
+
+ private:
+ // CSR properties; fixed at construction.
+ const Config config_;
+ // Extension stack owned by this object; nullptr until Init() allocates it.
+ stack_st_X509_EXTENSION* extensions_ = nullptr;
+ // Set once Init() has succeeded.
+ bool is_initialized_ = false;
+ // Set by enable_self_signing().
+ bool for_self_signing_ = false;
+};
+
+// A utility class that facilitates issuing of root CA self-signed certificate
+// signing requests.
+class CaCertRequestGenerator : public CertRequestGeneratorBase {
+ public:
+ // Properties for the generated X509 CA CSR.
+ struct Config {
+ // Common name (CN); e.g. 'master 239D6D2F-BDD2-4463-8933-78D9559C2124'.
+ // Don't put hostname/FQDN in here: for CA cert it does not make sense and
+ // it might be longer than 64 characters which is the limit specified
+ // by RFC5280. The limit is enforced by the OpenSSL library.
+ std::string cn;
+ };
+
+ // 'config' contains the properties to fill into the CA CSR.
+ explicit CaCertRequestGenerator(Config config);
+ ~CaCertRequestGenerator();
+
+ Status Init() override WARN_UNUSED_RESULT;
+ bool Initialized() const override;
+
+ protected:
+ Status SetSubject(X509_REQ* req) const override WARN_UNUSED_RESULT;
+ Status SetExtensions(X509_REQ* req) const override WARN_UNUSED_RESULT;
+
+ private:
+ // CA CSR properties; fixed at construction.
+ const Config config_;
+ // Extension stack owned by this object.
+ stack_st_X509_EXTENSION* extensions_;
+ // Guards 'is_initialized_'; mutable so that const methods can lock it.
+ mutable simple_spinlock lock_;
+ bool is_initialized_; // protected by lock_
+};
+
+// A utility class for issuing and signing certificates.
+//
+// This is used in "fluent" style. For example:
+//
+// CHECK_OK(CertSigner(&my_ca_cert, &my_ca_key)
+// .set_expiration_interval(MonoDelta::FromSeconds(3600))
+// .Sign(csr, &cert));
+//
+// As such, this class is not guaranteed thread-safe.
+class CertSigner {
+ public:
+ // Generate a self-signed certificate authority using the given key
+ // and CSR configuration.
+ // The resulting certificate is set to expire 'cert_expiration_seconds'
+ // after it is issued.
+ static Status SelfSignCA(const PrivateKey& key,
+ CaCertRequestGenerator::Config config,
+ int64_t cert_expiration_seconds,
+ Cert* cert) WARN_UNUSED_RESULT;
+
+ // Generate a self-signed certificate using the given key and CSR
+ // configuration.
+ static Status SelfSignCert(const PrivateKey& key,
+ CertRequestGenerator::Config config,
+ Cert* cert) WARN_UNUSED_RESULT;
+
+ // Create a CertSigner.
+ //
+ // The given cert and key must stay valid for the lifetime of the
+ // cert signer. See class documentation above for recommended usage.
+ //
+ // 'ca_cert' may be nullptr in order to perform self-signing (though
+ // the SelfSignCA() static method above is recommended).
+ CertSigner(const Cert* ca_cert, const PrivateKey* ca_private_key);
+ ~CertSigner() = default;
+
+ // Set the expiration interval for certs signed by this signer.
+ // This may be changed at any point.
+ // NOTE(review): the interval is stored in an int32_t number of seconds, so
+ // the ToSeconds() result is presumably truncated -- confirm this is intended.
+ CertSigner& set_expiration_interval(MonoDelta expiration) {
+ exp_interval_sec_ = expiration.ToSeconds();
+ return *this;
+ }
+
+ // Issue a certificate for 'req' and store it in 'ret'.
+ Status Sign(const CertSignRequest& req, Cert* ret) const WARN_UNUSED_RESULT;
+
+ private:
+
+ // Copy the X509v3 extensions of 'req' into 'x'.
+ static Status CopyExtensions(X509_REQ* req, X509* x) WARN_UNUSED_RESULT;
+ // Fill 'tmpl' with the subject, extensions and public key of 'req'.
+ static Status FillCertTemplateFromRequest(X509_REQ* req, X509* tmpl) WARN_UNUSED_RESULT;
+ // Sign 'x' with 'pkey' using the digest 'md'.
+ static Status DigestSign(const EVP_MD* md, EVP_PKEY* pkey, X509* x) WARN_UNUSED_RESULT;
+ // Generate a serial number for a new certificate.
+ static Status GenerateSerial(c_unique_ptr<ASN1_INTEGER>* ret) WARN_UNUSED_RESULT;
+
+ // Set issuer, version, serial and validity on 'ret', then sign it.
+ Status DoSign(const EVP_MD* digest, int32_t exp_seconds, X509 *ret) const WARN_UNUSED_RESULT;
+
+ // The expiration interval of certs signed by this signer.
+ // Defaults to one day (24 * 60 * 60 seconds).
+ int32_t exp_interval_sec_ = 24 * 60 * 60;
+
+ // The CA cert. null if this CertSigner is configured for self-signing.
+ const Cert* const ca_cert_;
+
+ // The CA private key. If configured for self-signing, this is the
+ // private key associated with the target cert.
+ const PrivateKey* const ca_private_key_;
+
+ DISALLOW_COPY_AND_ASSIGN(CertSigner);
+};
+
+} // namespace ca
+} // namespace security
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/cert-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/cert-test.cc b/be/src/kudu/security/cert-test.cc
new file mode 100644
index 0000000..12205e1
--- /dev/null
+++ b/be/src/kudu/security/cert-test.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/test/test_certs.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::pair;
+using std::string;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+// Tests for various certificate-related functionality in the security library.
+// These do not cover the CA certificate management part; see
+// cert_management-test.cc for those.
+class CertTest : public KuduTest {
+ public:
+ // Parses the PEM test constants into the fixture members: a CA cert with its
+ // private and public keys, plus a second cert/private key pair built from
+ // the kCaExpired* constants. Fails the test early if either cert does not
+ // match its own private key.
+ void SetUp() override {
+ ASSERT_OK(ca_cert_.FromString(kCaCert, DataFormat::PEM));
+ ASSERT_OK(ca_private_key_.FromString(kCaPrivateKey, DataFormat::PEM));
+ ASSERT_OK(ca_public_key_.FromString(kCaPublicKey, DataFormat::PEM));
+ ASSERT_OK(ca_exp_cert_.FromString(kCaExpiredCert, DataFormat::PEM));
+ ASSERT_OK(ca_exp_private_key_.FromString(kCaExpiredPrivateKey,
+ DataFormat::PEM));
+ // Sanity checks.
+ ASSERT_OK(ca_cert_.CheckKeyMatch(ca_private_key_));
+ ASSERT_OK(ca_exp_cert_.CheckKeyMatch(ca_exp_private_key_));
+ }
+
+ protected:
+ // CA cert and the keys loaded from kCaCert / kCaPrivateKey / kCaPublicKey.
+ Cert ca_cert_;
+ PrivateKey ca_private_key_;
+ PublicKey ca_public_key_;
+
+ // Cert and private key loaded from kCaExpiredCert / kCaExpiredPrivateKey.
+ Cert ca_exp_cert_;
+ PrivateKey ca_exp_private_key_;
+};
+
+// Regression test to make sure that GetKuduKerberosPrincipalOidNid is thread
+// safe. OpenSSL 1.0.0's OBJ_create method is not thread safe.
+//
+// Every worker thread waits on a shared barrier before asking for the NID, so
+// that the calls are issued as close together as possible.
+TEST_F(CertTest, GetKuduKerberosPrincipalOidNidConcurrent) {
+  // Number of threads racing to obtain the NID.
+  const int kConcurrency = 16;
+  Barrier barrier(kConcurrency);
+
+  vector<thread> threads;
+  threads.reserve(kConcurrency);
+  for (int i = 0; i < kConcurrency; i++) {
+    // Only the barrier is shared with the workers; capture it explicitly.
+    threads.emplace_back([&barrier] () {
+      barrier.Wait();
+      CHECK_NE(NID_undef, GetKuduKerberosPrincipalOidNid());
+    });
+  }
+
+  // Named 't' rather than 'thread' to avoid shadowing the std::thread type
+  // name brought in by the using-declaration in this file.
+  for (auto& t : threads) {
+    t.join();
+  }
+}
+
+// Check input/output of the X509 certificates in PEM format: the PEM text
+// produced by the CA cert must equal the kCaCert constant once extra
+// whitespace is removed from both.
+TEST_F(CertTest, CertInputOutputPEM) {
+  string actual_pem;
+  ASSERT_OK(ca_cert_.ToString(&actual_pem, DataFormat::PEM));
+  RemoveExtraWhitespace(&actual_pem);
+
+  string expected_pem(kCaCert);
+  RemoveExtraWhitespace(&expected_pem);
+
+  EXPECT_EQ(expected_pem, actual_pem);
+}
+
+// Check that Cert behaves in a predictable way if given invalid PEM data.
+TEST_F(CertTest, CertInvalidInput) {
+  // Point the loader at a file that is expected to exist but does not contain
+  // valid certificate data, to make sure such input is rejected with an error
+  // instead of choking on it.
+  Cert cert;
+  const Status load_status = cert.FromFile("/bin/sh", DataFormat::PEM);
+  ASSERT_FALSE(load_status.ok());
+}
+
+// Check X509 certificate/private key matching: match cases.
+// Each cert is paired with the private key it was loaded alongside.
+TEST_F(CertTest, CertMatchesRsaPrivateKey) {
+  const pair<const Cert*, const PrivateKey*> matching_pairs[] = {
+    { &ca_cert_, &ca_private_key_ },
+    { &ca_exp_cert_, &ca_exp_private_key_ },
+  };
+  for (const auto& entry : matching_pairs) {
+    const Cert& cert = *entry.first;
+    const PrivateKey& key = *entry.second;
+    EXPECT_OK(cert.CheckKeyMatch(key));
+  }
+}
+
+// Check X509 certificate/private key matching: mismatch cases.
+// Each cert is paired with the other cert's private key; the check must fail
+// with a runtime error that carries the expected message.
+TEST_F(CertTest, CertMismatchesRsaPrivateKey) {
+  const pair<const Cert*, const PrivateKey*> mismatched_pairs[] = {
+    { &ca_cert_, &ca_exp_private_key_ },
+    { &ca_exp_cert_, &ca_private_key_ },
+  };
+  for (const auto& entry : mismatched_pairs) {
+    const Cert& cert = *entry.first;
+    const PrivateKey& key = *entry.second;
+    const Status status = cert.CheckKeyMatch(key);
+    EXPECT_TRUE(status.IsRuntimeError()) << status.ToString();
+    ASSERT_STR_CONTAINS(status.ToString(), "certificate does not match private key");
+  }
+}
+
+// Check that the accessors for the Kudu-specific cert fields return
+// boost::none for the CA test cert, which is expected to carry neither a
+// user ID nor a Kudu Kerberos principal.
+TEST_F(CertTest, TestGetKuduSpecificFieldsWhenMissing) {
+ EXPECT_EQ(boost::none, ca_cert_.UserId());
+ EXPECT_EQ(boost::none, ca_cert_.KuduKerberosPrincipal());
+}
+
+// Check that Cert::Hostnames() returns the DNS names of the
+// kCertDnsHostnamesInSan test cert in the expected order, including a very
+// long hostname, and that the issuer name is rendered as expected.
+TEST_F(CertTest, DnsHostnameInSanField) {
+  const string expected_first = "mega.giga.io";
+  const string expected_second = "foo.bar.com";
+  const string expected_third =
+      "toooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo."
+      "looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+      "ng.hostname.io";
+
+  Cert san_cert;
+  ASSERT_OK(san_cert.FromString(kCertDnsHostnamesInSan, DataFormat::PEM));
+
+  EXPECT_EQ("C = US, ST = CA, O = MyCompany, CN = MyName, emailAddress = my@email.com",
+            san_cert.IssuerName());
+
+  // Stop here if the count is wrong: the indexed checks below rely on it.
+  vector<string> actual = san_cert.Hostnames();
+  ASSERT_EQ(3, actual.size());
+  EXPECT_EQ(expected_first, actual[0]);
+  EXPECT_EQ(expected_second, actual[1]);
+  EXPECT_EQ(expected_third, actual[2]);
+}
+
+} // namespace security
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/cert.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/cert.cc b/be/src/kudu/security/cert.cc
new file mode 100644
index 0000000..b81d263
--- /dev/null
+++ b/be/src/kudu/security/cert.cc
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/security/cert.h"
+
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <openssl/evp.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/openssl_util_bio.h"
+#include "kudu/util/status.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace security {
+
+// SslTypeTraits specialization naming GENERAL_NAMES_free as the function that
+// frees a GENERAL_NAMES object. Presumably this is what lets
+// ssl_make_unique() be used on GENERAL_NAMES* in this file -- confirm against
+// openssl_util.h.
+template<> struct SslTypeTraits<GENERAL_NAMES> {
+ static constexpr auto kFreeFunc = &GENERAL_NAMES_free;
+};
+
+// This OID is generated via the UUID method.
+static const char* kKuduKerberosPrincipalOidStr = "2.25.243346677289068076843480765133256509912";
+
+// Returns 'name' printed via X509_NAME_print_ex() with the XN_FLAG_ONELINE
+// flag. 'name' must not be null; that and any failure of the OpenSSL calls
+// are enforced with CHECK-style macros.
+string X509NameToString(X509_NAME* name) {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  CHECK(name);
+
+  // Print into an in-memory BIO, then copy the accumulated bytes out.
+  auto mem_bio = ssl_make_unique(BIO_new(BIO_s_mem()));
+  OPENSSL_CHECK_OK(X509_NAME_print_ex(mem_bio.get(), name, 0, XN_FLAG_ONELINE));
+
+  BUF_MEM* printed = nullptr;
+  OPENSSL_CHECK_OK(BIO_get_mem_ptr(mem_bio.get(), &printed));
+  return string(printed->data, printed->length);
+}
+
+// Returns the NID for the Kudu Kerberos principal OID
+// (kKuduKerberosPrincipalOidStr). The OID is created through OBJ_create()
+// under std::call_once, so the creation runs only once no matter how many
+// threads call this function; the resulting NID is cached in a function-local
+// static. Crashes (CHECK) if the OID cannot be created.
+int GetKuduKerberosPrincipalOidNid() {
+  InitializeOpenSSL();
+  static int cached_nid;
+  static std::once_flag create_once;
+  // The statics are reachable from the lambda without any capture.
+  std::call_once(create_once, [] () {
+    cached_nid = OBJ_create(kKuduKerberosPrincipalOidStr, "kuduPrinc", "kuduKerberosPrincipal");
+    CHECK_NE(cached_nid, NID_undef) << "failed to create kuduPrinc oid: " << GetOpenSSLErrors();
+  });
+  return cached_nid;
+}
+
+// Returns the X509 at index 0 of the wrapped certificate stack. Crashes
+// (CHECK) if the chain length is not greater than zero.
+X509* Cert::GetTopOfChainX509() const {
+  CHECK_GT(chain_len(), 0);
+  auto* chain = data_.get();
+  return sk_X509_value(chain, 0);
+}
+
+// Loads the certificate chain from 'data', interpreted according to 'format'.
+// Returns the error from the underlying parser if parsing fails, or a
+// RuntimeError if the resulting chain holds no certificates.
+Status Cert::FromString(const std::string& data, DataFormat format) {
+  RETURN_NOT_OK(::kudu::security::FromString(data, format, &data_));
+  const bool chain_is_empty = sk_X509_num(data_.get()) < 1;
+  return chain_is_empty
+      ? Status::RuntimeError("Certificate chain is empty. Expected at least one certificate.")
+      : Status::OK();
+}
+
+// Serializes the wrapped certificate data into 'data' using 'format', by
+// delegating to the generic ::kudu::security::ToString() helper.
+Status Cert::ToString(std::string* data, DataFormat format) const {
+  auto* chain = data_.get();
+  return ::kudu::security::ToString(data, format, chain);
+}
+
+// Loads the certificate chain from the file at 'fpath', interpreted according
+// to 'format'. Returns the error from the underlying loader if loading fails,
+// or a RuntimeError if the resulting chain holds no certificates.
+Status Cert::FromFile(const std::string& fpath, DataFormat format) {
+  RETURN_NOT_OK(::kudu::security::FromFile(fpath, format, &data_));
+  const bool chain_is_empty = sk_X509_num(data_.get()) < 1;
+  return chain_is_empty
+      ? Status::RuntimeError("Certificate chain is empty. Expected at least one certificate.")
+      : Status::OK();
+}
+
+// Returns the subject name of the top-of-chain cert, formatted by
+// X509NameToString().
+string Cert::SubjectName() const {
+  X509_NAME* subject = X509_get_subject_name(GetTopOfChainX509());
+  return X509NameToString(subject);
+}
+
+// Returns the issuer name of the top-of-chain cert, formatted by
+// X509NameToString().
+string Cert::IssuerName() const {
+  X509_NAME* issuer = X509_get_issuer_name(GetTopOfChainX509());
+  return X509NameToString(issuer);
+}
+
+// Returns the text of the NID_userId entry in the subject name of the
+// top-of-chain cert, or boost::none when the lookup reports a negative
+// length.
+boost::optional<string> Cert::UserId() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  char text[1024];
+  X509_NAME* subject = X509_get_subject_name(GetTopOfChainX509());
+  const int text_len = X509_NAME_get_text_by_NID(subject, NID_userId, text, arraysize(text));
+  if (text_len >= 0) {
+    return string(text, text_len);
+  }
+  return boost::none;
+}
+
+// Returns the DNS names listed in the subject alternative name (SAN)
+// extension of the top-of-chain cert, in the order they appear. Entries of
+// other types are skipped. Returns an empty vector if the extension is absent
+// or if a DNS entry is not a non-null IA5 string (the latter is also logged
+// at DFATAL level).
+vector<string> Cert::Hostnames() const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  auto alt_names = ssl_make_unique(reinterpret_cast<GENERAL_NAMES*>(X509_get_ext_d2i(
+      GetTopOfChainX509(), NID_subject_alt_name, nullptr, nullptr)));
+  vector<string> dns_names;
+  if (!alt_names) {
+    return dns_names;
+  }
+
+  const int num_names = sk_GENERAL_NAME_num(alt_names.get());
+  for (int idx = 0; idx < num_names; ++idx) {
+    const GENERAL_NAME* entry = sk_GENERAL_NAME_value(alt_names.get(), idx);
+    if (entry->type == GEN_DNS) {
+      const ASN1_STRING* dns = entry->d.dNSName;
+      const bool is_valid = dns->type == V_ASN1_IA5STRING && dns->data != nullptr;
+      if (!is_valid) {
+        LOG(DFATAL) << "invalid DNS name in the SAN field";
+        return vector<string>();
+      }
+      dns_names.emplace_back(reinterpret_cast<char*>(dns->data), dns->length);
+    }
+  }
+  return dns_names;
+}
+
+// Returns the contents of the Kudu Kerberos principal extension of the
+// top-of-chain cert. Returns boost::none if the cert has no such extension,
+// or if the extension value does not parse as a UTF8 string (the latter is
+// also logged at DFATAL level).
+boost::optional<string> Cert::KuduKerberosPrincipal() const {
+ SCOPED_OPENSSL_NO_PENDING_ERRORS;
+ // Locate the extension by the NID returned by GetKuduKerberosPrincipalOidNid().
+ int idx = X509_get_ext_by_NID(GetTopOfChainX509(), GetKuduKerberosPrincipalOidNid(), -1);
+ if (idx < 0) return boost::none;
+ X509_EXTENSION* ext = X509_get_ext(GetTopOfChainX509(), idx);
+ ASN1_OCTET_STRING* octet_str = X509_EXTENSION_get_data(ext);
+ // Work on a copy of the data pointer: its address is handed to
+ // ASN1_get_object(), and the pointer is then used as the start of the string
+ // payload (presumably it is advanced past the ASN.1 header -- confirm
+ // against the OpenSSL documentation).
+ const unsigned char* octet_str_data = octet_str->data;
+ long len; // NOLINT
+ int tag, xclass;
+ // Anything other than a zero return with a UTF8STRING tag is rejected.
+ if (ASN1_get_object(&octet_str_data, &len, &tag, &xclass, octet_str->length) != 0 ||
+ tag != V_ASN1_UTF8STRING) {
+ LOG(DFATAL) << "invalid extension value in cert " << SubjectName();
+ return boost::none;
+ }
+
+ return string(reinterpret_cast<const char*>(octet_str_data), len);
+}
+
+// Checks whether 'key' matches the top-of-chain cert, using
+// X509_check_private_key(). Returns Status::OK() on a match; otherwise the
+// OPENSSL_RET_NOT_OK macro returns early, presumably with an error status
+// that carries the "certificate does not match private key" message.
+Status Cert::CheckKeyMatch(const PrivateKey& key) const {
+ SCOPED_OPENSSL_NO_PENDING_ERRORS;
+ OPENSSL_RET_NOT_OK(X509_check_private_key(GetTopOfChainX509(), key.GetRawData()),
+ "certificate does not match private key");
+ return Status::OK();
+}
+
+// Computes the server end-point channel bindings (see RFC 5929) for this
+// certificate and stores them in 'channel_bindings', replacing any previous
+// contents.
+//
+// The bindings are a digest of the DER output produced by ToBIO() for the
+// wrapped certificate data. The digest algorithm is the one used by the
+// signature of the top-of-chain cert, except that MD5 and SHA-1 are replaced
+// by SHA-256.
+//
+// Returns NotSupported if no digest algorithm can be determined for the
+// cert's signature, and an error status if any of the OpenSSL operations
+// fails. 'channel_bindings' is only modified on success.
+Status Cert::GetServerEndPointChannelBindings(string* channel_bindings) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  // Find the signature type of the certificate. This corresponds to the digest
+  // (hash) algorithm, and the public key type which signed the cert.
+
+#if OPENSSL_VERSION_NUMBER >= 0x10002000L
+  int signature_nid = X509_get_signature_nid(GetTopOfChainX509());
+#else
+  // Older version of OpenSSL appear not to have a public way to get the
+  // signature digest method from a certificate. Instead, we reach into the
+  // 'private' internals.
+  int signature_nid = OBJ_obj2nid(GetTopOfChainX509()->sig_alg->algorithm);
+#endif
+
+  // Retrieve the digest algorithm type. Both outputs start out as NID_undef:
+  // the return value of OBJ_find_sigid_algs() is not checked here, and it is
+  // not expected to write the outputs when the lookup fails. Without the
+  // initialization a failed lookup would leave 'digest_nid' indeterminate;
+  // with it, that case is reported as NotSupported below.
+  int digest_nid = NID_undef;
+  int public_key_nid = NID_undef;
+  OBJ_find_sigid_algs(signature_nid, &digest_nid, &public_key_nid);
+
+  // RFC 5929: if the certificate's signatureAlgorithm uses no hash functions or
+  // uses multiple hash functions, then this channel binding type's channel
+  // bindings are undefined at this time (updates to this channel binding type
+  // may occur to address this issue if it ever arises).
+  //
+  // TODO(dan): can the multiple hash function scenario actually happen? What
+  // does OBJ_find_sigid_algs do in that scenario?
+  if (digest_nid == NID_undef) {
+    return Status::NotSupported("server certificate has no signature digest (hash) algorithm");
+  }
+
+  // RFC 5929: if the certificate's signatureAlgorithm uses a single hash
+  // function, and that hash function is either MD5 [RFC1321] or SHA-1
+  // [RFC3174], then use SHA-256 [FIPS-180-3];
+  if (digest_nid == NID_md5 || digest_nid == NID_sha1) {
+    digest_nid = NID_sha256;
+  }
+
+  const EVP_MD* md = EVP_get_digestbynid(digest_nid);
+  OPENSSL_RET_IF_NULL(md, "digest for nid not found");
+
+  // Create a digest BIO. All data written to the BIO will be sent through the
+  // digest (hash) function. The digest BIO requires a null BIO to writethrough to.
+  auto null_bio = ssl_make_unique(BIO_new(BIO_s_null()));
+  auto md_bio = ssl_make_unique(BIO_new(BIO_f_md()));
+  OPENSSL_RET_NOT_OK(BIO_set_md(md_bio.get(), md), "failed to set digest for BIO");
+  BIO_push(md_bio.get(), null_bio.get());
+
+  // Write the cert to the digest BIO.
+  RETURN_NOT_OK(ToBIO(md_bio.get(), DataFormat::DER, data_.get()));
+
+  // Read the digest from the BIO and store it in 'channel_bindings'.
+  char buf[EVP_MAX_MD_SIZE];
+  int digest_len = BIO_gets(md_bio.get(), buf, sizeof(buf));
+  OPENSSL_RET_NOT_OK(digest_len, "failed to get cert digest from BIO");
+  channel_bindings->assign(buf, digest_len);
+  return Status::OK();
+}
+
+// Makes this Cert wrap a copy of the stack 'data' while adding a reference to
+// the certificate it contains. 'data' is expected to hold exactly one
+// certificate (DCHECK-enforced). The reference count is bumped through
+// CRYPTO_add() or X509_up_ref(), depending on the OpenSSL version.
+void Cert::AdoptAndAddRefRawData(RawDataType* data) {
+ DCHECK_EQ(sk_X509_num(data), 1);
+ // With a single-element stack, the last element is also the only one.
+ X509* cert = sk_X509_value(data, sk_X509_num(data) - 1);
+
+ DCHECK(cert);
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+ CHECK_GT(CRYPTO_add(&cert->references, 1, CRYPTO_LOCK_X509), 1) << "X509 use-after-free detected";
+#else
+ OPENSSL_CHECK_OK(X509_up_ref(cert)) << "X509 use-after-free detected: " << GetOpenSSLErrors();
+#endif
+ // We copy the STACK_OF() object, but the copy and the original both internally point to the
+ // same elements.
+ AdoptRawData(sk_X509_dup(data));
+}
+
+// Replaces the wrapped certificate stack with a new one-element stack that
+// contains 'cert'. No reference is added to 'cert' here; see
+// AdoptAndAddRefX509() for the variant that adds one.
+//
+// NOTE(review): the current stack is freed explicitly below and then
+// AdoptRawData() is called while 'data_' still points to the freed stack. If
+// AdoptRawData() also releases the previously held stack, this would be a
+// double free whenever 'data_' is non-null on entry -- confirm against the
+// AdoptRawData() implementation.
+void Cert::AdoptX509(X509* cert) {
+ // Free current STACK_OF(X509).
+ sk_X509_pop_free(data_.get(), X509_free);
+ // Allocate new STACK_OF(X509) and populate with 'cert'.
+ STACK_OF(X509)* sk = sk_X509_new_null();
+ DCHECK(sk);
+ sk_X509_push(sk, cert);
+ AdoptRawData(sk);
+}
+
+// Adds a reference to 'cert' and then adopts it via AdoptX509(). The
+// reference count is bumped through CRYPTO_add() or X509_up_ref(), depending
+// on the OpenSSL version; a failure to do so crashes the process.
+void Cert::AdoptAndAddRefX509(X509* cert) {
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+ CHECK_GT(CRYPTO_add(&cert->references, 1, CRYPTO_LOCK_X509), 1) << "X509 use-after-free detected";
+#else
+ OPENSSL_CHECK_OK(X509_up_ref(cert)) << "X509 use-after-free detected: " << GetOpenSSLErrors();
+#endif
+ AdoptX509(cert);
+}
+
+// Extracts the public key of the top-of-chain cert and hands it to 'key' via
+// AdoptRawData(). Returns early with an error (through OPENSSL_RET_IF_NULL)
+// if OpenSSL yields no key; 'key' is left untouched in that case.
+Status Cert::GetPublicKey(PublicKey* key) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  EVP_PKEY* cert_pubkey = X509_get_pubkey(GetTopOfChainX509());
+  OPENSSL_RET_IF_NULL(cert_pubkey, "unable to get certificate public key");
+  key->AdoptRawData(cert_pubkey);
+  return Status::OK();
+}
+
+// Loads the CSR from 'data', interpreted according to 'format', by delegating
+// to the generic ::kudu::security::FromString() helper.
+Status CertSignRequest::FromString(const std::string& data, DataFormat format) {
+ return ::kudu::security::FromString(data, format, &data_);
+}
+
+// Serializes the CSR into 'data' using 'format', by delegating to the generic
+// ::kudu::security::ToString() helper.
+Status CertSignRequest::ToString(std::string* data, DataFormat format) const {
+ return ::kudu::security::ToString(data, format, data_.get());
+}
+
+// Loads the CSR from the file at 'fpath', interpreted according to 'format',
+// by delegating to the generic ::kudu::security::FromFile() helper.
+Status CertSignRequest::FromFile(const std::string& fpath, DataFormat format) {
+ return ::kudu::security::FromFile(fpath, format, &data_);
+}
+
+// Returns a new CertSignRequest for the same request as this one.
+//
+// With OpenSSL older than 1.1.0, the clone wraps the same X509_REQ object
+// after its reference count has been incremented. With newer versions, the
+// clone wraps a duplicate created by X509_REQ_dup(). Crashes (CHECK) if the
+// reference cannot be added or the duplicate cannot be allocated.
+CertSignRequest CertSignRequest::Clone() const {
+ X509_REQ* cloned_req;
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+ CHECK_GT(CRYPTO_add(&data_->references, 1, CRYPTO_LOCK_X509_REQ), 1)
+ << "X509_REQ use-after-free detected";
+ cloned_req = GetRawData();
+#else
+ // With OpenSSL 1.1, data structure internals are hidden, and there doesn't
+ // seem to be a public method that increments data_'s refcount.
+ cloned_req = X509_REQ_dup(GetRawData());
+ CHECK(cloned_req != nullptr)
+ << "X509 allocation failure detected: " << GetOpenSSLErrors();
+#endif
+
+ CertSignRequest clone;
+ clone.AdoptRawData(cloned_req);
+ return clone;
+}
+
+// Extracts the public key of the CSR and hands it to 'key' via
+// AdoptRawData(). Returns early with an error (through OPENSSL_RET_IF_NULL)
+// if OpenSSL yields no key; 'key' is left untouched in that case.
+Status CertSignRequest::GetPublicKey(PublicKey* key) const {
+  SCOPED_OPENSSL_NO_PENDING_ERRORS;
+  EVP_PKEY* csr_pubkey = X509_REQ_get_pubkey(data_.get());
+  OPENSSL_RET_IF_NULL(csr_pubkey, "unable to get CSR public key");
+  key->AdoptRawData(csr_pubkey);
+  return Status::OK();
+}
+
+} // namespace security
+} // namespace kudu