You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:21 UTC
[09/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.h b/be/src/kudu/util/pb_util.h
new file mode 100644
index 0000000..6c132a6
--- /dev/null
+++ b/be/src/kudu/util/pb_util.h
@@ -0,0 +1,513 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utilities for dealing with protocol buffers.
+// These are mostly functions similar to those found in the protobuf library
+// itself, but operating on kudu::faststring instances instead of STL strings.
+#ifndef KUDU_UTIL_PB_UTIL_H
+#define KUDU_UTIL_PB_UTIL_H
+
+#include <cstdint>
+#include <iosfwd>
+#include <memory>
+#include <string>
+
+#include <boost/optional/optional.hpp>
+#include <google/protobuf/message.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/debug/trace_event_impl.h"
+
+namespace google {
+namespace protobuf {
+class DescriptorPool;
+class FileDescriptor;
+class FileDescriptorSet;
+class MessageLite;
+class SimpleDescriptorDatabase;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Env;
+class RandomAccessFile;
+class SequentialFile;
+class Slice;
+class Status;
+class RWFile;
+class faststring;
+
+namespace pb_util {
+
+enum SyncMode { // Used by WritePBToPath() and WritePBContainerToPath().
+ SYNC, // Make the written data durable (fsync) before returning.
+ NO_SYNC // Return without syncing the written data.
+};
+
+enum CreateMode { // Used by WritePBContainerToPath().
+ OVERWRITE, // Overwrite the target file if it already exists.
+ NO_OVERWRITE // Fail if the target path already exists.
+};
+
+enum class FileState { // Lifecycle of the PB container file classes below.
+ NOT_INITIALIZED, // Not yet opened via Open(), CreateNew() or OpenExisting().
+ OPEN, // Opened; reads or appends may be performed.
+ CLOSED // Closed (e.g. via Close()).
+};
+
+// The minimum valid length, in bytes, of a protobuf container (PBC) file.
+extern const int kPBContainerMinimumValidLength;
+
+// Like MessageLite::AppendToString, but appends to a kudu::faststring.
+void AppendToString(const google::protobuf::MessageLite &msg, faststring *output);
+
+// Like MessageLite::AppendPartialToString, but appends to a kudu::faststring.
+void AppendPartialToString(const google::protobuf::MessageLite &msg, faststring *output);
+
+// Like MessageLite::SerializeToString, but writes to a kudu::faststring.
+void SerializeToString(const google::protobuf::MessageLite &msg, faststring *output);
+
+// Like MessageLite::ParseFromZeroCopyStream, but reads 'msg' from 'rfile'.
+Status ParseFromSequentialFile(google::protobuf::MessageLite *msg, SequentialFile *rfile);
+
+// Similar to MessageLite::ParseFromArray, with the difference that it returns
+// Status::Corruption() if the message could not be parsed.
+Status ParseFromArray(google::protobuf::MessageLite* msg, const uint8_t* data, uint32_t length);
+
+// Load a protobuf from 'path' into 'msg'. For container files, see ReadPBContainerFromPath().
+Status ReadPBFromPath(Env* env, const std::string& path, google::protobuf::MessageLite* msg);
+
+// Serialize 'msg' to 'path'. For container files, see WritePBContainerToPath().
+//
+// If SyncMode SYNC is provided, ensures the changes are made durable.
+Status WritePBToPath(Env* env, const std::string& path,
+ const google::protobuf::MessageLite& msg, SyncMode sync);
+
+// Truncate any 'bytes' or 'string' fields of this message to max_len.
+// The text "<truncated>" is appended to any such truncated fields.
+void TruncateFields(google::protobuf::Message* message, int max_len);
+
+// Redaction-sensitive variant of Message::DebugString.
+//
+// For most protobufs, this has identical output to Message::DebugString. However,
+// a field with string or binary type may be tagged with the 'kudu.REDACT' option,
+// available by importing 'pb_util.proto'. When such a field is encountered by this
+// method, its contents will be redacted using the 'KUDU_REDACT' macro as documented
+// in kudu/util/logging.h.
+std::string SecureDebugString(const google::protobuf::Message& msg);
+
+// Same as SecureDebugString() above, but equivalent to Message::ShortDebugString.
+std::string SecureShortDebugString(const google::protobuf::Message& msg);
+
+// A protobuf "container" has the following format (all integers in
+// little-endian byte order).
+//
+// <file header>
+// <1 or more records>
+//
+// Note: There are two versions (version 1 and version 2) of the container format.
+// Version 2 of the file format differs from version 1 in the following ways:
+//
+// * Version 2 has a file header checksum.
+// * Version 2 has separate checksums for the record length and record data
+// fields.
+//
+// File header format
+// ------------------
+//
+// Each protobuf container file contains a file header identifying the file.
+// This includes:
+//
+// magic number: 8 byte string identifying the file format.
+//
+// Included so that we have a minimal guarantee that this file is of the
+// type we expect and that we are not just reading garbage.
+//
+// container_version: 4 byte unsigned integer indicating the "version" of the
+// container format. May be set to 1 or 2.
+//
+// Included so that this file format may be extended at some later date
+// while maintaining backwards compatibility.
+//
+// file_header_checksum (version 2+ only): 4 byte unsigned integer with a CRC32C
+// of the magic and version fields.
+//
+// Included so that we can validate the container version number.
+//
+// The remaining container fields are considered part of a "record". There may
+// be 1 or more records in a valid protobuf container file.
+//
+// Record format
+// -------------
+//
+// data length: 4 byte unsigned integer indicating the size of the encoded data.
+//
+// Included because PB messages aren't self-delimiting, and thus
+// writing a stream of messages to the same file requires
+// delimiting each with its size.
+//
+// See https://developers.google.com/protocol-buffers/docs/techniques#streaming
+// for more details.
+//
+// length checksum (version 2+ only): 4-byte unsigned integer containing the
+// CRC32C checksum of "data length".
+//
+// Included so that we may discern the difference between a truncated file
+// and a corrupted length field.
+//
+// data: "data length" bytes of protobuf data encoded according to the schema.
+//
+// Our payload.
+//
+// data checksum: 4 byte unsigned integer containing the CRC32C checksum of "data".
+//
+// Included to ensure validity of the data on-disk.
+// Note: In version 1 of the file format, this is a checksum of both the
+// "data length" and "data" fields. In version 2+, this is only a checksum
+// of the "data" field.
+//
+// Supplemental header
+// -------------------
+//
+// A valid container must have at least one record, the first of
+// which is known as the "supplemental header". The supplemental header
+// contains additional container-level information, including the protobuf
+// schema used for the records following it. See pb_util.proto for details. As
+// a containerized PB message, the supplemental header is protected by a CRC32C
+// checksum like any other message.
+//
+// Error detection and tolerance
+// -----------------------------
+//
+// It is worth describing the kinds of errors that can be detected by the
+// protobuf container and the kinds that cannot.
+//
+// The checksums in the container are independent, not rolling. As such,
+// they won't detect the disappearance or reordering of entire protobuf
+// messages, which can happen if a range of the file is collapsed (see
+// man fallocate(2)) or if the file is otherwise manually manipulated.
+//
+// In version 1, the checksums do not protect against corruption in the data
+// length field. However, version 2 of the format resolves that problem. The
+// benefit is that version 2 files can tell the difference between a record
+// with a corrupted length field and a record that was only partially written.
+// See ReadablePBContainerFile::ReadNextPB() for discussion on how this
+// difference is expressed via the API.
+//
+// In version 1 of the format, corruption of the version field in the file
+// header is not detectable. However, version 2 of the format addresses that
+// limitation as well.
+//
+// Corruption of the protobuf data itself is detected in all versions of the
+// file format (subject to CRC32C limitations).
+//
+// The container does not include footers or periodic checkpoints. As such, it
+// will not detect if entire records are truncated.
+//
+// The design and implementation relies on data ordering guarantees provided by
+// the file system to ensure that bytes are written to a file before the file
+// metadata (file size) is updated. A partially-written record (the result of a
+// failed append) is identified by one of the following criteria:
+// 1. Too few bytes remain in the file to constitute a valid record. For
+// version 2, that would be fewer than 12 bytes (data len, data len
+// checksum, and data checksum), or
+// 2. Assuming a record's data length field is valid, then fewer bytes remain
+// in the file than are specified in the data length field (plus enough for
+// checksums).
+// In the above scenarios, it is assumed that the system faulted while in the
+// middle of appending a record, and it is considered safe to truncate the file
+// at the beginning of the partial record.
+//
+// If filesystem preallocation is used (at the time of this writing, the
+// implementation does not support preallocation) then even version 2 of the
+// format cannot safely support culling trailing partially-written records.
+// This is because it is not possible to reliably tell the difference between a
+// partially-written record that did not complete fsync (resulting in a bad
+// checksum) vs. a record that was successfully written to disk but then fell
+// victim to bit-level disk corruption. See also KUDU-1414.
+//
+// These tradeoffs in error detection are reasonable given the failure
+// environment that Kudu operates within. We tolerate failures such as
+// "kill -9" of the Kudu process, machine power loss, or fsync/fdatasync
+// failure, but not failures like runaway processes mangling data files
+// in arbitrary ways or attackers crafting malicious data files.
+//
+// In short, no version of the file format will detect truncation of entire
+// protobuf records. Version 2 relies on ordered data flushing semantics for
+// automatic recoverability from partial record writes. Version 1 of the file
+// format cannot support automatic recoverability from partial record writes.
+//
+// For further reading on what files might look like following a normal
+// filesystem failure or disk corruption, and the likelihood of various types
+// of disk errors, see the following papers:
+//
+// https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf
+// https://www.usenix.org/legacy/event/fast08/tech/full_papers/bairavasundaram/bairavasundaram.pdf
+
+// Protobuf container file opened for writing. Can be built around an existing
+// file or a completely new file.
+//
+// Every function is thread-safe unless indicated otherwise.
+class WritablePBContainerFile {
+ public:
+
+ // Initializes the class instance; writer must be open.
+ explicit WritablePBContainerFile(std::shared_ptr<RWFile> writer);
+
+ // Closes the container if not already closed.
+ ~WritablePBContainerFile();
+
+ // Writes the file header to disk and initializes the write offset to the
+ // byte after the file header. This method should NOT be called when opening
+ // an existing file for append; use OpenExisting() for that.
+ //
+ // 'msg' need not be populated; its type is used to "lock" the container
+ // to a particular protobuf message type in Append().
+ //
+ // Not thread-safe.
+ Status CreateNew(const google::protobuf::Message& msg);
+
+ // Opens an existing protobuf container file for append. The file must
+ // already have a valid file header. To initialize a new blank file for
+ // writing, use CreateNew() instead.
+ //
+ // The file header is read and the version specified there is used as the
+ // format version. The length of the file is also read and is used as the
+ // write offset for subsequent Append() calls. WritablePBContainerFile caches
+ // the write offset instead of constantly calling stat() on the file each
+ // time Append() is called.
+ //
+ // Not thread-safe.
+ Status OpenExisting();
+
+ // Writes 'msg' to the container as one record: its size (plus, in version 2+,
+ // a checksum of the size), its data, and a CRC32C checksum. CreateNew() or
+ // OpenExisting() must be called first, i.e. the file must be open.
+ Status Append(const google::protobuf::Message& msg);
+
+ // Asynchronously flushes all dirty container data to the filesystem.
+ // The file must be open.
+ Status Flush();
+
+ // Synchronizes all dirty container data to the filesystem.
+ // The file must be open.
+ //
+ // Note: the parent directory is _not_ synchronized. Because the
+ // container file was provided during construction, we don't know whether
+ // it was created or reopened, and parent directory synchronization is
+ // only needed in the former case.
+ Status Sync();
+
+ // Closes the container.
+ //
+ // Not thread-safe.
+ Status Close();
+
+ // Returns the path to the container's underlying file handle.
+ const std::string& filename() const;
+
+ private:
+ friend class TestPBUtil;
+ FRIEND_TEST(TestPBUtil, TestPopulateDescriptorSet);
+
+ // Set the file format version. Only used for testing.
+ // Must be called before CreateNew().
+ Status SetVersionForTests(int version);
+
+ // Write the protobuf schemas belonging to 'desc' and all of its
+ // dependencies to 'output'.
+ //
+ // Schemas are written in dependency order (i.e. if A depends on B which
+ // depends on C, the order is C, B, A).
+ static void PopulateDescriptorSet(const google::protobuf::FileDescriptor* desc,
+ google::protobuf::FileDescriptorSet* output);
+
+ // Serialize the contents of 'msg' into 'buf' along with additional metadata
+ // to aid in deserialization.
+ Status AppendMsgToBuffer(const google::protobuf::Message& msg, faststring* buf);
+
+ // Append bytes to the file.
+ Status AppendBytes(const Slice& data);
+
+ // State of the file (not initialized / open / closed).
+ FileState state_;
+
+ // Protects offset_.
+ Mutex offset_lock_;
+
+ // Current write offset into the file.
+ uint64_t offset_;
+
+ // Protobuf container file format version (1 or 2).
+ int version_;
+
+ // File writer.
+ std::shared_ptr<RWFile> writer_;
+};
+
+// Protobuf container file opened for reading.
+//
+// Can be built around a file with existing contents or an empty file (in
+// which case it's safe to interleave with WritablePBContainerFile).
+class ReadablePBContainerFile {
+ public:
+
+ // Initializes the class instance; reader must be open.
+ explicit ReadablePBContainerFile(std::shared_ptr<RandomAccessFile> reader);
+
+ // Closes the file if not already closed.
+ ~ReadablePBContainerFile();
+
+ // Reads the header information from the container and validates it.
+ // Must be called before any of the other methods.
+ Status Open();
+
+ // Reads a protobuf message from the container, validating its size and
+ // data using CRC32C checksums. File must be open.
+ //
+ // Return values:
+ // * If there are no more records in the file, returns Status::EndOfFile.
+ // * If there is a partial record, but it is not long enough to be a full
+ // record or the written length of the record is greater than the remaining
+ // bytes in the file, returns Status::Incomplete. If Status::Incomplete
+ // is returned, calling offset() will return the point in the file where
+ // the invalid partial record begins. In order to append additional records
+ // to the file, the file must first be truncated at that offset.
+ // Note: Version 1 of this file format will never return
+ // Status::Incomplete() from this method.
+ // * If a corrupt record is encountered, returns Status::Corruption.
+ // * On success, stores the result in '*msg' and returns OK.
+ Status ReadNextPB(google::protobuf::Message* msg);
+
+ // Dump() writes any unread protobuf messages in the container to 'os',
+ // rendered according to 'format'. File must be open.
+ // Format selects how each message is printed:
+ enum class Format {
+ // Print each message on multiple lines, with intervening headers.
+ DEFAULT,
+ // Same as DEFAULT but includes additional metadata information.
+ DEBUG,
+ // Print each message on its own line.
+ ONELINE,
+ // Dump in JSON.
+ JSON
+ };
+ Status Dump(std::ostream* os, Format format);
+
+ // Closes the container.
+ Status Close();
+
+ // Expected PB type and schema for each message to be read.
+ //
+ // Only valid after a successful call to Open().
+ const std::string& pb_type() const { return pb_type_; }
+ const google::protobuf::FileDescriptorSet* protos() const {
+ return protos_.get();
+ }
+
+ // Get the prototype instance for the type of messages stored in this
+ // file. The returned Message is owned by this ReadablePBContainerFile instance.
+ Status GetPrototype(const google::protobuf::Message** prototype);
+
+ // Return the protobuf container file format version.
+ // File must be open.
+ int version() const;
+
+ // Return current read offset.
+ // File must be open.
+ uint64_t offset() const;
+
+ private:
+ FileState state_; // Lifecycle state (not initialized / open / closed).
+ int version_; // Container format version read from the file header.
+ uint64_t offset_; // Current read offset; see offset().
+
+ // The size of the file we are reading, or 'none' if it hasn't yet been
+ // read.
+ boost::optional<uint64_t> cached_file_size_;
+
+ // The fully-qualified PB type name of the messages in the container.
+ std::string pb_type_;
+
+ // Wrapped in a unique_ptr so that clients need not include PB headers.
+ std::unique_ptr<google::protobuf::FileDescriptorSet> protos_;
+
+ // Protobuf infrastructure which owns the message prototype 'prototype_'.
+ std::unique_ptr<google::protobuf::SimpleDescriptorDatabase> db_;
+ std::unique_ptr<google::protobuf::DescriptorPool> descriptor_pool_;
+ std::unique_ptr<google::protobuf::MessageFactory> message_factory_;
+ const google::protobuf::Message* prototype_ = nullptr;
+
+ std::shared_ptr<RandomAccessFile> reader_;
+};
+
+// Convenience functions for protobuf containers holding just one record.
+
+// Load a single-record "containerized" protobuf from 'path' into 'msg'.
+// If the file does not exist, returns Status::NotFound(). Otherwise, may
+// return other Status error codes such as Status::IOError.
+Status ReadPBContainerFromPath(Env* env, const std::string& path,
+ google::protobuf::Message* msg);
+
+// Serialize 'msg' to 'path' as a single-record "containerized" protobuf.
+//
+// If create == NO_OVERWRITE and 'path' already exists, the function will fail.
+// If sync == SYNC, the newly created file will be fsynced before returning.
+Status WritePBContainerToPath(Env* env, const std::string& path,
+ const google::protobuf::Message& msg,
+ CreateMode create,
+ SyncMode sync);
+
+// Wrapper for a protobuf message which lazily converts to JSON when
+// the trace buffer is dumped.
+//
+// When tracing, an instance of this class can be associated with
+// a given trace, instead of a stringified PB, thus avoiding doing
+// stringification inline and moving that work to the tracing process.
+//
+// Example usage:
+// TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+// "response", pb_util::PbTracer::TracePb(*response_pb_),
+// ...);
+//
+class PbTracer : public debug::ConvertableToTraceFormat {
+ public:
+ enum {
+ kMaxFieldLengthToTrace = 100 // NOTE(review): presumably a per-field truncation limit; confirm in .cc.
+ };
+
+ // Static helper to be called when adding a stringified PB to a trace.
+ // This does not stringify 'msg' immediately; that is done later,
+ // when/if AppendAsTraceFormat() is called on the returned object.
+ static scoped_refptr<debug::ConvertableToTraceFormat> TracePb(
+ const google::protobuf::Message& msg);
+
+ explicit PbTracer(const google::protobuf::Message& msg); // Presumably stores its own copy of 'msg'.
+
+ // Actually stringifies the PB and appends the string to 'out'.
+ void AppendAsTraceFormat(std::string* out) const override;
+ private:
+ const std::unique_ptr<google::protobuf::Message> msg_; // Owned message, stringified lazily.
+};
+
+} // namespace pb_util
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.proto b/be/src/kudu/util/pb_util.proto
new file mode 100644
index 0000000..b78c0cf
--- /dev/null
+++ b/be/src/kudu/util/pb_util.proto
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+import "google/protobuf/descriptor.proto";
+
+// ============================================================================
+// Protobuf container metadata
+// ============================================================================
+
+// Supplemental protobuf container header, stored as the first record after the
+// file header (see pb_util.h for details).
+message ContainerSupHeaderPB {
+ // The protobuf schema for the messages expected in this container.
+ //
+ // This schema is complete, that is, it includes all of its dependencies
+ // (i.e. other schemas defined in .proto files imported by this schema's
+ // .proto file).
+ required google.protobuf.FileDescriptorSet protos = 1;
+
+ // The PB message type expected in each data entry in this container. Must
+ // be fully qualified (e.g. kudu.tablet.TabletSuperBlockPB).
+ required string pb_type = 2;
+}
+
+extend google.protobuf.FieldOptions { // Kudu-specific field options.
+ optional bool REDACT = 50001 [default=false]; // If true, SecureDebugString() redacts the field's contents.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util_test.proto b/be/src/kudu/util/pb_util_test.proto
new file mode 100644
index 0000000..bac0be0
--- /dev/null
+++ b/be/src/kudu/util/pb_util_test.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+import "kudu/util/pb_util.proto";
+
+message TestSecurePrintingPB { // Presumably a test fixture mixing REDACT-tagged and plain fields.
+ optional string insecure1 = 1;
+ optional string secure1 = 2 [(kudu.REDACT) = true];
+ optional string insecure2 = 3;
+ optional string secure2 = 4 [(kudu.REDACT) = true];
+ repeated string repeated_secure = 5 [(kudu.REDACT) = true];
+ optional string insecure3 = 6;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/process_memory-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory-test.cc b/be/src/kudu/util/process_memory-test.cc
new file mode 100644
index 0000000..36df1a9
--- /dev/null
+++ b/be/src/kudu/util/process_memory-test.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <ostream>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/process_memory.h"
+
+using std::atomic;
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+// Microbenchmark of the new/delete hooks that track process-wide memory
+// consumption; logs the total worker iterations per second across all threads.
+TEST(ProcessMemory, BenchmarkConsumptionTracking) {
+  constexpr int kNumThreads = 200;
+  constexpr int kAllocsPerLookup = 10;
+  constexpr int kAllocSize = 8000;
+  constexpr double kSecs = 3;
+  atomic<bool> done(false);
+  atomic<int64_t> total_count(0);
+
+  // Each worker repeatedly performs kAllocsPerLookup new/delete pairs followed
+  // by a single consumption lookup. Running many workers at once surfaces any
+  // contention on central tcmalloc locks.
+  auto worker = [&]() {
+    int64_t iters = 0;
+    while (!done.load()) {
+      for (int n = 0; n < kAllocsPerLookup; ++n) {
+        // 'volatile' keeps the compiler from eliding the allocation.
+        char* volatile buf = new char[kAllocSize];
+        delete[] buf;
+      }
+      process_memory::CurrentConsumption();
+      ++iters;
+    }
+    total_count.fetch_add(iters);
+  };
+
+  vector<thread> threads;
+  for (int t = 0; t < kNumThreads; ++t) {
+    threads.emplace_back(worker);
+  }
+
+  SleepFor(MonoDelta::FromSeconds(kSecs));
+  done.store(true);
+  for (thread& t : threads) t.join();
+
+  LOG(INFO) << "Performed " << total_count.load() / kSecs << " iters/sec";
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/process_memory.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory.cc b/be/src/kudu/util/process_memory.cc
new file mode 100644
index 0000000..d2f3653
--- /dev/null
+++ b/be/src/kudu/util/process_memory.cc
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstddef>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#ifdef TCMALLOC_ENABLED
+#include <gperftools/malloc_extension.h> // IWYU pragma: keep
+#endif
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h" // IWYU pragma: keep
+#include "kudu/util/debug/trace_event.h" // IWYU pragma: keep
+#include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mem_tracker.h" // IWYU pragma: keep
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+// Hard ceiling on process memory, in bytes. DoInitLimits() in this file
+// replaces 0 with 80% of total RAM; any other value is used as-is.
+DEFINE_int64(memory_limit_hard_bytes, 0,
+ "Maximum amount of memory this daemon should use, in bytes. "
+ "A value of 0 autosizes based on the total system memory. "
+ "A value of -1 disables all memory limiting.");
+TAG_FLAG(memory_limit_hard_bytes, stable);
+
+// Percentage of the hard limit at which UnderMemoryPressure() starts
+// returning true.
+DEFINE_int32(memory_pressure_percentage, 60,
+ "Percentage of the hard memory limit that this daemon may "
+ "consume before flushing of in-memory data becomes prioritized.");
+TAG_FLAG(memory_pressure_percentage, advanced);
+
+// Percentage of the hard limit above which SoftLimitExceeded() starts
+// returning true probabilistically.
+DEFINE_int32(memory_limit_soft_percentage, 80,
+ "Percentage of the hard memory limit that this daemon may "
+ "consume before memory throttling of writes begins. The greater "
+ "the excess, the higher the chance of throttling. In general, a "
+ "lower soft limit leads to smoother write latencies but "
+ "decreased throughput, and vice versa for a higher soft limit.");
+TAG_FLAG(memory_limit_soft_percentage, advanced);
+
+// Only range-checked in this file (via ValidatePercentage). NOTE(review):
+// presumably read by another component that emits the warnings; confirm.
+DEFINE_int32(memory_limit_warn_threshold_percentage, 98,
+ "Percentage of the hard memory limit that this daemon may "
+ "consume before WARNING level messages are periodically logged.");
+TAG_FLAG(memory_limit_warn_threshold_percentage, advanced);
+
+#ifdef TCMALLOC_ENABLED
+// Used by GcTcmalloc() to bound tcmalloc's page-heap free bytes relative to
+// the bytes currently allocated by the application.
+DEFINE_int32(tcmalloc_max_free_bytes_percentage, 10,
+ "Maximum percentage of the RSS that tcmalloc is allowed to use for "
+ "reserved but unallocated memory.");
+TAG_FLAG(tcmalloc_max_free_bytes_percentage, advanced);
+#endif
+
+using strings::Substitute;
+
+namespace kudu {
+namespace process_memory {
+
+namespace {
+// Process-wide limits in bytes. They are computed once by DoInitLimits() from
+// the memory_limit_* and memory_pressure_percentage flags.
+int64_t g_hard_limit;
+int64_t g_soft_limit;
+int64_t g_pressure_threshold;
+
+// RNG used by SoftLimitExceeded() to decide probabilistically whether the soft
+// limit counts as exceeded. It is allocated once in DoInitLimits() and never
+// freed.
+ThreadSafeRandom* g_rand = nullptr;
+
+#ifdef TCMALLOC_ENABLED
+// Total amount of memory released since the last GC. If this
+// is greater than kGcReleaseSize, this will trigger a tcmalloc gc.
+Atomic64 g_released_memory_since_gc;
+
+// Size, in bytes (128 MiB), that is considered a large value for Release() (or
+// Consume() with a negative value). If tcmalloc is used, this can trigger it to GC.
+// A higher value will make us call into tcmalloc less often (and therefore more
+// efficient). A lower value will mean our memory overhead is lower.
+// TODO(todd): this is a stopgap.
+const int64_t kGcReleaseSize = 128 * 1024L * 1024L;
+
+#endif // TCMALLOC_ENABLED
+
+} // anonymous namespace
+
+
+// Flag validation
+// ------------------------------------------------------------
+// Validate that various flags are percentages.
+// gflags validator: accepts only values in the closed range [0, 100].
+// Out-of-range values are logged at ERROR and rejected.
+static bool ValidatePercentage(const char* flagname, int value) {
+  const bool in_range = (value >= 0) && (value <= 100);
+  if (!in_range) {
+    LOG(ERROR) << Substitute("$0 must be a percentage, value $1 is invalid",
+                             flagname, value);
+  }
+  return in_range;
+}
+
+// Attach ValidatePercentage to every flag interpreted as a percentage of the
+// hard memory limit (or of allocated bytes, for the tcmalloc flag). The array
+// exists only so the registrations run during static initialization.
+// --memory_pressure_percentage is included because g_pressure_threshold is
+// computed from it exactly like g_soft_limit is from the soft percentage.
+static bool dummy[] = {
+  google::RegisterFlagValidator(&FLAGS_memory_pressure_percentage, &ValidatePercentage),
+  google::RegisterFlagValidator(&FLAGS_memory_limit_soft_percentage, &ValidatePercentage),
+  google::RegisterFlagValidator(&FLAGS_memory_limit_warn_threshold_percentage, &ValidatePercentage)
+#ifdef TCMALLOC_ENABLED
+  ,google::RegisterFlagValidator(&FLAGS_tcmalloc_max_free_bytes_percentage, &ValidatePercentage)
+#endif
+};
+
+
+// Wrappers around tcmalloc functionality
+// ------------------------------------------------------------
+#ifdef TCMALLOC_ENABLED
+// Returns the value of the named numeric tcmalloc property.
+// If the lookup fails, logs at DFATAL (fatal in debug builds, an error in
+// release builds) and returns 0.
+static int64_t GetTCMallocProperty(const char* prop) {
+  // Zero-initialized so that a failed lookup in a release build, where DFATAL
+  // does not abort, returns 0 rather than an indeterminate value.
+  size_t value = 0;
+  if (!MallocExtension::instance()->GetNumericProperty(prop, &value)) {
+    LOG(DFATAL) << "Failed to get tcmalloc property " << prop;
+  }
+  return static_cast<int64_t>(value);
+}
+
+// Returns the number of bytes the application currently has allocated, as
+// reported by tcmalloc's "generic.current_allocated_bytes" property.
+int64_t GetTCMallocCurrentAllocatedBytes() {
+  static const char* const kAllocatedBytesProperty = "generic.current_allocated_bytes";
+  return GetTCMallocProperty(kAllocatedBytesProperty);
+}
+
+// Returns excess free page-heap memory to the OS. The slack tcmalloc may keep
+// is bounded to --tcmalloc_max_free_bytes_percentage of the bytes currently
+// allocated by the application.
+void GcTcmalloc() {
+  TRACE_EVENT0("process", "GcTcmalloc");
+
+  // Bytes on the 'NORMAL' free list, i.e. reserved by tcmalloc but not in use.
+  const int64_t free_bytes = GetTCMallocProperty("tcmalloc.pageheap_free_bytes");
+  // Bytes allocated by the application.
+  const int64_t allocated_bytes = GetTCMallocCurrentAllocatedBytes();
+
+  // Allowed slack; the double result is truncated to an integer byte count.
+  const int64_t allowed_free_bytes =
+      allocated_bytes * FLAGS_tcmalloc_max_free_bytes_percentage / 100.0;
+
+  // Release the excess 1MB at a time so that tcmalloc drops its page heap lock
+  // between chunks. This still disrupts the current thread, but lets other
+  // threads make progress. When free_bytes <= allowed_free_bytes the loop
+  // body never runs.
+  const int64_t kChunkBytes = 1024 * 1024;
+  for (int64_t remaining = free_bytes - allowed_free_bytes;
+       remaining > 0;
+       remaining -= kChunkBytes) {
+    MallocExtension::instance()->ReleaseToSystem(kChunkBytes);
+  }
+}
+#endif // TCMALLOC_ENABLED
+
+
+// Consumption and soft memory limit behavior
+// ------------------------------------------------------------
+namespace {
+// Computes g_hard_limit, g_soft_limit and g_pressure_threshold from the flags
+// and allocates g_rand. It is intended to run once, through InitLimits().
+void DoInitLimits() {
+  int64_t hard_limit = FLAGS_memory_limit_hard_bytes;
+  if (hard_limit == 0) {
+    // No explicit limit was given: default to 80% of total system RAM.
+    int64_t total_ram_bytes;
+    CHECK_OK(Env::Default()->GetTotalRAMBytes(&total_ram_bytes));
+    hard_limit = total_ram_bytes * 4 / 5;
+  }
+  g_hard_limit = hard_limit;
+  g_soft_limit = FLAGS_memory_limit_soft_percentage * hard_limit / 100;
+  g_pressure_threshold = FLAGS_memory_pressure_percentage * hard_limit / 100;
+
+  g_rand = new ThreadSafeRandom(1);
+}
+
+// Runs DoInitLimits() at most once (via GoogleOnceInit). Every accessor calls
+// this before reading the limits.
+void InitLimits() {
+  static GoogleOnceType once;
+  GoogleOnceInit(&once, &DoInitLimits);
+}
+
+} // anonymous namespace
+
+// Returns the process's current memory consumption in bytes.
+//
+// With tcmalloc, querying the allocator may be slow (e.g. due to contention),
+// so the value is cached. At most once every kReadIntervalMicros, whichever
+// caller wins 'read_lock' refreshes it; all other callers return the cached
+// value. Without tcmalloc, the root MemTracker's consumption is returned.
+int64_t CurrentConsumption() {
+#ifdef TCMALLOC_ENABLED
+  const int64_t kReadIntervalMicros = 50000;
+  static Atomic64 last_read_time = 0;
+  static simple_spinlock read_lock;
+  static Atomic64 consumption = 0;
+  int64_t time = GetMonoTimeMicros();
+  // 'last_read_time' is stored concurrently by the refreshing thread, so it
+  // must also be loaded atomically; a plain read here is a data race.
+  if (time > base::subtle::NoBarrier_Load(&last_read_time) + kReadIntervalMicros &&
+      read_lock.try_lock()) {
+    base::subtle::NoBarrier_Store(&consumption, GetTCMallocCurrentAllocatedBytes());
+    // Re-fetch the time after getting the consumption. This way, in case fetching
+    // consumption is extremely slow for some reason (eg due to lots of contention
+    // in tcmalloc) we at least ensure that we wait at least another full interval
+    // before fetching the information again.
+    time = GetMonoTimeMicros();
+    base::subtle::NoBarrier_Store(&last_read_time, time);
+    read_lock.unlock();
+  }
+
+  return base::subtle::NoBarrier_Load(&consumption);
+#else
+  // Without tcmalloc, we have no reliable way of determining our own heap
+  // size (e.g. mallinfo doesn't work in ASAN builds). So, we'll fall back
+  // to just looking at the sum of our tracked memory.
+  return MemTracker::GetRootTracker()->consumption();
+#endif
+}
+
+namespace {
+// Ensures the limits are initialized, then reads '*limit'. The pointer is
+// dereferenced only after InitLimits() returns, so the value is post-init.
+int64_t ReadLimitAfterInit(const int64_t* limit) {
+  InitLimits();
+  return *limit;
+}
+} // anonymous namespace
+
+// Hard memory limit in bytes.
+int64_t HardLimit() {
+  return ReadLimitAfterInit(&g_hard_limit);
+}
+
+// Soft memory limit in bytes.
+int64_t SoftLimit() {
+  return ReadLimitAfterInit(&g_soft_limit);
+}
+
+// Consumption, in bytes, at which the process counts as under memory pressure.
+int64_t MemoryPressureThreshold() {
+  return ReadLimitAfterInit(&g_pressure_threshold);
+}
+
+// Returns true if current consumption is at or above the memory pressure
+// threshold. In that case, if 'current_capacity_pct' is non-null, it receives
+// consumption as a percentage of the hard limit.
+bool UnderMemoryPressure(double* current_capacity_pct) {
+  InitLimits();
+  // A negative hard limit (--memory_limit_hard_bytes=-1) disables memory
+  // limiting. Without this check the derived threshold would be 0 and every
+  // call would report pressure.
+  if (g_hard_limit < 0) {
+    return false;
+  }
+  int64_t consumption = CurrentConsumption();
+  if (consumption < g_pressure_threshold) {
+    return false;
+  }
+  if (current_capacity_pct) {
+    *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+  }
+  return true;
+}
+
+// Returns true (always) if consumption exceeds the hard limit, and
+// probabilistically if it lies between the soft and hard limits. The closer
+// consumption is to the hard limit, the more likely the result is true. When
+// returning true and 'current_capacity_pct' is non-null, it receives
+// consumption as a percentage of the hard limit.
+bool SoftLimitExceeded(double* current_capacity_pct) {
+  InitLimits();
+  // A negative hard limit (--memory_limit_hard_bytes=-1) disables memory
+  // limiting. Without this check, any consumption compares greater than -1
+  // and would be reported as exceeding the limit.
+  if (g_hard_limit < 0) {
+    return false;
+  }
+  int64_t consumption = CurrentConsumption();
+  // Did we exceed the actual limit?
+  if (consumption > g_hard_limit) {
+    if (current_capacity_pct) {
+      *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+    }
+    return true;
+  }
+
+  // No soft limit defined.
+  if (g_hard_limit == g_soft_limit) {
+    return false;
+  }
+
+  // Are we under the soft limit threshold?
+  if (consumption < g_soft_limit) {
+    return false;
+  }
+
+  // We're over the threshold; were we randomly chosen to be over the soft limit?
+  if (consumption + g_rand->Uniform64(g_hard_limit - g_soft_limit) > g_hard_limit) {
+    if (current_capacity_pct) {
+      *current_capacity_pct = static_cast<double>(consumption) / g_hard_limit * 100;
+    }
+    return true;
+  }
+  return false;
+}
+
+// Records that 'released_bytes' were freed. Once the total recorded since the
+// last GC exceeds kGcReleaseSize, resets the total and calls GcTcmalloc().
+// This is a no-op when tcmalloc is not enabled.
+void MaybeGCAfterRelease(int64_t released_bytes) {
+#ifdef TCMALLOC_ENABLED
+  // The running total must grow with each release so that it can eventually
+  // cross kGcReleaseSize. Adding -released_bytes would drive it negative for
+  // ordinary (positive) releases, and the GC would never run.
+  int64_t now_released = base::subtle::NoBarrier_AtomicIncrement(
+      &g_released_memory_since_gc, released_bytes);
+  if (PREDICT_FALSE(now_released > kGcReleaseSize)) {
+    // Concurrent callers may both observe the crossing and both run
+    // GcTcmalloc(); this code does not prevent that.
+    base::subtle::NoBarrier_Store(&g_released_memory_since_gc, 0);
+    GcTcmalloc();
+  }
+#endif
+}
+
+} // namespace process_memory
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/process_memory.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/process_memory.h b/be/src/kudu/util/process_memory.h
new file mode 100644
index 0000000..cba7046
--- /dev/null
+++ b/be/src/kudu/util/process_memory.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_PROCESS_MEMORY_H
+#define KUDU_UTIL_PROCESS_MEMORY_H
+
+#include <cstdint>
+
+namespace kudu {
+namespace process_memory {
+
+// Probabilistically returns true if the process-wide soft memory limit is exceeded.
+// The greater the excess, the higher the chance that it returns true. Always
+// returns true once consumption is above the hard limit.
+//
+// If the soft limit is exceeded and 'current_capacity_pct' is not NULL, the percentage
+// of the hard limit consumed is written to it.
+bool SoftLimitExceeded(double* current_capacity_pct);
+
+// Return true if we are under memory pressure (i.e if we are nearing the point at which
+// SoftLimitExceeded will begin to return true).
+//
+// If the process is under memory pressure, and 'current_capacity_pct' is not NULL,
+// the percentage of the hard limit consumed is written to it.
+bool UnderMemoryPressure(double* current_capacity_pct);
+
+// Potentially trigger a call to release tcmalloc memory back to the
+// OS, after the given amount of memory was released.
+void MaybeGCAfterRelease(int64_t released_bytes);
+
+// Return the total current memory consumption of the process, in bytes.
+// With tcmalloc this is a cached value refreshed at most every 50ms; without
+// tcmalloc it is the consumption tracked by the root MemTracker.
+int64_t CurrentConsumption();
+
+// Return the configured hard limit for the process, in bytes. A value of 0 for
+// --memory_limit_hard_bytes is replaced by 80% of total system RAM; other
+// values (including -1) are returned as configured.
+int64_t HardLimit();
+
+// Return the configured soft limit for the process, in bytes.
+int64_t SoftLimit();
+
+// Return the configured memory pressure threshold for the process, in bytes.
+int64_t MemoryPressureThreshold();
+
+#ifdef TCMALLOC_ENABLED
+// Get the current amount of allocated memory, according to tcmalloc.
+//
+// This should be equal to CurrentConsumption(), but is made available so that tests
+// can verify the correctness of CurrentConsumption().
+int64_t GetTCMallocCurrentAllocatedBytes();
+#endif
+
+} // namespace process_memory
+} // namespace kudu
+
+#endif // KUDU_UTIL_PROCESS_MEMORY_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/promise.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/promise.h b/be/src/kudu/util/promise.h
new file mode 100644
index 0000000..17f8cec
--- /dev/null
+++ b/be/src/kudu/util/promise.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PROMISE_H
+#define KUDU_UTIL_PROMISE_H
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/countdown_latch.h"
+
+namespace kudu {
+
+// A promise boxes a value which is to be provided at some time in the future.
+// A single producer calls Set(...), and any number of consumers can call Get()
+// to retrieve the produced value.
+//
+// In Guava terms, this is a SettableFuture<T>.
+template<typename T>
+class Promise {
+ public:
+  Promise() : latch_(1) {}
+  ~Promise() = default;
+
+  // Reset the promise to be used again.
+  // For this to be safe, there must be some kind of external synchronization
+  // ensuring that no threads are still accessing the value from the previous
+  // incarnation of the promise.
+  void Reset() {
+    latch_.Reset(1);
+    val_ = T();
+  }
+
+  // Block until a value is available, and return a reference to it.
+  const T& Get() const {
+    latch_.Wait();
+    return val_;
+  }
+
+  // Wait for the promised value to become available with the given timeout.
+  //
+  // Returns nullptr if the timeout elapses before a value is available.
+  // Otherwise returns a pointer to the value. This pointer's lifetime is
+  // tied to the lifetime of the Promise object.
+  const T* WaitFor(const MonoDelta& delta) const {
+    if (!latch_.WaitFor(delta)) {
+      return nullptr;
+    }
+    return &val_;
+  }
+
+  // Set the value of this promise.
+  // This may be called at most once (per Reset()). A second call is caught
+  // only by the DCHECK, i.e. only in debug builds.
+  void Set(const T& val) {
+    DCHECK_EQ(latch_.count(), 1) << "Already set!";
+    val_ = val;
+    latch_.CountDown();
+  }
+
+ private:
+  // Starts at 1; counted down to 0 by Set() after 'val_' is written.
+  CountDownLatch latch_;
+  T val_;
+  DISALLOW_COPY_AND_ASSIGN(Promise);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_PROMISE_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/proto_container_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test.proto b/be/src/kudu/util/proto_container_test.proto
new file mode 100644
index 0000000..4707c08
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test.proto
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Arbitrary protobuf to test writing a containerized protobuf.
+message ProtoContainerTestPB {
+ // Required string, field number 1.
+ required string name = 1;
+ // Required 32-bit integer, field number 2.
+ required int32 value = 2;
+ // Optional string, field number 3.
+ optional string note = 3;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/proto_container_test2.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test2.proto b/be/src/kudu/util/proto_container_test2.proto
new file mode 100644
index 0000000..74a1ea3
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test2.proto
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Dependency chain:
+//
+// this file --> proto_container_test.proto
+
+import "kudu/util/proto_container_test.proto";
+
+// Arbitrary protobuf that has one PB dependency.
+message ProtoContainerTest2PB {
+ // Embedded message from proto_container_test.proto; this is the file's
+ // single PB dependency.
+ required kudu.ProtoContainerTestPB record = 1;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/proto_container_test3.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/proto_container_test3.proto b/be/src/kudu/util/proto_container_test3.proto
new file mode 100644
index 0000000..1ed1c31
--- /dev/null
+++ b/be/src/kudu/util/proto_container_test3.proto
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+// Dependency chain:
+//
+// this file --> proto_container_test.proto
+// --> proto_container_test2.proto --> proto_container_test.proto
+
+import "kudu/util/proto_container_test.proto";
+import "kudu/util/proto_container_test2.proto";
+
+// Arbitrary protobuf that has two PB dependencies.
+message ProtoContainerTest3PB {
+ // Direct dependency on proto_container_test.proto.
+ required kudu.ProtoContainerTestPB record_one = 1;
+ // Dependency on proto_container_test2.proto, which in turn depends on
+ // proto_container_test.proto.
+ required kudu.ProtoContainerTest2PB record_two = 2;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/protobuf-annotations.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf-annotations.h b/be/src/kudu/util/protobuf-annotations.h
new file mode 100644
index 0000000..7fdc961
--- /dev/null
+++ b/be/src/kudu/util/protobuf-annotations.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple header which is inserted into all of our generated protobuf code.
+// We use this to hook protobuf code up to TSAN annotations.
+#ifndef KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
+#define KUDU_UTIL_PROTOBUF_ANNOTATIONS_H
+
+#include "kudu/gutil/dynamic_annotations.h"
+
+// The protobuf internal headers are included before this, so we have to undefine
+// the empty definitions first.
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN
+#undef GOOGLE_SAFE_CONCURRENT_WRITES_END
+
+// Map protobuf's concurrent-write markers onto the ANNOTATE_IGNORE_WRITES_*
+// dynamic annotations from kudu/gutil/dynamic_annotations.h. This is how the
+// header hooks generated protobuf code up to TSAN annotations.
+#define GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN ANNOTATE_IGNORE_WRITES_BEGIN
+#define GOOGLE_SAFE_CONCURRENT_WRITES_END ANNOTATE_IGNORE_WRITES_END
+
+#endif /* KUDU_UTIL_PROTOBUF_ANNOTATIONS_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/protobuf_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protobuf_util.h b/be/src/kudu/util/protobuf_util.h
new file mode 100644
index 0000000..cc88eda
--- /dev/null
+++ b/be/src/kudu/util/protobuf_util.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PROTOBUF_UTIL_H
+#define KUDU_UTIL_PROTOBUF_UTIL_H
+
+#include <cstddef>
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/util/faststring.h"
+
+namespace kudu {
+
+// Serializes 'msg' and appends the bytes to 'output', preserving its existing
+// contents. Always returns true; a mismatch between ByteSize() and the number
+// of bytes actually written is fatal (CHECK).
+//
+// Declared inline because it is defined in a header: a non-inline definition
+// would be emitted by every translation unit including this file, producing
+// duplicate-symbol link errors.
+inline bool AppendPBToString(const google::protobuf::MessageLite &msg, faststring *output) {
+  // size_t avoids narrowing the buffer's current size to int.
+  const size_t old_size = output->size();
+  const int byte_size = msg.ByteSize();
+  output->resize(old_size + byte_size);
+  uint8* start = reinterpret_cast<uint8*>(output->data() + old_size);
+  uint8* end = msg.SerializeWithCachedSizesToArray(start);
+  CHECK(end - start == byte_size)
+      << "Error in serialization. byte_size=" << byte_size
+      << " new ByteSize()=" << msg.ByteSize()
+      << " end-start=" << (end-start);
+  return true;
+}
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/protoc-gen-insertions.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/protoc-gen-insertions.cc b/be/src/kudu/util/protoc-gen-insertions.cc
new file mode 100644
index 0000000..5d1097e
--- /dev/null
+++ b/be/src/kudu/util/protoc-gen-insertions.cc
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple protoc plugin which inserts some code at the top of each generated protobuf.
+// Currently, this just adds an include of protobuf-annotations.h, a file which hooks up
+// the protobuf concurrency annotations to our TSAN annotations.
+
+#include <string>
+
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using google::protobuf::io::ZeroCopyOutputStream;
+using google::protobuf::io::Printer;
+using std::string;
+
+namespace kudu {
+
+// Text printed verbatim at the "includes" insertion point of each generated
+// .pb.cc file.
+static const char* const kIncludeToInsert = "#include \"kudu/util/protobuf-annotations.h\"\n";
+// Suffix stripped from an input file name to derive the matching .pb.cc name.
+static const char* const kProtoExtension = ".proto";
+
+// protoc plugin that prints kIncludeToInsert into the "includes" insertion
+// point of the .pb.cc generated for each input .proto file.
+class InsertAnnotations : public ::google::protobuf::compiler::CodeGenerator {
+  // Returns false and fills 'error' if the file name lacks the ".proto"
+  // suffix, or if printing to the insertion point fails.
+  virtual bool Generate(const google::protobuf::FileDescriptor *file,
+                        const std::string &/*param*/,
+                        google::protobuf::compiler::GeneratorContext *gen_context,
+                        std::string *error) const OVERRIDE {
+    const string& proto_name = file->name();
+
+    string stem;
+    if (!TryStripSuffixString(proto_name, kProtoExtension, &stem)) {
+      *error = strings::Substitute("file name $0 did not end in $1", proto_name, kProtoExtension);
+      return false;
+    }
+
+    gscoped_ptr<ZeroCopyOutputStream> stream(
+        gen_context->OpenForInsert(stem + ".pb.cc", "includes"));
+    Printer printer(stream.get(), '$');
+    printer.Print(kIncludeToInsert);
+
+    if (!printer.failed()) {
+      return true;
+    }
+    *error = "Failed to print to output file";
+    return false;
+  }
+};
+
+} // namespace kudu
+
+// Entry point: hands the annotation-inserting generator to protobuf's
+// PluginMain() and returns its exit code.
+int main(int argc, char *argv[]) {
+  kudu::InsertAnnotations annotator;
+  const int exit_code = google::protobuf::compiler::PluginMain(argc, argv, &annotator);
+  return exit_code;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pstack_watcher-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher-test.cc b/be/src/kudu/util/pstack_watcher-test.cc
new file mode 100644
index 0000000..a993d66
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher-test.cc
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/pstack_watcher.h"
+
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdio>
+#include <memory>
+#include <string>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using std::shared_ptr;
+using std::string;
+using strings::Substitute;
+
+namespace kudu {
+
+TEST(TestPstackWatcher, TestPstackWatcherCancellation) {
+  // Shut down a watcher whose timeout is far in the future.
+  const MonoDelta kVeryLongTimeout = MonoDelta::FromSeconds(1000000);
+  PstackWatcher watcher(kVeryLongTimeout);
+  watcher.Shutdown();
+}
+
+TEST(TestPstackWatcher, TestWait) {
+  // Wait() on a watcher with a short (10ms) timeout.
+  const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(10);
+  PstackWatcher watcher(kShortTimeout);
+  watcher.Wait();
+}
+
+TEST(TestPstackWatcher, TestDumpStacks) {
+  // The static DumpStacks() helper must report success.
+  const Status s = PstackWatcher::DumpStacks();
+  ASSERT_OK(s);
+}
+
+// Reopens stdout onto "<test dir>/pstack_watcher-dump.<pid>.txt", stores that
+// path in '*temp_path', and returns freopen()'s result (nullptr on failure).
+static FILE* RedirectStdout(string *temp_path) {
+  string test_dir;
+  CHECK_OK(Env::Default()->GetTestDirectory(&test_dir));
+  *temp_path = Substitute("$0/pstack_watcher-dump.$1.txt", test_dir, getpid());
+  FILE* redirected;
+  POINTER_RETRY_ON_EINTR(redirected, freopen(temp_path->c_str(), "w", stdout));
+  return redirected;
+}
+
+TEST(TestPstackWatcher, TestPstackWatcherRunning) {
+  // Redirects stdout to a temp file and lets a PstackWatcher with a 500ms
+  // timeout run until it stops. It then restores stdout and checks that the
+  // captured output contains "BEGIN STACKS".
+  string stdout_file;
+  int old_stdout;
+  RETRY_ON_EINTR(old_stdout, dup(STDOUT_FILENO));
+  CHECK_ERR(old_stdout);
+  {
+    FILE* out_fp = RedirectStdout(&stdout_file);
+    PCHECK(out_fp != nullptr);
+    SCOPED_CLEANUP({
+      int err;
+      RETRY_ON_EINTR(err, fclose(out_fp));
+    });
+    PstackWatcher watcher(MonoDelta::FromMilliseconds(500));
+    while (watcher.IsRunning()) {
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+  }
+  int dup2_ret;
+  RETRY_ON_EINTR(dup2_ret, dup2(old_stdout, STDOUT_FILENO));
+  CHECK_ERR(dup2_ret);
+  // fd 1 refers to the original stdout again, so the saved duplicate is no
+  // longer needed. Close it instead of leaking the descriptor.
+  CHECK_ERR(close(old_stdout));
+  PCHECK(stdout = fdopen(STDOUT_FILENO, "w"));
+
+  faststring contents;
+  CHECK_OK(ReadFileToString(Env::Default(), stdout_file, &contents));
+  ASSERT_STR_CONTAINS(contents.ToString(), "BEGIN STACKS");
+  CHECK_ERR(unlink(stdout_file.c_str()));
+  ASSERT_GE(fprintf(stdout, "%s\n", contents.ToString().c_str()), 0)
+      << "errno=" << errno << ": " << ErrnoToString(errno);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pstack_watcher.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher.cc b/be/src/kudu/util/pstack_watcher.cc
new file mode 100644
index 0000000..2c4481a
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/pstack_watcher.h"
+
+#include <unistd.h>
+
+#include <cerrno>
+#include <cstdio>
+#include <string>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/env.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+using std::string;
+using std::vector;
+using strings::SkipEmpty;
+using strings::SkipWhitespace;
+using strings::Split;
+using strings::Substitute;
+
+// Starts a background thread that waits for 'timeout' and then dumps the
+// process's stacks, unless Shutdown() is called first.
+PstackWatcher::PstackWatcher(MonoDelta timeout)
+    : timeout_(timeout),
+      running_(true),
+      cond_(&lock_) {
+  CHECK_OK(Thread::Create("pstack_watcher", "pstack_watcher",
+                          boost::bind(&PstackWatcher::Run, this),
+                          &thread_));
+}
+
+// Stops the watcher if it is still waiting and joins its thread.
+PstackWatcher::~PstackWatcher() {
+  Shutdown();
+}
+
+// Clears 'running_', wakes the watcher thread (and any Wait() callers) so it
+// exits without dumping, then joins the thread. The thread handle is reset
+// after joining, so later calls only repeat the flag update.
+void PstackWatcher::Shutdown() {
+  {
+    MutexLock l(lock_);
+    running_ = false;
+    cond_.Broadcast();
+  }
+  if (!thread_) {
+    return;
+  }
+  CHECK_OK(ThreadJoiner(thread_.get()).Join());
+  thread_.reset();
+}
+
+// Returns true until the watcher has either dumped stacks or been shut down.
+bool PstackWatcher::IsRunning() const {
+  MutexLock l(lock_);
+  const bool still_running = running_;
+  return still_running;
+}
+
+// Blocks until 'running_' is cleared, either by the watcher thread after it
+// dumps stacks or by Shutdown().
+void PstackWatcher::Wait() const {
+  MutexLock l(lock_);
+  // Re-check the flag after every wakeup to tolerate spurious wakeups.
+  while (running_) {
+    cond_.Wait();
+  }
+}
+
+// Body of the watcher thread. Waits until either 'timeout_' has elapsed or
+// Shutdown() clears 'running_', whichever comes first. Stacks are dumped only
+// on a genuine timeout; afterwards 'running_' is cleared and all waiters are
+// woken.
+void PstackWatcher::Run() {
+  MutexLock guard(lock_);
+  // Wait against a fixed deadline rather than a single WaitFor(): a spurious
+  // wakeup must neither trigger a premature dump nor restart the timer.
+  const MonoTime deadline = MonoTime::Now() + timeout_;
+  while (running_) {
+    if (!cond_.WaitUntil(deadline)) {
+      // Deadline reached without being signaled.
+      break;
+    }
+  }
+  if (!running_) return;
+
+  // The lock is held during the dump, so IsRunning() and Shutdown() block
+  // until it completes.
+  WARN_NOT_OK(DumpStacks(DUMP_FULL), "Unable to print pstack from watcher");
+  running_ = false;
+  cond_.Broadcast();
+}
+
+// Runs `which progname` with its output discarded. Returns OK if it exits 0,
+// NotFound (carrying the exit description) if it exits non-zero, or the error
+// encountered while starting/waiting for `which` itself.
+Status PstackWatcher::HasProgram(const char* progname) {
+  Subprocess which_proc({ "which", progname });
+  which_proc.DisableStderr();
+  which_proc.DisableStdout();
+  RETURN_NOT_OK_PREPEND(which_proc.Start(),
+                        Substitute("HasProgram($0): error running 'which'", progname));
+  RETURN_NOT_OK(which_proc.Wait());
+
+  int rc;
+  string rc_info;
+  RETURN_NOT_OK(which_proc.GetExitStatus(&rc, &rc_info));
+  if (rc != 0) {
+    return Status::NotFound(Substitute("can't find $0: $1", progname, rc_info));
+  }
+  return Status::OK();
+}
+
+// Checks that 'gdb' is on the PATH and is new enough to be used safely for
+// stack dumps.
+//
+// Returns OK if so; the error from HasProgram() if gdb can't be found;
+// Incomplete if the `gdb --version` output can't be parsed; NotSupported if
+// the version is older than 7.7; or any error from running gdb itself.
+Status PstackWatcher::HasGoodGdb() {
+  // Check for the existence of gdb.
+  RETURN_NOT_OK(HasProgram("gdb"));
+
+  // gdb exists, run it and parse the output of --version. For example:
+  //
+  //   GNU gdb (GDB) Red Hat Enterprise Linux (7.2-75.el6)
+  //   ...
+  //
+  // Or:
+  //
+  //   GNU gdb (Ubuntu 7.11.1-0ubuntu1~16.5) 7.11.1
+  //   ...
+  //
+  // The buffer is deliberately not named 'stdout'. The C standard defines
+  // stdout as a macro, and on C libraries where it expands to an expression
+  // rather than a plain identifier, a local with that name fails to compile.
+  string version_output;
+  RETURN_NOT_OK(Subprocess::Call({"gdb", "--version"}, "", &version_output));
+  vector<string> lines = Split(version_output, "\n", SkipEmpty());
+  if (lines.empty()) {
+    return Status::Incomplete("gdb version not found");
+  }
+  // The version is the last whitespace-separated token of the first line,
+  // possibly wrapped in parentheses.
+  vector<string> words = Split(lines[0], " ", SkipWhitespace());
+  if (words.empty()) {
+    return Status::Incomplete("could not parse gdb version");
+  }
+  string version = words.back();
+  version = StripPrefixString(version, "(");
+  version = StripSuffixString(version, ")");
+
+  // The variable pretty print routine in older versions of gdb is buggy in
+  // that it reads the values of all local variables, including uninitialized
+  // ones. For some variable types with an embedded length (such as std::string
+  // or std::vector), this can lead to all sorts of incorrect memory accesses,
+  // causing deadlocks or seemingly infinite loops within gdb.
+  //
+  // It's not clear exactly when this behavior was fixed, so we whitelist the
+  // oldest known good version: the one found in Ubuntu 14.04.
+  //
+  // See the following gdb bug reports for more information:
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=11868
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=12127
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=16196
+  // - https://sourceware.org/bugzilla/show_bug.cgi?id=16286
+  //
+  // autodigit_less compares embedded digit runs numerically, so "7.11" is
+  // correctly treated as newer than "7.7".
+  autodigit_less lt;
+  if (lt(version, "7.7")) {
+    return Status::NotSupported("gdb version too old", version);
+  }
+
+  return Status::OK();
+}
+
+// Dumps the stacks of the calling process to stdout.
+Status PstackWatcher::DumpStacks(int flags) {
+  const pid_t self = getpid();
+  return DumpPidStacks(self, flags);
+}
+
+// Dumps the stacks of process 'pid' to stdout. gdb is preferred because it
+// gives line numbers and thread names; otherwise pstack, then gstack, is
+// tried. 'flags' only affects the gdb path. Returns ServiceUnavailable if
+// none of these tools is usable.
+Status PstackWatcher::DumpPidStacks(pid_t pid, int flags) {
+  const Status gdb_status = HasGoodGdb();
+  if (gdb_status.ok()) {
+    return RunGdbStackDump(pid, flags);
+  }
+  WARN_NOT_OK(gdb_status, "gdb not available");
+
+  static const char* const kFallbackTools[] = { "pstack", "gstack" };
+  for (const char* tool : kFallbackTools) {
+    const Status tool_status = HasProgram(tool);
+    if (!tool_status.ok()) {
+      WARN_NOT_OK(tool_status, Substitute("$0 not available", tool));
+      continue;
+    }
+    return RunPstack(tool, pid);
+  }
+
+  return Status::ServiceUnavailable("Neither gdb, pstack, nor gstack appear to be installed.");
+}
+
+Status PstackWatcher::RunGdbStackDump(pid_t pid, int flags) {
+ // Command: gdb -quiet -batch -nx -ex cmd1 -ex cmd2 /proc/$PID/exe $PID
+ vector<string> argv;
+ argv.emplace_back("gdb");
+ // Don't print introductory version/copyright messages.
+ argv.emplace_back("-quiet");
+ // Exit after processing all of the commands below.
+ argv.emplace_back("-batch");
+ // Don't run commands from .gdbinit
+ argv.emplace_back("-nx");
+ argv.emplace_back("-ex");
+ argv.emplace_back("set print pretty on");
+ argv.emplace_back("-ex");
+ argv.emplace_back("info threads");
+ argv.emplace_back("-ex");
+ argv.emplace_back("thread apply all bt");
+ if (flags & DUMP_FULL) {
+ argv.emplace_back("-ex");
+ argv.emplace_back("thread apply all bt full");
+ }
+ string executable;
+ Env* env = Env::Default();
+ RETURN_NOT_OK(env->GetExecutablePath(&executable));
+ argv.push_back(executable);
+ argv.push_back(Substitute("$0", pid));
+ return RunStackDump(argv);
+}
+
+// Dumps the stacks of 'pid' by running `progname pid` (pstack or gstack).
+Status PstackWatcher::RunPstack(const std::string& progname, pid_t pid) {
+  const vector<string> argv = { progname, Substitute("$0", pid) };
+  return RunStackDump(argv);
+}
+
+// Prints a "BEGIN STACKS" banner, runs 'argv' as a subprocess, waits for it,
+// and prints an "END STACKS" banner if it exited with status 0. stdout is
+// flushed before the child starts, so buffered output doesn't interleave with
+// whatever the child writes. Any I/O or subprocess failure is returned as a
+// Status.
+Status PstackWatcher::RunStackDump(const vector<string>& argv) {
+  // Flushes stdout, turning a failure into an IOError.
+  auto flush_stdout = []() -> Status {
+    if (fflush(stdout) == EOF) {
+      return Status::IOError("Unable to flush stdout", ErrnoToString(errno), errno);
+    }
+    return Status::OK();
+  };
+
+  printf("************************ BEGIN STACKS **************************\n");
+  RETURN_NOT_OK(flush_stdout());
+
+  Subprocess dumper(argv);
+  RETURN_NOT_OK_PREPEND(dumper.Start(), "RunStackDump proc.Start() failed");
+  // Close our end of the child's stdin so it sees EOF if it reads from it.
+  int close_rc;
+  RETRY_ON_EINTR(close_rc, ::close(dumper.ReleaseChildStdinFd()));
+  if (close_rc == -1) {
+    return Status::IOError("Unable to close child stdin", ErrnoToString(errno), errno);
+  }
+
+  RETURN_NOT_OK_PREPEND(dumper.Wait(), "RunStackDump proc.Wait() failed");
+  int exit_code;
+  string exit_info;
+  RETURN_NOT_OK_PREPEND(dumper.GetExitStatus(&exit_code, &exit_info),
+                        "RunStackDump proc.GetExitStatus() failed");
+  if (exit_code != 0) {
+    return Status::RuntimeError("RunStackDump proc.Wait() error", exit_info);
+  }
+
+  printf("************************* END STACKS ***************************\n");
+  RETURN_NOT_OK(flush_stdout());
+  return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pstack_watcher.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pstack_watcher.h b/be/src/kudu/util/pstack_watcher.h
new file mode 100644
index 0000000..882e6d2
--- /dev/null
+++ b/be/src/kudu/util/pstack_watcher.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_PSTACK_WATCHER_H
+#define KUDU_UTIL_PSTACK_WATCHER_H
+
+#include <sys/types.h>
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Thread;
+
+// PstackWatcher is an object which will pstack the current process and print
+// the results to stdout. It does this after a certain timeout has occurred,
+// unless it is shut down first. The static DumpStacks()/DumpPidStacks()
+// methods can also be called directly to dump stacks on demand.
+class PstackWatcher {
+ public:
+
+ // Bit flags accepted by DumpStacks() and DumpPidStacks().
+ enum Flags {
+ NO_FLAGS = 0,
+
+ // Run 'thread apply all bt full', which is very verbose output, in
+ // addition to the plain backtrace. Only honored when gdb is used.
+ DUMP_FULL = 1
+ };
+
+ // Static method to collect and write stack dump output to stdout of the current
+ // process.
+ static Status DumpStacks(int flags = NO_FLAGS);
+
+ // Like the above but for any process, not just the current one.
+ static Status DumpPidStacks(pid_t pid, int flags = NO_FLAGS);
+
+ // Instantiate a watcher that writes a pstack to stdout after the given
+ // timeout expires. Starts a background thread.
+ explicit PstackWatcher(MonoDelta timeout);
+
+ // Calls Shutdown(), joining the background thread.
+ ~PstackWatcher();
+
+ // Shut down the watcher and do not log a pstack (if one has not already
+ // been logged). Blocks until the background thread has exited.
+ // This method is not thread-safe.
+ void Shutdown();
+
+ // Test whether the watcher is still running or has shut down.
+ // Thread-safe.
+ bool IsRunning() const;
+
+ // Wait until the timeout expires and the watcher logs a pstack, or until
+ // Shutdown() is called.
+ // Thread-safe.
+ void Wait() const;
+
+ private:
+ // Test for the existence of the given program in the system path.
+ static Status HasProgram(const char* progname);
+
+ // Check whether the system path has 'gdb' and whether it is modern enough
+ // for safe stack dump usage.
+ static Status HasGoodGdb();
+
+ // Get a stack dump using GDB directly.
+ static Status RunGdbStackDump(pid_t pid, int flags);
+
+ // Get a stack dump using the pstack or gstack program.
+ static Status RunPstack(const std::string& progname, pid_t pid);
+
+ // Invoke and wait for the stack dump program.
+ static Status RunStackDump(const std::vector<std::string>& argv);
+
+ // Run the thread that waits for the specified duration before logging a
+ // pstack.
+ void Run();
+
+ const MonoDelta timeout_;
+ // True until the watcher either dumps stacks or is shut down.
+ // Protected by 'lock_'.
+ bool running_;
+ // Background thread executing Run(); cleared once joined by Shutdown().
+ scoped_refptr<Thread> thread_;
+ mutable Mutex lock_;
+ // Must stay declared after 'lock_': it is constructed with a pointer to it.
+ mutable ConditionVariable cond_;
+};
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random-test.cc b/be/src/kudu/util/random-test.cc
new file mode 100644
index 0000000..26c7ab5
--- /dev/null
+++ b/be/src/kudu/util/random-test.cc
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cmath>
+#include <cstdint>
+#include <limits>
+#include <unordered_set>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+using std::numeric_limits;
+using std::unordered_set;
+using std::vector;
+
+namespace kudu {
+
+// Fixture owning a Random seeded via KuduTest's SeedRandom().
+class RandomTest : public KuduTest {
+ public:
+  RandomTest() : rng_(SeedRandom()) {}
+
+ protected:
+  Random rng_;
+};
+
+// Draws many samples from Normal() and checks that their sample mean lies
+// within one standard deviation of the requested mean.
+TEST_F(RandomTest, TestNormalDist) {
+  constexpr double kMean = 5.0;
+  constexpr double kStdDev = 0.01;
+  constexpr int kNumIters = 100000;
+
+  double total = 0.0;
+  int remaining = kNumIters;
+  while (remaining-- > 0) {
+    total += rng_.Normal(kMean, kStdDev);
+  }
+
+  const double observed_mean = total / static_cast<double>(kNumIters);
+  ASSERT_LE(fabs(observed_mean - kMean), kStdDev);
+}
+
+// Tests that after a large number of invocations of Next32() and Next64(), we
+// have flipped all the bits we claim we should have.
+//
+// This is a regression test for a bug where we were incorrectly bit-shifting
+// in Next64().
+//
+// Note: Our RNG actually only generates 31 bits of randomness for 32 bit
+// integers. If all bits need to be randomized, callers must use Random::Next64().
+// This test reflects that, and if we change the RNG algo this test should also change.
+TEST_F(RandomTest, TestUseOfBits) {
+  // AND of every sample: any bit still set here was never observed as 0.
+  uint32_t always_set_32 = numeric_limits<uint32_t>::max();
+  uint64_t always_set_64 = numeric_limits<uint64_t>::max();
+  // OR of every sample: any bit still clear here was never observed as 1.
+  uint32_t ever_set_32 = 0;
+  uint64_t ever_set_64 = 0;
+
+  constexpr int kIterations = 10000000;
+  for (int iter = 0; iter < kIterations; ++iter) {
+    const uint32_t sample32 = rng_.Next32();
+    always_set_32 &= sample32;
+    ever_set_32 |= sample32;
+
+    const uint64_t sample64 = rng_.Next64();
+    always_set_64 &= sample64;
+    ever_set_64 |= sample64;
+  }
+
+  // Every bit should have been seen both set and clear, except that the
+  // current Next32() implementation always leaves its MSB at 0, so only 31
+  // bits are expected there.
+  const uint32_t kExpected31Bits = numeric_limits<uint32_t>::max() >> 1;
+  const uint64_t kExpected64Bits = numeric_limits<uint64_t>::max();
+
+  ASSERT_EQ(0, always_set_32);
+  ASSERT_EQ(kExpected31Bits, ever_set_32);
+  ASSERT_EQ(0, always_set_64);
+  ASSERT_EQ(kExpected64Bits, ever_set_64);
+}
+
+// Re-seeding with the same value must reproduce the same first draw.
+TEST_F(RandomTest, TestResetSeed) {
+  constexpr uint32_t kSeed = 1;
+  rng_.Reset(kSeed);
+  const uint64_t before = rng_.Next64();
+  rng_.Reset(kSeed);
+  const uint64_t after = rng_.Next64();
+  ASSERT_EQ(before, after);
+}
+
+// Checks that ReservoirSample() picks elements roughly uniformly and never
+// picks elements listed in 'avoid'.
+TEST_F(RandomTest, TestReservoirSample) {
+  // Use a constant seed to avoid flakiness.
+  rng_.Reset(12345);
+
+  const int kPopulationSize = 100;
+  const int kSampleSize = 5;
+  const int kNumTrials = 1000;
+
+  vector<int> population;
+  population.reserve(kPopulationSize);
+  for (int i = 0; i < kPopulationSize; i++) {
+    population.push_back(i);
+  }
+
+  // Run kNumTrials trials, each selecting kSampleSize elements.
+  vector<int> results;
+  vector<int> counts(population.size());
+  unordered_set<int> avoid;
+  for (int trial = 0; trial < kNumTrials; trial++) {
+    rng_.ReservoirSample(population, kSampleSize, avoid, &results);
+    for (int result : results) {
+      counts[result]++;
+    }
+  }
+
+  // Each element is expected to be selected
+  // kNumTrials * kSampleSize / kPopulationSize = 50 times on average. It is
+  // random, so it won't be exact; the constant seed keeps this deterministic.
+  for (int count : counts) {
+    ASSERT_GE(count, 25);
+    ASSERT_LE(count, 75);
+  }
+
+  // Run again, but avoid some particular entries.
+  avoid.insert(3);
+  avoid.insert(10);
+  avoid.insert(20);
+  // Reset the tallies. Size them from the population rather than a literal so
+  // the two cannot drift apart.
+  counts.assign(population.size(), 0);
+  for (int trial = 0; trial < kNumTrials; trial++) {
+    rng_.ReservoirSample(population, kSampleSize, avoid, &results);
+    for (int result : results) {
+      counts[result]++;
+    }
+  }
+
+  // Ensure that we didn't ever pick the avoided elements.
+  ASSERT_EQ(0, counts[3]);
+  ASSERT_EQ(0, counts[10]);
+  ASSERT_EQ(0, counts[20]);
+}
+
+// When 'k' is at least the population size, every element is returned in
+// input order.
+TEST_F(RandomTest, TestReservoirSamplePopulationTooSmall) {
+  vector<int> population;
+  for (int value = 0; value < 10; ++value) {
+    population.push_back(value);
+  }
+
+  vector<int> results;
+  const unordered_set<int> avoid;
+  // First with k larger than the population, then with k equal to it.
+  for (int k : {20, 10}) {
+    rng_.ReservoirSample(population, k, avoid, &results);
+    ASSERT_EQ(population.size(), results.size());
+    ASSERT_EQ(population, results);
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random.h b/be/src/kudu/util/random.h
new file mode 100644
index 0000000..e31e475
--- /dev/null
+++ b/be/src/kudu/util/random.h
@@ -0,0 +1,252 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#ifndef KUDU_UTIL_RANDOM_H_
+#define KUDU_UTIL_RANDOM_H_
+
+#include <cmath>
+#include <cstdint>
+#include <mutex>
+#include <random>
+#include <vector>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+namespace random_internal {
+
+// Modulus of Random's multiplicative generator: 2^31 - 1.
+static const uint32_t M = 2147483647U;
+
+}  // namespace random_internal
+
+// Forward declaration; the template is defined after ThreadSafeRandom.
+template<class R>
+class StdUniformRNG;
+
+// A very simple random number generator. Not especially good at
+// generating truly random bits, but good enough for our needs in this
+// package. This implementation is not thread-safe; use ThreadSafeRandom when
+// an instance must be shared between threads.
+//
+// The core generator computes seed_ = (seed_ * 16807) % (2^31 - 1), so each
+// call to Next() yields a value in [1, 2^31 - 2].
+class Random {
+ private:
+  uint32_t seed_;
+ public:
+  explicit Random(uint32_t s) {
+    Reset(s);
+  }
+
+  // Reset the RNG to the given seed value. Only the low 31 bits of 's' are
+  // used, and seeds that would make the generator degenerate (0 or M) are
+  // replaced with 1.
+  void Reset(uint32_t s) {
+    seed_ = s & 0x7fffffffu;
+    // Avoid bad seeds.
+    if (seed_ == 0 || seed_ == random_internal::M) {
+      seed_ = 1;
+    }
+  }
+
+  // Next pseudo-random 32-bit unsigned integer.
+  // FIXME: This currently only generates 31 bits of randomness.
+  // The MSB will always be zero.
+  uint32_t Next() {
+    static const uint64_t A = 16807;  // bits 14, 8, 7, 5, 2, 1, 0
+    // We are computing
+    //       seed_ = (seed_ * A) % M,    where M = 2^31-1
+    //
+    // seed_ must not be zero or M, or else all subsequent computed values
+    // will be zero or M respectively.  For all other values, seed_ will end
+    // up cycling through every number in [1,M-1]
+    uint64_t product = seed_ * A;
+
+    // Compute (product % M) using the fact that ((x << 31) % M) == x.
+    seed_ = static_cast<uint32_t>((product >> 31) + (product & random_internal::M));
+    // The first reduction may overflow by 1 bit, so we may need to
+    // repeat.  mod == M is not possible; using > allows the faster
+    // sign-bit-based test.
+    if (seed_ > random_internal::M) {
+      seed_ -= random_internal::M;
+    }
+    return seed_;
+  }
+
+  // Alias for consistency with Next64
+  uint32_t Next32() { return Next(); }
+
+  // Next pseudo-random 64-bit unsigned integer. Unlike Next32(), every bit
+  // position can be set.
+  uint64_t Next64() {
+    // Two 31-bit draws fill bits [0, 62).
+    uint64_t large = Next();
+    large <<= 31;
+    large |= Next();
+    // Fill in the highest two MSBs; only the low two bits of this third draw
+    // survive the shift.
+    large |= implicit_cast<uint64_t>(Next32()) << 62;
+    return large;
+  }
+
+  // Returns a uniformly distributed value in the range [0..n-1]
+  // REQUIRES: n > 0
+  uint32_t Uniform(uint32_t n) { return Next() % n; }
+
+  // Alias for consistency with Uniform64
+  uint32_t Uniform32(uint32_t n) { return Uniform(n); }
+
+  // Returns a uniformly distributed 64-bit value in the range [0..n-1]
+  // REQUIRES: n > 0
+  uint64_t Uniform64(uint64_t n) { return Next64() % n; }
+
+  // Randomly returns true ~"1/n" of the time, and false otherwise.
+  // REQUIRES: n > 0
+  bool OneIn(int n) { return (Next() % n) == 0; }
+
+  // Skewed: pick "base" uniformly from range [0,max_log] and then
+  // return "base" random bits. The effect is to pick a number in the
+  // range [0,2^max_log-1] with exponential bias towards smaller numbers.
+  // REQUIRES: 0 <= max_log <= 31 (a larger shift would exceed uint32_t).
+  uint32_t Skewed(int max_log) {
+    // Shift an unsigned one so that base == 31 never touches the sign bit of
+    // a signed int.
+    return Uniform(uint32_t{1} << Uniform(max_log + 1));
+  }
+
+  // Samples a random number from the given normal distribution.
+  double Normal(double mean, double std_dev);
+
+  // Return a random number in the open interval (0.0, 1.0). Next() returns a
+  // value in [1, M-1] and the divisor is M+1 = 2^31, so neither 0.0 nor 1.0
+  // can be produced.
+  double NextDoubleFraction() {
+    return Next() / static_cast<double>(random_internal::M + 1.0);
+  }
+
+  // Sample 'k' random elements from the collection 'c' into 'result', taking care not to sample any
+  // elements that are already present in 'avoid'.
+  //
+  // In the case that 'c' has fewer than 'k' elements then all elements in 'c' will be selected.
+  // If 'k' <= 0, 'result' is left empty.
+  //
+  // 'c' should be an iterable STL collection such as a vector, set, or list.
+  // 'avoid' should be an STL-compatible set.
+  //
+  // The results are not stored in a randomized order: the order of results will
+  // match their order in the input collection.
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    result->clear();
+    // A non-positive sample size selects nothing. Returning early also keeps
+    // a negative 'k' away from reserve(), where it would convert to a huge
+    // size_t and throw std::length_error.
+    if (k <= 0) {
+      return;
+    }
+    const size_t capacity = static_cast<size_t>(k);
+    result->reserve(capacity);
+    // Number of eligible (non-avoided) elements seen so far.
+    int i = 0;
+    for (const T& elem : c) {
+      if (ContainsKey(avoid, elem)) {
+        continue;
+      }
+      i++;
+      // Fill the reservoir if there is available space.
+      if (result->size() < capacity) {
+        result->push_back(elem);
+        continue;
+      }
+      // Otherwise replace an existing element with probability k/i, which
+      // keeps every eligible element equally likely to end up in the sample.
+      int j = Uniform(i);
+      if (j < k) {
+        (*result)[j] = elem;
+      }
+    }
+  }
+};
+
+// Thread-safe wrapper around Random. Every method takes an internal spinlock
+// and forwards to the same-named Random method.
+class ThreadSafeRandom {
+ public:
+  explicit ThreadSafeRandom(uint32_t s)
+      : random_(s) {
+  }
+
+  void Reset(uint32_t s) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    random_.Reset(s);
+  }
+
+  uint32_t Next() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Next();
+  }
+
+  uint32_t Next32() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Next32();
+  }
+
+  uint64_t Next64() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Next64();
+  }
+
+  uint32_t Uniform(uint32_t n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Uniform(n);
+  }
+
+  uint32_t Uniform32(uint32_t n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Uniform32(n);
+  }
+
+  uint64_t Uniform64(uint64_t n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Uniform64(n);
+  }
+
+  bool OneIn(int n) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.OneIn(n);
+  }
+
+  uint32_t Skewed(int max_log) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Skewed(max_log);
+  }
+
+  double Normal(double mean, double std_dev) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.Normal(mean, std_dev);
+  }
+
+  double NextDoubleFraction() {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    return random_.NextDoubleFraction();
+  }
+
+  // The lock is held for the whole sampling pass over 'c'.
+  template<class Collection, class Set, class T>
+  void ReservoirSample(const Collection& c, int k, const Set& avoid,
+                       std::vector<T>* result) {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    random_.ReservoirSample(c, k, avoid, result);
+  }
+
+ private:
+  // Serializes every access to 'random_'.
+  simple_spinlock lock_;
+  Random random_;
+};
+
+// Adapts Random or ThreadSafeRandom to the C++ standard library's uniform
+// random bit generator requirements (result_type, min(), max(), operator()),
+// so it can drive <random> distributions:
+// http://en.cppreference.com/w/cpp/concept/UniformRandomNumberGenerator
+//
+// NOTE(review): min()/max() advertise [0, 2^31 - 1], slightly wider than the
+// [1, 2^31 - 2] that Random::Next32() actually produces.
+template<class R>
+class StdUniformRNG {
+ public:
+  using result_type = uint32_t;
+
+  // 'r' is not owned and must outlive this adapter.
+  explicit StdUniformRNG(R* r) : r_(r) {}
+
+  result_type operator()() { return r_->Next32(); }
+
+  static constexpr result_type min() { return 0; }
+  static constexpr result_type max() { return (uint32_t{1} << 31) - 1; }
+
+ private:
+  R* r_;
+};
+
+// Defined out of line because it needs the complete StdUniformRNG template,
+// which is declared after Random. Draws one sample from N(mean, std_dev).
+inline double Random::Normal(double mean, double std_dev) {
+  StdUniformRNG<Random> adapter(this);
+  std::normal_distribution<double> dist(mean, std_dev);
+  return dist(adapter);
+}
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_RANDOM_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util-test.cc b/be/src/kudu/util/random_util-test.cc
new file mode 100644
index 0000000..993ef15
--- /dev/null
+++ b/be/src/kudu/util/random_util-test.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/random_util.h"
+
+#include <cstring>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Fixture providing a Random seeded via KuduTest's SeedRandom(), plus the
+// buffer length and trial count used by the tests below.
+class RandomUtilTest : public KuduTest {
+ protected:
+  RandomUtilTest()
+      : rng_(SeedRandom()) {
+  }
+
+  static const int kLenMax = 100;
+  static const int kNumTrials = 100;
+
+  Random rng_;
+};
+
+namespace {
+
+// Checks that every byte of the 'stop'-byte buffer at 'start' is '\0',
+// except for the bytes in the half-open range [from, to), which are skipped.
+// REQUIRES: 0 <= from <= to <= stop.
+void CheckEmpty(char* start, int from, int to, int stop) {
+  DCHECK_LE(0, from);
+  DCHECK_LE(from, to);
+  DCHECK_LE(to, stop);
+  for (int j = 0; j < stop; ++j) {
+    if (j >= from && j < to) {
+      continue;  // Intentionally written by the code under test.
+    }
+    CHECK_EQ(start[j], '\0') << "Index " << j << " not null after defining "
+                             << "indices [" << from << "," << to << ") of "
+                             << "a nulled string [0," << stop << ").";
+  }
+}
+
+} // anonymous namespace
+
+// Makes sure that RandomString only writes the specified amount
+TEST_F(RandomUtilTest, TestRandomString) {
+  char buf[kLenMax];
+
+  for (int trial = 0; trial < kNumTrials; ++trial) {
+    memset(buf, '\0', kLenMax);
+    const int end = rng_.Uniform(kLenMax + 1);
+    const int begin = rng_.Uniform(end + 1);
+    RandomString(buf + begin, end - begin, &rng_);
+    CheckEmpty(buf, begin, end, kLenMax);
+  }
+
+  // Corner case: a zero-length request must leave the buffer untouched.
+  memset(buf, '\0', kLenMax);
+  RandomString(buf, 0, &rng_);
+  CheckEmpty(buf, 0, 0, kLenMax);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/random_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/random_util.cc b/be/src/kudu/util/random_util.cc
new file mode 100644
index 0000000..fa7ef12
--- /dev/null
+++ b/be/src/kudu/util/random_util.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/random_util.h"
+
+#include <unistd.h>
+
+#include <cstdlib>
+#include <cstring>
+#include <string>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/random.h"
+
+using std::string;
+
+namespace kudu {
+
+// Fills the 'n' bytes at 'dest' with pseudo-random data from 'rng', copying
+// one 32-bit draw at a time and part of one more draw for any trailing bytes.
+// NOTE(review): Random::Next() yields only 31 random bits, so one bit of
+// every 4-byte group is always zero.
+void RandomString(void* dest, size_t n, Random* rng) {
+  char* out = static_cast<char*>(dest);
+  constexpr size_t kWordSize = sizeof(uint32_t);
+  uint32_t word = rng->Next();
+  size_t offset = 0;
+  // Copy whole words while at least kWordSize bytes remain.
+  while (n - offset >= kWordSize) {
+    memcpy(out + offset, &word, kWordSize);
+    offset += kWordSize;
+    word = rng->Next();
+  }
+  // Copy the 0-3 leftover bytes from the pending draw.
+  memcpy(out + offset, &word, n - offset);
+}
+
+// Returns a string of 'n' pseudo-random bytes drawn from 'rng'.
+string RandomString(size_t n, Random* rng) {
+  string result(n, '\0');
+  RandomString(&result[0], n, rng);
+  return result;
+}
+
+// Derives a 32-bit seed by multiplying the current time in microseconds by
+// the process id and then by the id reported by Env::gettid(), truncating to
+// 32 bits after each step. Integer sanitizer checks are disabled here,
+// presumably because the multiplications are expected to wrap.
+ATTRIBUTE_NO_SANITIZE_INTEGER
+uint32_t GetRandomSeed32() {
+  const auto now_us = GetCurrentTimeMicros();
+  const auto pid = getpid();
+  const auto tid = Env::Default()->gettid();
+  uint32_t seed = static_cast<uint32_t>(now_us);
+  seed *= pid;
+  seed *= tid;
+  return seed;
+}
+
+} // namespace kudu