You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:42 UTC
[30/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/x509_check_host.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/x509_check_host.cc b/be/src/kudu/security/x509_check_host.cc
new file mode 100644
index 0000000..4f54ca1
--- /dev/null
+++ b/be/src/kudu/security/x509_check_host.cc
@@ -0,0 +1,439 @@
+/*
+ * Copyright 1999-2016 The OpenSSL Project Authors. All Rights Reserved.
+ *
+ * Licensed under the OpenSSL license (the "License"). You may not use
+ * this file except in compliance with the License. You can obtain a copy
+ * in the file LICENSE in the source distribution or at
+ * https://www.openssl.org/source/license.html
+ */
+
+// The following is ported from the OpenSSL-1.1.0b library. The implementations
+// of the functions are for the most part the same except where mentioned in special
+// comments. Explicit casts were also added to bypass compilation errors.
+
+#include <string.h>
+
+#include <openssl/asn1.h>
+#include <openssl/crypto.h>
+#include <openssl/obj_mac.h>
+#include <openssl/x509.h>
+#include <openssl/x509v3.h>
+
+#include "kudu/security/x509_check_host.h"
+
+// Ported from include/openssl/crypto.h from OpenSSL-1.1.0b
+// Modified to use __FILE__ and __LINE__ instead of OPENSSL_FILE and OPENSSL_LINE.
+# define OPENSSL_strndup(str, n) \
+ CRYPTO_strndup(str, n, __FILE__, __LINE__)
+
+// Ported from crypto/o_str.c from OpenSSL-1.1.0b.
+// Modified to use strnlen() instead of OPENSSL_strnlen()
+char *CRYPTO_strndup(const char *str, size_t s, const char* file, int line)
+{
+ size_t maxlen;
+ char *ret;
+
+ if (str == NULL)
+ return NULL;
+
+ maxlen = strnlen(str, s);
+
+ ret = (char*)CRYPTO_malloc(maxlen + 1, file, line);
+ if (ret) {
+ memcpy(ret, str, maxlen);
+ ret[maxlen] = '\0';
+ }
+ return ret;
+}
+
+// The remaining code is ported from crypto/x509v3/v3_utl.c
+
+typedef int (*equal_fn) (const unsigned char *pattern, size_t pattern_len,
+ const unsigned char *subject, size_t subject_len,
+ unsigned int flags);
+
+/* Skip pattern prefix to match "wildcard" subject */
+static void skip_prefix(const unsigned char **p, size_t *plen,
+ size_t subject_len,
+ unsigned int flags)
+{
+ const unsigned char *pattern = *p;
+ size_t pattern_len = *plen;
+
+ /*
+ * If subject starts with a leading '.' followed by more octets, and
+ * pattern is longer, compare just an equal-length suffix with the
+ * full subject (starting at the '.'), provided the prefix contains
+ * no NULs.
+ */
+ if ((flags & _X509_CHECK_FLAG_DOT_SUBDOMAINS) == 0)
+ return;
+
+ while (pattern_len > subject_len && *pattern) {
+ if ((flags & X509_CHECK_FLAG_SINGLE_LABEL_SUBDOMAINS) &&
+ *pattern == '.')
+ break;
+ ++pattern;
+ --pattern_len;
+ }
+
+ /* Skip if entire prefix acceptable */
+ if (pattern_len == subject_len) {
+ *p = pattern;
+ *plen = pattern_len;
+ }
+}
+
+/* Compare while ASCII ignoring case. */
+static int equal_nocase(const unsigned char *pattern, size_t pattern_len,
+ const unsigned char *subject, size_t subject_len,
+ unsigned int flags)
+{
+ skip_prefix(&pattern, &pattern_len, subject_len, flags);
+ if (pattern_len != subject_len)
+ return 0;
+ while (pattern_len) {
+ unsigned char l = *pattern;
+ unsigned char r = *subject;
+ /* The pattern must not contain NUL characters. */
+ if (l == 0)
+ return 0;
+ if (l != r) {
+ if ('A' <= l && l <= 'Z')
+ l = (l - 'A') + 'a';
+ if ('A' <= r && r <= 'Z')
+ r = (r - 'A') + 'a';
+ if (l != r)
+ return 0;
+ }
+ ++pattern;
+ ++subject;
+ --pattern_len;
+ }
+ return 1;
+}
+
+/* Compare using memcmp. */
+static int equal_case(const unsigned char *pattern, size_t pattern_len,
+ const unsigned char *subject, size_t subject_len,
+ unsigned int flags)
+{
+ skip_prefix(&pattern, &pattern_len, subject_len, flags);
+ if (pattern_len != subject_len)
+ return 0;
+ return !memcmp(pattern, subject, pattern_len);
+}
+
+/*
+ * RFC 5280, section 7.5, requires that only the domain is compared in a
+ * case-insensitive manner.
+ */
+static int equal_email(const unsigned char *a, size_t a_len,
+ const unsigned char *b, size_t b_len,
+ unsigned int unused_flags)
+{
+ size_t i = a_len;
+ if (a_len != b_len)
+ return 0;
+ /*
+ * We search backwards for the '@' character, so that we do not have to
+ * deal with quoted local-parts. The domain part is compared in a
+ * case-insensitive manner.
+ */
+ while (i > 0) {
+ --i;
+ if (a[i] == '@' || b[i] == '@') {
+ if (!equal_nocase(a + i, a_len - i, b + i, a_len - i, 0))
+ return 0;
+ break;
+ }
+ }
+ if (i == 0)
+ i = a_len;
+ return equal_case(a, i, b, i, 0);
+}
+
+/*
+ * Compare the prefix and suffix with the subject, and check that the
+ * characters in-between are valid.
+ */
+static int wildcard_match(const unsigned char *prefix, size_t prefix_len,
+ const unsigned char *suffix, size_t suffix_len,
+ const unsigned char *subject, size_t subject_len,
+ unsigned int flags)
+{
+ const unsigned char *wildcard_start;
+ const unsigned char *wildcard_end;
+ const unsigned char *p;
+ int allow_multi = 0;
+ int allow_idna = 0;
+
+ if (subject_len < prefix_len + suffix_len)
+ return 0;
+ if (!equal_nocase(prefix, prefix_len, subject, prefix_len, flags))
+ return 0;
+ wildcard_start = subject + prefix_len;
+ wildcard_end = subject + (subject_len - suffix_len);
+ if (!equal_nocase(wildcard_end, suffix_len, suffix, suffix_len, flags))
+ return 0;
+ /*
+ * If the wildcard makes up the entire first label, it must match at
+ * least one character.
+ */
+ if (prefix_len == 0 && *suffix == '.') {
+ if (wildcard_start == wildcard_end)
+ return 0;
+ allow_idna = 1;
+ if (flags & X509_CHECK_FLAG_MULTI_LABEL_WILDCARDS)
+ allow_multi = 1;
+ }
+ /* IDNA labels cannot match partial wildcards */
+ if (!allow_idna &&
+ subject_len >= 4 && strncasecmp((char *)subject, "xn--", 4) == 0)
+ return 0;
+ /* The wildcard may match a literal '*' */
+ if (wildcard_end == wildcard_start + 1 && *wildcard_start == '*')
+ return 1;
+ /*
+ * Check that the part matched by the wildcard contains only
+ * permitted characters and only matches a single label unless
+ * allow_multi is set.
+ */
+ for (p = wildcard_start; p != wildcard_end; ++p)
+ if (!(('0' <= *p && *p <= '9') ||
+ ('A' <= *p && *p <= 'Z') ||
+ ('a' <= *p && *p <= 'z') ||
+ *p == '-' || (allow_multi && *p == '.')))
+ return 0;
+ return 1;
+}
+
+#define LABEL_START (1 << 0)
+#define LABEL_END (1 << 1)
+#define LABEL_HYPHEN (1 << 2)
+#define LABEL_IDNA (1 << 3)
+
+static const unsigned char *valid_star(const unsigned char *p, size_t len,
+ unsigned int flags)
+{
+ const unsigned char *star = 0;
+ size_t i;
+ int state = LABEL_START;
+ int dots = 0;
+ for (i = 0; i < len; ++i) {
+ /*
+ * Locate first and only legal wildcard, either at the start
+ * or end of a non-IDNA first and not final label.
+ */
+ if (p[i] == '*') {
+ int atstart = (state & LABEL_START);
+ int atend = (i == len - 1 || p[i + 1] == '.');
+ /*-
+ * At most one wildcard per pattern.
+ * No wildcards in IDNA labels.
+ * No wildcards after the first label.
+ */
+ if (star != NULL || (state & LABEL_IDNA) != 0 || dots)
+ return NULL;
+ /* Only full-label '*.example.com' wildcards? */
+ if ((flags & X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS)
+ && (!atstart || !atend))
+ return NULL;
+ /* No 'foo*bar' wildcards */
+ if (!atstart && !atend)
+ return NULL;
+ star = &p[i];
+ state &= ~LABEL_START;
+ } else if (('a' <= p[i] && p[i] <= 'z')
+ || ('A' <= p[i] && p[i] <= 'Z')
+ || ('0' <= p[i] && p[i] <= '9')) {
+ if ((state & LABEL_START) != 0
+ && len - i >= 4 && strncasecmp((char *)&p[i], "xn--", 4) == 0)
+ state |= LABEL_IDNA;
+ state &= ~(LABEL_HYPHEN | LABEL_START);
+ } else if (p[i] == '.') {
+ if ((state & (LABEL_HYPHEN | LABEL_START)) != 0)
+ return NULL;
+ state = LABEL_START;
+ ++dots;
+ } else if (p[i] == '-') {
+ /* no domain/subdomain starts with '-' */
+ if ((state & LABEL_START) != 0)
+ return NULL;
+ state |= LABEL_HYPHEN;
+ } else
+ return NULL;
+ }
+
+ /*
+ * The final label must not end in a hyphen or ".", and
+ * there must be at least two dots after the star.
+ */
+ if ((state & (LABEL_START | LABEL_HYPHEN)) != 0 || dots < 2)
+ return NULL;
+ return star;
+}
+
+/* Compare using wildcards. */
+static int equal_wildcard(const unsigned char *pattern, size_t pattern_len,
+ const unsigned char *subject, size_t subject_len,
+ unsigned int flags)
+{
+ const unsigned char *star = NULL;
+
+ /*
+ * Subject names starting with '.' can only match a wildcard pattern
+ * via a subject sub-domain pattern suffix match.
+ */
+ if (!(subject_len > 1 && subject[0] == '.'))
+ star = valid_star(pattern, pattern_len, flags);
+ if (star == NULL)
+ return equal_nocase(pattern, pattern_len,
+ subject, subject_len, flags);
+ return wildcard_match(pattern, star - pattern,
+ star + 1, (pattern + pattern_len) - star - 1,
+ subject, subject_len, flags);
+}
+
+/*
+ * Compare an ASN1_STRING to a supplied string. If they match return 1. If
+ * cmp_type > 0 only compare if string matches the type, otherwise convert it
+ * to UTF8.
+ */
+
+static int do_check_string(const ASN1_STRING *a, int cmp_type, equal_fn equal,
+ unsigned int flags, const char *b, size_t blen,
+ char **peername)
+{
+ int rv = 0;
+
+ if (!a->data || !a->length)
+ return 0;
+ if (cmp_type > 0) {
+ if (cmp_type != a->type)
+ return 0;
+ if (cmp_type == V_ASN1_IA5STRING)
+ rv = equal(a->data, a->length, (unsigned char *)b, blen, flags);
+ else if (a->length == (int)blen && !memcmp(a->data, b, blen))
+ rv = 1;
+ if (rv > 0 && peername)
+ *peername = OPENSSL_strndup((char *)a->data, a->length);
+ } else {
+ int astrlen;
+ unsigned char *astr;
+ astrlen = ASN1_STRING_to_UTF8(&astr, (ASN1_STRING*)a);
+ if (astrlen < 0) {
+ /*
+ * -1 could be an internal malloc failure or a decoding error from
+ * malformed input; we can't distinguish.
+ */
+ return -1;
+ }
+ rv = equal(astr, astrlen, (unsigned char *)b, blen, flags);
+ if (rv > 0 && peername)
+ *peername = OPENSSL_strndup((char *)astr, astrlen);
+ //*peername = strndup((char *)astr, astrlen);
+ OPENSSL_free(astr);
+ }
+ return rv;
+}
+
+static int do_x509_check(X509 *x, const char *chk, size_t chklen,
+ unsigned int flags, int check_type, char **peername)
+{
+ GENERAL_NAMES *gens = NULL;
+ X509_NAME *name = NULL;
+ int i;
+ int cnid = NID_undef;
+ int alt_type;
+ int san_present = 0;
+ int rv = 0;
+ equal_fn equal;
+
+ /* See below, this flag is internal-only */
+ flags &= ~_X509_CHECK_FLAG_DOT_SUBDOMAINS;
+ if (check_type == GEN_EMAIL) {
+ cnid = NID_pkcs9_emailAddress;
+ alt_type = V_ASN1_IA5STRING;
+ equal = equal_email;
+ } else if (check_type == GEN_DNS) {
+ cnid = NID_commonName;
+ /* Implicit client-side DNS sub-domain pattern */
+ if (chklen > 1 && chk[0] == '.')
+ flags |= _X509_CHECK_FLAG_DOT_SUBDOMAINS;
+ alt_type = V_ASN1_IA5STRING;
+ if (flags & X509_CHECK_FLAG_NO_WILDCARDS)
+ equal = equal_nocase;
+ else
+ equal = equal_wildcard;
+ } else {
+ alt_type = V_ASN1_OCTET_STRING;
+ equal = equal_case;
+ }
+
+ if (chklen == 0)
+ chklen = strlen(chk);
+
+ gens = (GENERAL_NAMES*)X509_get_ext_d2i(x, NID_subject_alt_name, NULL, NULL);
+ if (gens) {
+ for (i = 0; i < sk_GENERAL_NAME_num(gens); i++) {
+ GENERAL_NAME *gen;
+ ASN1_STRING *cstr;
+ gen = sk_GENERAL_NAME_value(gens, i);
+ if (gen->type != check_type)
+ continue;
+ san_present = 1;
+ if (check_type == GEN_EMAIL)
+ cstr = gen->d.rfc822Name;
+ else if (check_type == GEN_DNS)
+ cstr = gen->d.dNSName;
+ else
+ cstr = gen->d.iPAddress;
+ /* Positive on success, negative on error! */
+ if ((rv = do_check_string(cstr, alt_type, equal, flags,
+ chk, chklen, peername)) != 0)
+ break;
+ }
+ GENERAL_NAMES_free(gens);
+ if (rv != 0)
+ return rv;
+ if (san_present && !(flags & X509_CHECK_FLAG_ALWAYS_CHECK_SUBJECT))
+ return 0;
+ }
+
+ /* We're done if CN-ID is not pertinent */
+ if (cnid == NID_undef || (flags & X509_CHECK_FLAG_NEVER_CHECK_SUBJECT))
+ return 0;
+
+ i = -1;
+ name = X509_get_subject_name(x);
+ while ((i = X509_NAME_get_index_by_NID(name, cnid, i)) >= 0) {
+ const X509_NAME_ENTRY *ne = X509_NAME_get_entry(name, i);
+ const ASN1_STRING *str = X509_NAME_ENTRY_get_data((X509_NAME_ENTRY*)ne);
+
+ /* Positive on success, negative on error! */
+ if ((rv = do_check_string(str, -1, equal, flags,
+ chk, chklen, peername)) != 0)
+ return rv;
+ }
+ return 0;
+}
+
+int X509_check_host(X509 *x, const char *chk, size_t chklen,
+ unsigned int flags, char **peername)
+{
+ if (chk == NULL)
+ return -2;
+ /*
+ * Embedded NULs are disallowed, except as the last character of a
+ * string of length 2 or more (tolerate caller including terminating
+ * NUL in string length).
+ */
+ if (chklen == 0)
+ chklen = strlen(chk);
+ else if (memchr(chk, '\0', chklen > 1 ? chklen - 1 : chklen))
+ return -2;
+ if (chklen > 1 && chk[chklen - 1] == '\0')
+ --chklen;
+ return do_x509_check(x, chk, chklen, flags, GEN_DNS, peername);
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/security/x509_check_host.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/security/x509_check_host.h b/be/src/kudu/security/x509_check_host.h
new file mode 100644
index 0000000..d2d5af9
--- /dev/null
+++ b/be/src/kudu/security/x509_check_host.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright 1999-2016 The OpenSSL Project Authors. All Rights Reserved.
+ *
+ * Licensed under the OpenSSL license (the "License"). You may not use
+ * this file except in compliance with the License. You can obtain a copy
+ * in the file LICENSE in the source distribution or at
+ * https://www.openssl.org/source/license.html
+ */
+
+// The following is ported from the OpenSSL-1.1.0b library.
+
+#ifndef X509_CHECK_HOST_H
+#define X509_CHECK_HOST_H
+
+#include <stdlib.h>
+// IWYU pragma: no_include <openssl/x509.h>
+// IWYU pragma: no_include "openssl/x509.h"
+
+typedef struct x509_st X509;
+
+/* Flags for X509_check_* functions */
+
+/*
+ * Always check subject name for host match even if subject alt names present
+ */
+# define X509_CHECK_FLAG_ALWAYS_CHECK_SUBJECT 0x1
+/* Disable wildcard matching for dnsName fields and common name. */
+# define X509_CHECK_FLAG_NO_WILDCARDS 0x2
+/* Wildcards must not match a partial label. */
+# define X509_CHECK_FLAG_NO_PARTIAL_WILDCARDS 0x4
+/* Allow (non-partial) wildcards to match multiple labels. */
+# define X509_CHECK_FLAG_MULTI_LABEL_WILDCARDS 0x8
+/* Constrain verifier subdomain patterns to match a single label. */
+# define X509_CHECK_FLAG_SINGLE_LABEL_SUBDOMAINS 0x10
+/* Never check the subject CN */
+# define X509_CHECK_FLAG_NEVER_CHECK_SUBJECT 0x20
+/*
+ * Match reference identifiers starting with "." to any sub-domain.
+ * This is a non-public flag, turned on implicitly when the subject
+ * reference identity is a DNS name.
+ */
+# define _X509_CHECK_FLAG_DOT_SUBDOMAINS 0x8000
+
+// Checks if the certificate Subject Alternative Name (SAN) or Subject CommonName (CN)
+// matches the specified host name, which must be encoded in the preferred name syntax
+// described in section 3.5 of RFC 1034.
+int X509_check_host(X509 *x, const char *chk, size_t chklen,
+ unsigned int flags, char **peername);
+
+#endif // X509_CHECK_HOST_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/CMakeLists.txt b/be/src/kudu/util/CMakeLists.txt
new file mode 100644
index 0000000..94dd2fa
--- /dev/null
+++ b/be/src/kudu/util/CMakeLists.txt
@@ -0,0 +1,482 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######################################
+# util_compression_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ UTIL_COMPRESSION_PROTO_SRCS UTIL_COMPRESSION_PROTO_HDRS UTIL_COMPRESSION_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES compression/compression.proto)
+ADD_EXPORTABLE_LIBRARY(util_compression_proto
+ SRCS ${UTIL_COMPRESSION_PROTO_SRCS}
+ DEPS protobuf
+ NONLINK_DEPS ${UTIL_COMPRESSION_PROTO_TGTS})
+
+#######################################
+# histogram_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ HISTOGRAM_PROTO_SRCS HISTOGRAM_PROTO_HDRS HISTOGRAM_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES histogram.proto)
+ADD_EXPORTABLE_LIBRARY(histogram_proto
+ SRCS ${HISTOGRAM_PROTO_SRCS}
+ DEPS protobuf
+ NONLINK_DEPS ${HISTOGRAM_PROTO_TGTS})
+
+#######################################
+# maintenance_manager_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ MAINTENANCE_MANAGER_PROTO_SRCS MAINTENANCE_MANAGER_PROTO_HDRS MAINTENANCE_MANAGER_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES maintenance_manager.proto)
+ADD_EXPORTABLE_LIBRARY(maintenance_manager_proto
+ SRCS ${MAINTENANCE_MANAGER_PROTO_SRCS}
+ DEPS protobuf
+ NONLINK_DEPS ${MAINTENANCE_MANAGER_PROTO_TGTS})
+
+#######################################
+# pb_util_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ PB_UTIL_PROTO_SRCS PB_UTIL_PROTO_HDRS PB_UTIL_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES pb_util.proto)
+ADD_EXPORTABLE_LIBRARY(pb_util_proto
+ SRCS ${PB_UTIL_PROTO_SRCS}
+ DEPS protobuf
+ NONLINK_DEPS ${PB_UTIL_PROTO_TGTS})
+
+#######################################
+# version_info_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ VERSION_INFO_PROTO_SRCS VERSION_INFO_PROTO_HDRS VERSION_INFO_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES version_info.proto)
+ADD_EXPORTABLE_LIBRARY(version_info_proto
+ SRCS ${VERSION_INFO_PROTO_SRCS}
+ DEPS protobuf
+ NONLINK_DEPS ${VERSION_INFO_PROTO_TGTS})
+
+############################################################
+# Version stamp
+############################################################
+
+# Unlike CMAKE_CURRENT_BINARY_DIR, CMAKE_BINARY_DIR is always the root of
+# the build directory.
+set(VERSION_STAMP_FILE ${CMAKE_BINARY_DIR}/src/kudu/generated/version_defines.h)
+
+list(APPEND GEN_VERSION_INFO_COMMAND "${BUILD_SUPPORT_DIR}/gen_version_info.py")
+list(APPEND GEN_VERSION_INFO_COMMAND "--version=${KUDU_VERSION_NUMBER}")
+list(APPEND GEN_VERSION_INFO_COMMAND "--build-type=${CMAKE_BUILD_TYPE}")
+if(KUDU_GIT_HASH)
+ message(STATUS "Provided git hash: ${KUDU_GIT_HASH}")
+ list(APPEND GEN_VERSION_INFO_COMMAND "--git-hash=${KUDU_GIT_HASH}")
+endif()
+list(APPEND GEN_VERSION_INFO_COMMAND "${VERSION_STAMP_FILE}")
+add_custom_target(gen_version_info
+ COMMAND ${GEN_VERSION_INFO_COMMAND}
+ BYPRODUCTS "${VERSION_STAMP_FILE}")
+
+#######################################
+# kudu_util
+#######################################
+
+if (APPLE)
+ set(SEMAPHORE_CC "semaphore_macosx.cc")
+else ()
+ set(SEMAPHORE_CC "semaphore.cc")
+endif()
+
+set(UTIL_SRCS
+ async_logger.cc
+ atomic.cc
+ bitmap.cc
+ bloom_filter.cc
+ bitmap.cc
+ cache.cc
+ cache_metrics.cc
+ coding.cc
+ condition_variable.cc
+ cow_object.cc
+ crc.cc
+ debug-util.cc
+ decimal_util.cc
+ debug/trace_event_impl.cc
+ debug/trace_event_impl_constants.cc
+ debug/trace_event_synthetic_delay.cc
+ debug/unwind_safeness.cc
+ easy_json.cc
+ env.cc env_posix.cc env_util.cc
+ errno.cc
+ faststring.cc
+ fault_injection.cc
+ file_cache.cc
+ flags.cc
+ flag_tags.cc
+ flag_validators.cc
+ group_varint.cc
+ pstack_watcher.cc
+ hdr_histogram.cc
+ hexdump.cc
+ init.cc
+ jsonreader.cc
+ jsonwriter.cc
+ kernel_stack_watchdog.cc
+ locks.cc
+ logging.cc
+ maintenance_manager.cc
+ malloc.cc
+ memcmpable_varint.cc
+ memory/arena.cc
+ memory/memory.cc
+ memory/overwrite.cc
+ mem_tracker.cc
+ metrics.cc
+ minidump.cc
+ monotime.cc
+ mutex.cc
+ net/dns_resolver.cc
+ net/net_util.cc
+ net/sockaddr.cc
+ net/socket.cc
+ oid_generator.cc
+ once.cc
+ os-util.cc
+ path_util.cc
+ pb_util.cc
+ pb_util-internal.cc
+ process_memory.cc
+ random_util.cc
+ rolling_log.cc
+ rw_mutex.cc
+ rwc_lock.cc
+ ${SEMAPHORE_CC}
+ signal.cc
+ slice.cc
+ spinlock_profiling.cc
+ status.cc
+ status_callback.cc
+ string_case.cc
+ striped64.cc
+ subprocess.cc
+ test_graph.cc
+ test_util_prod.cc
+ thread.cc
+ threadlocal.cc
+ threadpool.cc
+ thread_restrictions.cc
+ throttler.cc
+ trace.cc
+ trace_metrics.cc
+ user.cc
+ url-coding.cc
+ version_info.cc
+ version_util.cc
+ website_util.cc
+ zlib.cc
+)
+
+# overwrite.cc contains a single function which would be a hot spot in
+# debug builds. It's separated into a separate file so it can be
+# optimized regardless of the default optimization options.
+set_source_files_properties(memory/overwrite.cc PROPERTIES COMPILE_FLAGS "-O3")
+
+if(HAVE_LIB_VMEM)
+ set(UTIL_SRCS
+ ${UTIL_SRCS}
+ nvm_cache.cc)
+endif()
+
+set(UTIL_LIBS
+ crcutil
+ gflags
+ glog
+ gutil
+ histogram_proto
+ libev
+ maintenance_manager_proto
+ pb_util_proto
+ protobuf
+ version_info_proto
+ zlib)
+
+if(NOT APPLE)
+ set(UTIL_LIBS
+ ${UTIL_LIBS}
+ breakpad_client
+ dl
+ rt)
+endif()
+
+if(HAVE_LIB_VMEM)
+ set(UTIL_LIBS
+ ${UTIL_LIBS}
+ vmem)
+endif()
+
+# We use MallocExtension, but not in the exported version of the library.
+set(EXPORTED_UTIL_LIBS ${UTIL_LIBS})
+if(${KUDU_TCMALLOC_AVAILABLE})
+ list(APPEND UTIL_LIBS tcmalloc)
+endif()
+
+ADD_EXPORTABLE_LIBRARY(kudu_util
+ SRCS ${UTIL_SRCS}
+ DEPS ${UTIL_LIBS}
+ NONLINK_DEPS gen_version_info
+ EXPORTED_DEPS ${EXPORTED_UTIL_LIBS})
+
+#######################################
+# kudu_util_compression
+#######################################
+set(UTIL_COMPRESSION_SRCS
+ compression/compression_codec.cc)
+set(UTIL_COMPRESSION_LIBS
+ kudu_util
+ util_compression_proto
+
+ glog
+ gutil
+ lz4
+ snappy
+ zlib)
+ADD_EXPORTABLE_LIBRARY(kudu_util_compression
+ SRCS ${UTIL_COMPRESSION_SRCS}
+ DEPS ${UTIL_COMPRESSION_LIBS})
+
+#######################################
+# kudu_test_util
+#######################################
+
+# Used by mini-cluster, so must be built even when NO_TESTS=1.
+add_library(kudu_test_util
+ test_util.cc)
+target_link_libraries(kudu_test_util
+ gflags
+ glog
+ gmock
+ kudu_util)
+
+if(HAVE_LIB_VMEM)
+ target_link_libraries(kudu_test_util
+ vmem)
+endif()
+
+#######################################
+# kudu_curl_util
+#######################################
+if(NOT NO_TESTS)
+ add_library(kudu_curl_util
+ curl_util.cc)
+ target_link_libraries(kudu_curl_util
+ security
+ ${CURL_LIBRARIES}
+ glog
+ gutil)
+endif()
+
+#######################################
+# kudu_test_main
+#######################################
+if(NOT NO_TESTS)
+ add_library(kudu_test_main
+ test_main.cc)
+ target_link_libraries(kudu_test_main
+ ${KRB5_REALM_OVERRIDE}
+ gflags
+ glog
+ gmock
+ kudu_util
+ kudu_test_util)
+
+ if(NOT APPLE)
+ target_link_libraries(kudu_test_main
+ dl
+ rt)
+ endif()
+endif()
+
+#######################################
+# protoc-gen-insertions
+#######################################
+
+add_executable(protoc-gen-insertions protoc-gen-insertions.cc)
+target_link_libraries(protoc-gen-insertions gutil protobuf protoc ${KUDU_BASE_LIBS})
+
+#######################################
+# Unit tests
+#######################################
+
+set(KUDU_TEST_LINK_LIBS kudu_util gutil ${KUDU_MIN_TEST_LIBS})
+ADD_KUDU_TEST(async_util-test)
+ADD_KUDU_TEST(atomic-test)
+ADD_KUDU_TEST(bit-util-test)
+ADD_KUDU_TEST(bitmap-test)
+ADD_KUDU_TEST(blocking_queue-test)
+ADD_KUDU_TEST(bloom_filter-test)
+ADD_KUDU_TEST(cache-bench RUN_SERIAL true)
+ADD_KUDU_TEST(cache-test)
+ADD_KUDU_TEST(callback_bind-test)
+ADD_KUDU_TEST(countdown_latch-test)
+ADD_KUDU_TEST(crc-test RUN_SERIAL true) # has a benchmark
+ADD_KUDU_TEST(debug-util-test)
+ADD_KUDU_TEST(decimal_util-test)
+ADD_KUDU_TEST(easy_json-test)
+ADD_KUDU_TEST(env-test LABELS no_tsan)
+ADD_KUDU_TEST(env_util-test)
+ADD_KUDU_TEST(errno-test)
+ADD_KUDU_TEST(faststring-test)
+ADD_KUDU_TEST(file_cache-test)
+ADD_KUDU_TEST(file_cache-stress-test RUN_SERIAL true)
+ADD_KUDU_TEST(flag_tags-test)
+ADD_KUDU_TEST(flag_validators-test)
+ADD_KUDU_TEST(flags-test)
+ADD_KUDU_TEST(group_varint-test)
+ADD_KUDU_TEST(hash_util-test)
+ADD_KUDU_TEST(hdr_histogram-test)
+ADD_KUDU_TEST(int128-test)
+ADD_KUDU_TEST(inline_slice-test)
+ADD_KUDU_TEST(interval_tree-test)
+ADD_KUDU_TEST(jsonreader-test)
+ADD_KUDU_TEST(knapsack_solver-test)
+ADD_KUDU_TEST(logging-test)
+ADD_KUDU_TEST(maintenance_manager-test)
+ADD_KUDU_TEST(map-util-test)
+ADD_KUDU_TEST(mem_tracker-test)
+ADD_KUDU_TEST(memcmpable_varint-test LABELS no_tsan)
+ADD_KUDU_TEST(memory/arena-test)
+ADD_KUDU_TEST(metrics-test)
+ADD_KUDU_TEST(monotime-test)
+ADD_KUDU_TEST(mt-hdr_histogram-test RUN_SERIAL true)
+ADD_KUDU_TEST(mt-metrics-test RUN_SERIAL true)
+ADD_KUDU_TEST(mt-threadlocal-test RUN_SERIAL true)
+ADD_KUDU_TEST(net/dns_resolver-test)
+ADD_KUDU_TEST(net/net_util-test)
+ADD_KUDU_TEST(net/socket-test)
+ADD_KUDU_TEST(object_pool-test)
+ADD_KUDU_TEST(oid_generator-test)
+ADD_KUDU_TEST(once-test)
+ADD_KUDU_TEST(os-util-test)
+ADD_KUDU_TEST(path_util-test)
+ADD_KUDU_TEST(process_memory-test RUN_SERIAL true)
+ADD_KUDU_TEST(random-test)
+ADD_KUDU_TEST(random_util-test)
+ADD_KUDU_TEST(rle-test)
+ADD_KUDU_TEST(rolling_log-test)
+ADD_KUDU_TEST(rw_mutex-test RUN_SERIAL true)
+ADD_KUDU_TEST(rw_semaphore-test)
+ADD_KUDU_TEST(rwc_lock-test RUN_SERIAL true)
+ADD_KUDU_TEST(safe_math-test)
+ADD_KUDU_TEST(scoped_cleanup-test)
+ADD_KUDU_TEST(slice-test)
+ADD_KUDU_TEST(sorted_disjoint_interval_list-test)
+ADD_KUDU_TEST(spinlock_profiling-test)
+ADD_KUDU_TEST(stack_watchdog-test PROCESSORS 2)
+ADD_KUDU_TEST(status-test)
+ADD_KUDU_TEST(string_case-test)
+ADD_KUDU_TEST(striped64-test RUN_SERIAL true)
+ADD_KUDU_TEST(subprocess-test)
+ADD_KUDU_TEST(thread-test)
+ADD_KUDU_TEST(threadpool-test)
+ADD_KUDU_TEST(throttler-test)
+ADD_KUDU_TEST(trace-test PROCESSORS 4)
+ADD_KUDU_TEST(url-coding-test)
+ADD_KUDU_TEST(user-test)
+ADD_KUDU_TEST(version_util-test)
+
+if (NOT APPLE)
+ ADD_KUDU_TEST(minidump-test)
+ ADD_KUDU_TEST(pstack_watcher-test)
+endif()
+
+#######################################
+# jsonwriter_test_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ JSONWRITER_TEST_PROTO_SRCS JSONWRITER_TEST_PROTO_HDRS JSONWRITER_TEST_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES jsonwriter_test.proto)
+add_library(jsonwriter_test_proto ${JSONWRITER_TEST_PROTO_SRCS} ${JSONWRITER_TEST_PROTO_HDRS})
+target_link_libraries(jsonwriter_test_proto
+ pb_util_proto
+ protobuf)
+
+#######################################
+# jsonwriter-test
+#######################################
+
+ADD_KUDU_TEST(jsonwriter-test)
+if(NOT NO_TESTS)
+ target_link_libraries(jsonwriter-test
+ jsonwriter_test_proto)
+endif()
+
+#######################################
+# pb_util_test_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ PROTO_CONTAINER_TEST_PROTO_SRCS PROTO_CONTAINER_TEST_PROTO_HDRS PROTO_CONTAINER_TEST_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES
+ proto_container_test.proto
+ proto_container_test2.proto
+ proto_container_test3.proto
+ pb_util_test.proto)
+add_library(pb_util_test_proto
+ ${PROTO_CONTAINER_TEST_PROTO_SRCS}
+ ${PROTO_CONTAINER_TEST_PROTO_HDRS})
+target_link_libraries(pb_util_test_proto
+ pb_util_proto
+ protobuf)
+
+#######################################
+# pb_util-test
+#######################################
+
+ADD_KUDU_TEST(pb_util-test)
+if(NOT NO_TESTS)
+ target_link_libraries(pb_util-test
+ pb_util_test_proto)
+endif()
+
+#######################################
+# util/compression tests
+#######################################
+ADD_KUDU_TEST(compression/compression-test)
+if(NOT NO_TESTS)
+ target_link_libraries(compression-test
+ cfile
+ kudu_util_compression)
+endif()
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/alignment.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/alignment.h b/be/src/kudu/util/alignment.h
new file mode 100644
index 0000000..8e902d2
--- /dev/null
+++ b/be/src/kudu/util/alignment.h
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Macros for dealing with memory alignment.
+#ifndef KUDU_UTIL_ALIGNMENT_H
+#define KUDU_UTIL_ALIGNMENT_H
+
+// Round down 'x' to the nearest 'align' boundary ('align' must be a power of two)
+#define KUDU_ALIGN_DOWN(x, align) ((x) & (~(align) + 1))
+
+// Round up 'x' to the nearest 'align' boundary ('align' must be a power of two)
+#define KUDU_ALIGN_UP(x, align) (((x) + ((align) - 1)) & (~(align) + 1))
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/array_view.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/array_view.h b/be/src/kudu/util/array_view.h
new file mode 100644
index 0000000..24ee727
--- /dev/null
+++ b/be/src/kudu/util/array_view.h
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2015 The WebRTC Project Authors. All rights reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE.txt file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the same file. All contributing project authors may
+ * be found in the AUTHORS file in the root of the WebRTC source tree.
+ *
+ * Imported into Kudu from WebRTC and modified to fit in the Kudu namespace
+ * and avoid referring to anything WebRTC-specific like rtc::Buffer.
+ */
+
+#pragma once
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+// Many functions read from or write to arrays. The obvious way to do this is
+// to use two arguments, a pointer to the first element and an element count:
+//
+// bool Contains17(const int* arr, size_t size) {
+// for (size_t i = 0; i < size; ++i) {
+// if (arr[i] == 17)
+// return true;
+// }
+// return false;
+// }
+//
+// This is flexible, since it doesn't matter how the array is stored (C array,
+// std::vector, std::array, ...), but it's error-prone because the caller has
+// to correctly specify the array length:
+//
+// Contains17(arr, arraysize(arr)); // C array
+// Contains17(&arr[0], arr.size()); // std::vector
+// Contains17(arr, size); // pointer + size
+// ...
+//
+// It's also kind of messy to have two separate arguments for what is
+// conceptually a single thing.
+//
+// Enter kudu::ArrayView<T>. It contains a T pointer (to an array it doesn't
+// own) and a count, and supports the basic things you'd expect, such as
+// indexing and iteration. It allows us to write our function like this:
+//
+// bool Contains17(kudu::ArrayView<const int> arr) {
+// for (auto e : arr) {
+// if (e == 17)
+// return true;
+// }
+// return false;
+// }
+//
+// And even better, because a bunch of things will implicitly convert to
+// ArrayView, we can call it like this:
+//
+// Contains17(arr); // C array
+// Contains17(arr); // std::vector
+// Contains17(kudu::ArrayView<int>(arr, size)); // pointer + size
+// ...
+//
+// One important point is that ArrayView<T> and ArrayView<const T> are
+// different types, which allow and don't allow mutation of the array elements,
+// respectively. The implicit conversions work just like you'd hope, so that
+// e.g. vector<int> will convert to either ArrayView<int> or ArrayView<const
+// int>, but const vector<int> will convert only to ArrayView<const int>.
+// (ArrayView itself can be the source type in such conversions, so
+// ArrayView<int> will convert to ArrayView<const int>.)
+//
+// Note: ArrayView is tiny (just a pointer and a count) and trivially copyable,
+// so it's probably cheaper to pass it by value than by const reference.
+template <typename T>
+class ArrayView final {
+ public:
+ // Construct an empty ArrayView.
+ ArrayView() : ArrayView(static_cast<T*>(nullptr), 0) {}
+
+ // Construct an ArrayView for a (pointer,size) pair.
+ template <typename U>
+ ArrayView(U* data, size_t size)
+ : data_(size == 0 ? nullptr : data), size_(size) {
+ CheckInvariant();
+ }
+
+ // Construct an ArrayView for an array.
+ template <typename U, size_t N>
+ ArrayView(U (&array)[N]) : ArrayView(&array[0], N) {} // NOLINT(runtime/explicit)
+
+ // Construct an ArrayView for any type U that has a size() method whose
+ // return value converts implicitly to size_t, and a data() method whose
+ // return value converts implicitly to T*. In particular, this means we allow
+ // conversion from ArrayView<T> to ArrayView<const T>, but not the other way
+ // around. Other allowed conversions include std::vector<T> to ArrayView<T>
+ // or ArrayView<const T>, const std::vector<T> to ArrayView<const T>, and
+ // kudu::faststring to ArrayView<uint8_t> (with the same const behavior as
+ // std::vector).
+ template <typename U>
+ ArrayView(U& u) : ArrayView(u.data(), u.size()) {} // NOLINT(runtime/explicit)
+
+ // Indexing, size, and iteration. These allow mutation even if the ArrayView
+ // is const, because the ArrayView doesn't own the array. (To prevent
+ // mutation, use ArrayView<const T>.)
+ size_t size() const { return size_; }
+ bool empty() const { return size_ == 0; }
+ T* data() const { return data_; }
+ T& operator[](size_t idx) const {
+ DCHECK_LT(idx, size_);
+ DCHECK(data_); // Follows from size_ > idx and the class invariant.
+ return data_[idx];
+ }
+ T* begin() const { return data_; }
+ T* end() const { return data_ + size_; }
+ const T* cbegin() const { return data_; }
+ const T* cend() const { return data_ + size_; }
+
+ // Comparing two ArrayViews compares their (pointer,size) pairs; it does
+ // *not* dereference the pointers.
+ friend bool operator==(const ArrayView& a, const ArrayView& b) {
+ return a.data_ == b.data_ && a.size_ == b.size_;
+ }
+ friend bool operator!=(const ArrayView& a, const ArrayView& b) {
+ return !(a == b);
+ }
+
+ private:
+ // Invariant: !data_ iff size_ == 0.
+ void CheckInvariant() const { DCHECK_EQ(!data_, size_ == 0); }
+ T* data_;
+ size_t size_;
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/async_logger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/async_logger.cc b/be/src/kudu/util/async_logger.cc
new file mode 100644
index 0000000..3214a42
--- /dev/null
+++ b/be/src/kudu/util/async_logger.cc
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/async_logger.h"
+
+#include <string>
+#include <thread>
+
+#include "kudu/util/monotime.h"
+
+using std::string;
+
+namespace kudu {
+
+AsyncLogger::AsyncLogger(google::base::Logger* wrapped,
+ int max_buffer_bytes) :
+ max_buffer_bytes_(max_buffer_bytes),
+ wrapped_(DCHECK_NOTNULL(wrapped)),
+ wake_flusher_cond_(&lock_),
+ free_buffer_cond_(&lock_),
+ flush_complete_cond_(&lock_),
+ active_buf_(new Buffer()),
+ flushing_buf_(new Buffer()) {
+ DCHECK_GT(max_buffer_bytes_, 0);
+}
+
+AsyncLogger::~AsyncLogger() {}
+
+void AsyncLogger::Start() {
+ CHECK_EQ(state_, INITTED);
+ state_ = RUNNING;
+ thread_ = std::thread(&AsyncLogger::RunThread, this);
+}
+
+void AsyncLogger::Stop() {
+ {
+ MutexLock l(lock_);
+ CHECK_EQ(state_, RUNNING);
+ state_ = STOPPED;
+ wake_flusher_cond_.Signal();
+ }
+ thread_.join();
+ CHECK(active_buf_->messages.empty());
+ CHECK(flushing_buf_->messages.empty());
+}
+
+void AsyncLogger::Write(bool force_flush,
+ time_t timestamp,
+ const char* message,
+ int message_len) {
+ {
+ MutexLock l(lock_);
+ DCHECK_EQ(state_, RUNNING);
+ while (BufferFull(*active_buf_)) {
+ app_threads_blocked_count_for_tests_++;
+ free_buffer_cond_.Wait();
+ }
+ active_buf_->add(Msg(timestamp, string(message, message_len)),
+ force_flush);
+ wake_flusher_cond_.Signal();
+ }
+
+ // In most cases, we take the 'force_flush' argument to mean that we'll let the logger
+ // thread do the flushing for us, but not block the application. However, for the
+ // special case of a FATAL log message, we really want to make sure that our message
+ // hits the log before we continue, or else it's likely that the application will exit
+ // while it's still in our buffer.
+ //
+ // NOTE: even if the application doesn't wrap the FATAL-level logger, log messages at
+ // FATAL are also written to all other log files with lower levels. So, a FATAL message
+ // will force a synchronous flush of all lower-level logs before exiting.
+ //
+ // Unfortunately, the underlying log level isn't passed through to this interface, so we
+ // have to use this hack: messages from FATAL errors start with the character 'F'.
+ if (message_len > 0 && message[0] == 'F') {
+ Flush();
+ }
+}
+
+void AsyncLogger::Flush() {
+ MutexLock l(lock_);
+ DCHECK_EQ(state_, RUNNING);
+
+ // Wake up the writer thread at least twice.
+ // This ensures that it has completely flushed both buffers.
+ uint64_t orig_flush_count = flush_count_;
+ while (flush_count_ < orig_flush_count + 2 &&
+ state_ == RUNNING) {
+ active_buf_->flush = true;
+ wake_flusher_cond_.Signal();
+ flush_complete_cond_.Wait();
+ }
+}
+
+uint32_t AsyncLogger::LogSize() {
+ return wrapped_->LogSize();
+}
+
+void AsyncLogger::RunThread() {
+ MutexLock l(lock_);
+ while (state_ == RUNNING || active_buf_->needs_flush_or_write()) {
+ while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) {
+ if (!wake_flusher_cond_.WaitFor(MonoDelta::FromSeconds(FLAGS_logbufsecs))) {
+ // In case of wait timeout, force it to flush regardless of whether there is anything enqueued.
+ active_buf_->flush = true;
+ }
+ }
+
+ active_buf_.swap(flushing_buf_);
+ // If the buffer that we are about to flush was full, then
+ // we may have other threads which were blocked that we now
+ // need to wake up.
+ if (BufferFull(*flushing_buf_)) {
+ free_buffer_cond_.Broadcast();
+ }
+ l.Unlock();
+
+ for (const auto& msg : flushing_buf_->messages) {
+ wrapped_->Write(false, msg.ts, msg.message.data(), msg.message.size());
+ }
+ if (flushing_buf_->flush) {
+ wrapped_->Flush();
+ }
+ flushing_buf_->clear();
+
+ l.Lock();
+ flush_count_++;
+ flush_complete_cond_.Broadcast();
+ }
+}
+
+bool AsyncLogger::BufferFull(const Buffer& buf) const {
+ // We evenly divide our total buffer space between the two buffers.
+ return buf.size > (max_buffer_bytes_ / 2);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/async_logger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/async_logger.h b/be/src/kudu/util/async_logger.h
new file mode 100644
index 0000000..aedbdde
--- /dev/null
+++ b/be/src/kudu/util/async_logger.h
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include <cstdint>
+#include <ctime>
+#include <memory>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+// Wrapper for a glog Logger which asynchronously writes log messages.
+// This class starts a new thread responsible for forwarding the messages
+// to the logger, and performs double buffering. Writers append to the
+// current buffer and then wake up the logger thread. The logger swaps in
+// a new buffer and writes any accumulated messages to the wrapped
+// Logger.
+//
+// This double-buffering behavior dramatically improves performance, especially
+// for logging messages which require flushing the underlying file (i.e WARNING
+// and above for default). The flush can take a couple of milliseconds, and in
+// some cases can even block for hundreds of milliseconds or more. With the
+// double-buffered approach, threads can proceed with useful work while the IO
+// thread blocks.
+//
+// The semantics provided by this wrapper are slightly weaker than the default
+// glog semantics. By default, glog will immediately (synchronously) flush WARNING
+// and above to the underlying file, whereas here we are deferring that flush to
+// the separate thread. This means that a crash just after a 'LOG(WARNING)' may
+// leave the message missing from the logs, but the perf benefit is probably
+// worth it. We do take care that a glog FATAL message flushes all buffered log
+// messages before exiting.
+//
+// NOTE: the logger limits the total amount of buffer space, so if the underlying
+// log blocks for too long, eventually the threads generating the log messages
+// will block as well. This prevents runaway memory usage.
+class AsyncLogger : public google::base::Logger {
+ public:
+ AsyncLogger(google::base::Logger* wrapped,
+ int max_buffer_bytes);
+ ~AsyncLogger();
+
+ void Start();
+
+ // Stop the thread. Flush() and Write() must not be called after this.
+ //
+ // NOTE: this is currently only used in tests: in real life, we enable async
+ // logging once when the program starts and then never disable it.
+ //
+ // REQUIRES: Start() must have been called.
+ void Stop();
+
+ // Write a message to the log.
+ //
+ // 'force_flush' is set by the GLog library based on the configured '--logbuflevel'
+ // flag. Any messages logged at the configured level or higher result in 'force_flush'
+ // being set to true, indicating that the message should be immediately written to the
+ // log rather than buffered in memory. See the class-level docs above for more detail
+ // about the implementation provided here.
+ //
+ // REQUIRES: Start() must have been called.
+ void Write(bool force_flush,
+ time_t timestamp,
+ const char* message,
+ int message_len) override;
+
+ // Flush any buffered messages.
+ void Flush() override;
+
+ // Get the current LOG file size.
+ // The returned value is approximate since some
+ // logged data may not have been flushed to disk yet.
+ uint32_t LogSize() override;
+
+ // Return a count of how many times an application thread was
+ // blocked due to the buffers being full and the writer thread
+ // not keeping up.
+ int app_threads_blocked_count_for_tests() const {
+ MutexLock l(lock_);
+ return app_threads_blocked_count_for_tests_;
+ }
+
+ private:
+ // A buffered message.
+ //
+ // TODO(todd): using std::string for buffered messages is convenient but not
+ // as efficient as it could be. Better would be to make the buffers just be
+ // Arenas and allocate both the message data and Msg struct from them, forming
+ // a linked list.
+ struct Msg {
+ time_t ts;
+ std::string message;
+
+ Msg(time_t ts, std::string message)
+ : ts(ts),
+ message(std::move(message)) {
+ }
+ };
+
+ // A buffer of messages waiting to be flushed.
+ struct Buffer {
+ std::vector<Msg> messages;
+
+ // Estimate of the size of 'messages'.
+ int size = 0;
+
+ // Whether this buffer needs an explicit flush of the
+ // underlying logger.
+ bool flush = false;
+
+ Buffer() {}
+
+ void clear() {
+ messages.clear();
+ size = 0;
+ flush = false;
+ }
+
+ void add(Msg msg, bool flush) {
+ size += sizeof(msg) + msg.message.size();
+ messages.emplace_back(std::move(msg));
+ this->flush |= flush;
+ }
+
+ bool needs_flush_or_write() const {
+ return flush || !messages.empty();
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(Buffer);
+ };
+
+ bool BufferFull(const Buffer& buf) const;
+ void RunThread();
+
+ // The maximum number of bytes used by the entire class.
+ const int max_buffer_bytes_;
+ google::base::Logger* const wrapped_;
+ std::thread thread_;
+
+ // Count of how many times an application thread was blocked due to
+ // a full buffer.
+ int app_threads_blocked_count_for_tests_ = 0;
+
+ // Count of how many times the writer thread has flushed the buffers.
+ // 64 bits should be enough to never worry about overflow.
+ uint64_t flush_count_ = 0;
+
+ // Protects buffers as well as 'state_'.
+ mutable Mutex lock_;
+
+ // Signaled by app threads to wake up the flusher, either for new
+ // data or because 'state_' changed.
+ ConditionVariable wake_flusher_cond_;
+
+ // Signaled by the flusher thread when the flusher has swapped in
+ // a free buffer to write to.
+ ConditionVariable free_buffer_cond_;
+
+ // Signaled by the flusher thread when it has completed flushing
+ // the current buffer.
+ ConditionVariable flush_complete_cond_;
+
+ // The buffer to which application threads append new log messages.
+ std::unique_ptr<Buffer> active_buf_;
+
+ // The buffer currently being flushed by the logger thread, cleared
+ // after a successful flush.
+ std::unique_ptr<Buffer> flushing_buf_;
+
+ // Trigger for the logger thread to stop.
+ enum State {
+ INITTED,
+ RUNNING,
+ STOPPED
+ };
+ State state_ = INITTED;
+
+ DISALLOW_COPY_AND_ASSIGN(AsyncLogger);
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/async_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/async_util-test.cc b/be/src/kudu/util/async_util-test.cc
new file mode 100644
index 0000000..5cb7a63
--- /dev/null
+++ b/be/src/kudu/util/async_util-test.cc
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/async_util.h"
+
+#include <unistd.h>
+
+#include <functional>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::thread;
+using std::vector;
+
+namespace kudu {
+
+class AsyncUtilTest : public KuduTest {
+ public:
+ AsyncUtilTest() {
+ // Set up an alarm to fail the test in case of deadlock.
+ alarm(30);
+ }
+ ~AsyncUtilTest() {
+ // Disable the alarm on test exit.
+ alarm(0);
+ }
+};
+
+// Test completing the synchronizer through each of the APIs it exposes.
+TEST_F(AsyncUtilTest, TestSynchronizerCompletion) {
+ Synchronizer sync;
+
+ {
+ auto waiter = thread([sync] {
+ ignore_result(sync.Wait());
+ });
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ sync.StatusCB(Status::OK());
+ waiter.join();
+ }
+ sync.Reset();
+ {
+ auto cb = sync.AsStatusCallback();
+ auto waiter = thread([sync] {
+ ignore_result(sync.Wait());
+ });
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ cb.Run(Status::OK());
+ waiter.join();
+ }
+ sync.Reset();
+ {
+ auto cb = sync.AsStdStatusCallback();
+ auto waiter = thread([sync] {
+ ignore_result(sync.Wait());
+ });
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ cb(Status::OK());
+ waiter.join();
+ }
+}
+
+TEST_F(AsyncUtilTest, TestSynchronizerMultiWait) {
+ Synchronizer sync;
+ vector<thread> waiters;
+ for (int i = 0; i < 5; i++) {
+ waiters.emplace_back([sync] {
+ ignore_result(sync.Wait());
+ });
+ }
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ sync.StatusCB(Status::OK());
+
+ for (auto& waiter : waiters) {
+ waiter.join();
+ }
+}
+
+TEST_F(AsyncUtilTest, TestSynchronizerTimedWait) {
+ thread waiter;
+ {
+ Synchronizer sync;
+ auto cb = sync.AsStatusCallback();
+ waiter = thread([cb] {
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ cb.Run(Status::OK());
+ });
+ ASSERT_OK(sync.WaitFor(MonoDelta::FromMilliseconds(1000)));
+ }
+ waiter.join();
+
+ {
+ Synchronizer sync;
+ auto cb = sync.AsStatusCallback();
+ waiter = thread([cb] {
+ SleepFor(MonoDelta::FromMilliseconds(1000));
+ cb.Run(Status::OK());
+ });
+ ASSERT_TRUE(sync.WaitFor(MonoDelta::FromMilliseconds(5)).IsTimedOut());
+ }
+
+ // Waiting on the thread gives TSAN a chance to check that no thread safety
+ // issues occurred.
+ waiter.join();
+}
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/async_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/async_util.h b/be/src/kudu/util/async_util.h
new file mode 100644
index 0000000..338c6c2
--- /dev/null
+++ b/be/src/kudu/util/async_util.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility functions which are handy when doing async/callback-based programming.
+
+#pragma once
+
+#include <functional>
+#include <memory>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+#include "kudu/util/status_callback.h"
+
+namespace kudu {
+
+// Simple class which can be used to make async methods synchronous.
+// For example:
+// Synchronizer s;
+// SomeAsyncMethod(s.AsStatusCallback());
+// CHECK_OK(s.Wait());
+//
+// The lifetime of the synchronizer is decoupled from the callback it produces.
+// If the callback outlives the synchronizer then executing it will be a no-op.
+// Callers must be careful not to allow the callback to be destructed without
+// completing it, otherwise the thread waiting on the synchronizer will block
+// indefinitely.
+class Synchronizer {
+ public:
+ Synchronizer()
+ : data_(std::make_shared<Data>()) {
+ }
+
+ void StatusCB(const Status& status) {
+ Data::Callback(std::weak_ptr<Data>(data_), status);
+ }
+
+ StatusCallback AsStatusCallback() {
+ return Bind(Data::Callback, std::weak_ptr<Data>(data_));
+ }
+
+ StdStatusCallback AsStdStatusCallback() {
+ return std::bind(Data::Callback, std::weak_ptr<Data>(data_), std::placeholders::_1);
+ }
+
+ Status Wait() const {
+ data_->latch.Wait();
+ return data_->status;
+ }
+
+ Status WaitFor(const MonoDelta& delta) const {
+ if (PREDICT_FALSE(!data_->latch.WaitFor(delta))) {
+ return Status::TimedOut("timed out while waiting for the callback to be called");
+ }
+ return data_->status;
+ }
+
+ void Reset() {
+ data_->latch.Reset(1);
+ }
+
+ private:
+
+ struct Data {
+ Data() : latch(1) {
+ }
+
+ static void Callback(std::weak_ptr<Data> weak, const Status& status) {
+ auto ptr = weak.lock();
+ if (ptr) {
+ ptr->status = status;
+ ptr->latch.CountDown();
+ }
+ }
+
+ Status status;
+ CountDownLatch latch;
+ DISALLOW_COPY_AND_ASSIGN(Data);
+ };
+
+ std::shared_ptr<Data> data_;
+};
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/atomic-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/atomic-test.cc b/be/src/kudu/util/atomic-test.cc
new file mode 100644
index 0000000..a65d55d
--- /dev/null
+++ b/be/src/kudu/util/atomic-test.cc
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/atomic.h"
+
+#include <cstdint>
+#include <limits>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using std::numeric_limits;
+using std::vector;
+
+// TODO Add some multi-threaded tests; currently AtomicInt is just a
+// wrapper around 'atomicops.h', but should the underlying
+// implementation change, it would help to have tests that make sure
+// invariants are preserved in a multi-threaded environment.
+
+template<typename T>
+class AtomicIntTest : public KuduTest {
+ public:
+
+ AtomicIntTest()
+ : max_(numeric_limits<T>::max()),
+ min_(numeric_limits<T>::min()) {
+ acquire_release_ = { kMemOrderNoBarrier, kMemOrderAcquire, kMemOrderRelease };
+ barrier_ = { kMemOrderNoBarrier, kMemOrderBarrier };
+ }
+
+ vector<MemoryOrder> acquire_release_;
+ vector<MemoryOrder> barrier_;
+
+ T max_;
+ T min_;
+};
+
+typedef ::testing::Types<int32_t, int64_t, uint32_t, uint64_t> IntTypes;
+TYPED_TEST_CASE(AtomicIntTest, IntTypes);
+
+TYPED_TEST(AtomicIntTest, LoadStore) {
+ for (const MemoryOrder mem_order : this->acquire_release_) {
+ AtomicInt<TypeParam> i(0);
+ EXPECT_EQ(0, i.Load(mem_order));
+ i.Store(42, mem_order);
+ EXPECT_EQ(42, i.Load(mem_order));
+ i.Store(this->min_, mem_order);
+ EXPECT_EQ(this->min_, i.Load(mem_order));
+ i.Store(this->max_, mem_order);
+ EXPECT_EQ(this->max_, i.Load(mem_order));
+ }
+}
+
+TYPED_TEST(AtomicIntTest, SetSwapExchange) {
+ for (const MemoryOrder mem_order : this->acquire_release_) {
+ AtomicInt<TypeParam> i(0);
+ EXPECT_TRUE(i.CompareAndSet(0, 5, mem_order));
+ EXPECT_EQ(5, i.Load(mem_order));
+ EXPECT_FALSE(i.CompareAndSet(0, 10, mem_order));
+
+ EXPECT_EQ(5, i.CompareAndSwap(5, this->max_, mem_order));
+ EXPECT_EQ(this->max_, i.CompareAndSwap(42, 42, mem_order));
+ EXPECT_EQ(this->max_, i.CompareAndSwap(this->max_, this->min_, mem_order));
+
+ EXPECT_EQ(this->min_, i.Exchange(this->max_, mem_order));
+ EXPECT_EQ(this->max_, i.Load(mem_order));
+ }
+}
+
+TYPED_TEST(AtomicIntTest, MinMax) {
+ for (const MemoryOrder mem_order : this->acquire_release_) {
+ AtomicInt<TypeParam> i(0);
+
+ i.StoreMax(100, mem_order);
+ EXPECT_EQ(100, i.Load(mem_order));
+ i.StoreMin(50, mem_order);
+ EXPECT_EQ(50, i.Load(mem_order));
+
+ i.StoreMax(25, mem_order);
+ EXPECT_EQ(50, i.Load(mem_order));
+ i.StoreMin(75, mem_order);
+ EXPECT_EQ(50, i.Load(mem_order));
+
+ i.StoreMax(this->max_, mem_order);
+ EXPECT_EQ(this->max_, i.Load(mem_order));
+ i.StoreMin(this->min_, mem_order);
+ EXPECT_EQ(this->min_, i.Load(mem_order));
+ }
+}
+
+TYPED_TEST(AtomicIntTest, Increment) {
+ for (const MemoryOrder mem_order : this->barrier_) {
+ AtomicInt<TypeParam> i(0);
+ EXPECT_EQ(1, i.Increment(mem_order));
+ EXPECT_EQ(3, i.IncrementBy(2, mem_order));
+ EXPECT_EQ(3, i.IncrementBy(0, mem_order));
+ }
+}
+
+TEST(Atomic, AtomicBool) {
+ vector<MemoryOrder> memory_orders = { kMemOrderNoBarrier, kMemOrderRelease, kMemOrderAcquire };
+ for (const MemoryOrder mem_order : memory_orders) {
+ AtomicBool b(false);
+ EXPECT_FALSE(b.Load(mem_order));
+ b.Store(true, mem_order);
+ EXPECT_TRUE(b.Load(mem_order));
+ EXPECT_TRUE(b.CompareAndSet(true, false, mem_order));
+ EXPECT_FALSE(b.Load(mem_order));
+ EXPECT_FALSE(b.CompareAndSet(true, false, mem_order));
+ EXPECT_FALSE(b.CompareAndSwap(false, true, mem_order));
+ EXPECT_TRUE(b.Load(mem_order));
+ EXPECT_TRUE(b.Exchange(false, mem_order));
+ EXPECT_FALSE(b.Load(mem_order));
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/atomic.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/atomic.cc b/be/src/kudu/util/atomic.cc
new file mode 100644
index 0000000..430631f
--- /dev/null
+++ b/be/src/kudu/util/atomic.cc
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/atomic.h"
+
+#include <cstdint>
+#include <ostream>
+
+#include <glog/logging.h>
+
+namespace kudu {
+
+template<typename T>
+AtomicInt<T>::AtomicInt(T initial_value) {
+ Store(initial_value, kMemOrderNoBarrier);
+}
+
+template<typename T>
+void AtomicInt<T>::FatalMemOrderNotSupported(const char* caller,
+ const char* requested,
+ const char* supported) {
+ LOG(FATAL) << caller << " does not support " << requested << ": only "
+ << supported << " are supported.";
+}
+
+template
+class AtomicInt<int32_t>;
+
+template
+class AtomicInt<int64_t>;
+
+template
+class AtomicInt<uint32_t>;
+
+template
+class AtomicInt<uint64_t>;
+
+AtomicBool::AtomicBool(bool value)
+ : underlying_(value) {
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/atomic.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/atomic.h b/be/src/kudu/util/atomic.h
new file mode 100644
index 0000000..3051a41
--- /dev/null
+++ b/be/src/kudu/util/atomic.h
@@ -0,0 +1,322 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_UTIL_ATOMIC_H
+#define KUDU_UTIL_ATOMIC_H
+
+#include <algorithm> // IWYU pragma: keep
+#include <cstdint>
+#include <cstdlib>
+#include <type_traits>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+// See top-level comments in kudu/gutil/atomicops.h for further
+// explanations of these levels.
+enum MemoryOrder {
+ // Relaxed memory ordering, doesn't use any barriers.
+ kMemOrderNoBarrier = 0,
+
+ // Ensures that no later memory access by the same thread can be
+ // reordered ahead of the operation.
+ kMemOrderAcquire = 1,
+
+ // Ensures that no previous memory access by the same thread can be
+ // reordered after the operation.
+ kMemOrderRelease = 2,
+
+ // Ensures that neither previous NOR later memory access by the same
+ // thread can be reordered across the operation.
+ kMemOrderBarrier = 3,
+};
+
+// Atomic integer class inspired by Impala's AtomicInt and
+// std::atomic<> in C++11.
+//
+// NOTE: All public operations use an implicit memory order of
+// kMemOrderNoBarrier unless otherwise specified.
+//
+// Unlike std::atomic<>, overflowing an unsigned AtomicInt via Increment or
+// IncrementBy is undefined behavior (it is also undefined for signed types,
+// as always).
+//
+// See also: kudu/gutil/atomicops.h
+template<typename T>
+class AtomicInt {
+ public:
+ // Initialize the underlying value to 'initial_value'. The
+ // initialization performs a Store with 'kMemOrderNoBarrier'.
+ explicit AtomicInt(T initial_value);
+
+ // Returns the underlying value.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ T Load(MemoryOrder mem_order = kMemOrderNoBarrier) const;
+
+ // Sets the underlying value to 'new_value'.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ void Store(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Iff the underlying value is equal to 'expected_val', sets the
+ // underlying value to 'new_value' and returns true; returns false
+ // otherwise.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ bool CompareAndSet(T expected_val, T new_value, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Iff the underlying value is equal to 'expected_val', sets the
+ // underlying value to 'new_value' and returns
+ // 'expected_val'. Otherwise, returns the current underlying
+ // value.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ T CompareAndSwap(T expected_val, T new_value, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Sets the underlying value to 'new_value' iff 'new_value' is
+ // greater than the current underlying value.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ void StoreMax(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Sets the underlying value to 'new_value' iff 'new_value' is less
+ // than the current underlying value.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ void StoreMin(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Increments the underlying value by 1 and returns the new
+ // underlying value.
+ //
+ // Does not support 'kMemOrderAcquire' or 'kMemOrderRelease'.
+ T Increment(MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Increments the underlying value by 'delta' and returns the new
+ // underlying value.
+ //
+ // Does not support 'kMemOrderAcquire' or 'kMemOrderRelease'.
+ T IncrementBy(T delta, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ // Sets the underlying value to 'new_value' and returns the previous
+ // underlying value.
+ //
+ // Does not support 'kMemOrderBarrier'.
+ T Exchange(T new_value, MemoryOrder mem_order = kMemOrderNoBarrier);
+
+ private:
+ // If a method 'caller' doesn't support the memory order described as
+ // 'requested', terminate via LOG(FATAL), logging the method
+ // called, the requested memory order, and the supported memory
+ // orders.
+ static void FatalMemOrderNotSupported(const char* caller,
+ const char* requested = "kMemOrderBarrier",
+ const char* supported =
+ "kMemOrderNoBarrier, kMemOrderAcquire, kMemOrderRelease");
+
+ // The gutil/atomicops.h functions only operate on signed types.
+ // So, even if the user specializes on an unsigned type, we use a
+ // signed type internally.
+ typedef typename std::make_signed<T>::type SignedT;
+ SignedT value_;
+
+ DISALLOW_COPY_AND_ASSIGN(AtomicInt);
+};
+
+// Adapts AtomicInt to handle boolean values.
+//
+// NOTE: All public operations use an implicit memory order of
+// kMemOrderNoBarrier unless otherwise specified.
+//
+// See AtomicInt above for documentation on individual methods.
+class AtomicBool {
+ public:
+ explicit AtomicBool(bool value);
+
+ bool Load(MemoryOrder m = kMemOrderNoBarrier) const {
+ return underlying_.Load(m);
+ }
+ void Store(bool n, MemoryOrder m = kMemOrderNoBarrier) {
+ underlying_.Store(static_cast<int32_t>(n), m);
+ }
+ bool CompareAndSet(bool e, bool n, MemoryOrder m = kMemOrderNoBarrier) {
+ return underlying_.CompareAndSet(static_cast<int32_t>(e), static_cast<int32_t>(n), m);
+ }
+ bool CompareAndSwap(bool e, bool n, MemoryOrder m = kMemOrderNoBarrier) {
+ return underlying_.CompareAndSwap(static_cast<int32_t>(e), static_cast<int32_t>(n), m);
+ }
+ bool Exchange(bool n, MemoryOrder m = kMemOrderNoBarrier) {
+ return underlying_.Exchange(static_cast<int32_t>(n), m);
+ }
+ private:
+ AtomicInt<int32_t> underlying_;
+
+ DISALLOW_COPY_AND_ASSIGN(AtomicBool);
+};
+
+template<typename T>
+inline T AtomicInt<T>::Load(MemoryOrder mem_order) const {
+ switch (mem_order) {
+ case kMemOrderNoBarrier: {
+ return base::subtle::NoBarrier_Load(&value_);
+ }
+ case kMemOrderBarrier: {
+ FatalMemOrderNotSupported("Load");
+ break;
+ }
+ case kMemOrderAcquire: {
+ return base::subtle::Acquire_Load(&value_);
+ }
+ case kMemOrderRelease: {
+ return base::subtle::Release_Load(&value_);
+ }
+ }
+ abort(); // Unnecessary, but avoids gcc complaining.
+}
+
+template<typename T>
+inline void AtomicInt<T>::Store(T new_value, MemoryOrder mem_order) {
+ switch (mem_order) {
+ case kMemOrderNoBarrier: {
+ base::subtle::NoBarrier_Store(&value_, new_value);
+ break;
+ }
+ case kMemOrderBarrier: {
+ FatalMemOrderNotSupported("Store");
+ break;
+ }
+ case kMemOrderAcquire: {
+ base::subtle::Acquire_Store(&value_, new_value);
+ break;
+ }
+ case kMemOrderRelease: {
+ base::subtle::Release_Store(&value_, new_value);
+ break;
+ }
+ }
+}
+
+template<typename T>
+inline bool AtomicInt<T>::CompareAndSet(T expected_val, T new_val, MemoryOrder mem_order) {
+ return CompareAndSwap(expected_val, new_val, mem_order) == expected_val;
+}
+
+template<typename T>
+inline T AtomicInt<T>::CompareAndSwap(T expected_val, T new_val, MemoryOrder mem_order) {
+ switch (mem_order) {
+ case kMemOrderNoBarrier: {
+ return base::subtle::NoBarrier_CompareAndSwap(
+ &value_, expected_val, new_val);
+ }
+ case kMemOrderBarrier: {
+ FatalMemOrderNotSupported("CompareAndSwap/CompareAndSet");
+ break;
+ }
+ case kMemOrderAcquire: {
+ return base::subtle::Acquire_CompareAndSwap(
+ &value_, expected_val, new_val);
+ }
+ case kMemOrderRelease: {
+ return base::subtle::Release_CompareAndSwap(
+ &value_, expected_val, new_val);
+ }
+ }
+ abort();
+}
+
+
+template<typename T>
+inline T AtomicInt<T>::Increment(MemoryOrder mem_order) {
+ return IncrementBy(1, mem_order);
+}
+
+template<typename T>
+inline T AtomicInt<T>::IncrementBy(T delta, MemoryOrder mem_order) {
+ switch (mem_order) {
+ case kMemOrderNoBarrier: {
+ return base::subtle::NoBarrier_AtomicIncrement(&value_, delta);
+ }
+ case kMemOrderBarrier: {
+ return base::subtle::Barrier_AtomicIncrement(&value_, delta);
+ }
+ case kMemOrderAcquire: {
+ FatalMemOrderNotSupported("Increment/IncrementBy",
+ "kMemOrderAcquire",
+ "kMemOrderNoBarrier and kMemOrderBarrier");
+ break;
+ }
+ case kMemOrderRelease: {
+ FatalMemOrderNotSupported("Increment/IncrementBy",
+ "kMemOrderRelease",  // Report the order that was actually requested.
+ "kMemOrderNoBarrier and kMemOrderBarrier");
+ break;
+ }
+ }
+ abort();  // Reached only for unsupported or out-of-range 'mem_order'.
+}
+
+template<typename T>
+inline T AtomicInt<T>::Exchange(T new_value, MemoryOrder mem_order) {
+ switch (mem_order) {
+ case kMemOrderNoBarrier: {
+ return base::subtle::NoBarrier_AtomicExchange(&value_, new_value);
+ }
+ case kMemOrderBarrier: {
+ FatalMemOrderNotSupported("Exchange");
+ break;
+ }
+ case kMemOrderAcquire: {
+ return base::subtle::Acquire_AtomicExchange(&value_, new_value);
+ }
+ case kMemOrderRelease: {
+ return base::subtle::Release_AtomicExchange(&value_, new_value);
+ }
+ }
+ abort();
+}
+
+template<typename T>
+inline void AtomicInt<T>::StoreMax(T new_value, MemoryOrder mem_order) {
+ T old_value = Load(mem_order);
+ while (true) {
+ T max_value = std::max(old_value, new_value);
+ T prev_value = CompareAndSwap(old_value, max_value, mem_order);
+ if (PREDICT_TRUE(old_value == prev_value)) {
+ break;
+ }
+ old_value = prev_value;
+ }
+}
+
+template<typename T>
+inline void AtomicInt<T>::StoreMin(T new_value, MemoryOrder mem_order) {
+ T old_value = Load(mem_order);
+ while (true) {
+ T min_value = std::min(old_value, new_value);
+ T prev_value = CompareAndSwap(old_value, min_value, mem_order);
+ if (PREDICT_TRUE(old_value == prev_value)) {
+ break;
+ }
+ old_value = prev_value;
+ }
+}
+
+} // namespace kudu
+#endif /* KUDU_UTIL_ATOMIC_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/auto_release_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/auto_release_pool.h b/be/src/kudu/util/auto_release_pool.h
new file mode 100644
index 0000000..eaed9c2
--- /dev/null
+++ b/be/src/kudu/util/auto_release_pool.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Simple pool of objects that will be deallocated when the pool is
+// destroyed
+
+#ifndef KUDU_UTIL_AUTO_RELEASE_POOL_H
+#define KUDU_UTIL_AUTO_RELEASE_POOL_H
+
+#include <vector>
+
+#include "kudu/gutil/spinlock.h"
+
+namespace kudu {
+
+// Thread-safe.
+class AutoReleasePool {
+ public:
+ AutoReleasePool(): objects_() { }
+
+ ~AutoReleasePool() {
+ for (auto& object : objects_) {
+ delete object;
+ }
+ }
+
+ template <class T>
+ T *Add(T *t) {
+ base::SpinLockHolder l(&lock_);
+ objects_.push_back(new SpecificElement<T>(t));
+ return t;
+ }
+
+ // Add an array-allocated object to the pool. This is identical to
+ // Add() except that it will be freed with 'delete[]' instead of 'delete'.
+ template<class T>
+ T* AddArray(T *t) {
+ base::SpinLockHolder l(&lock_);
+ objects_.push_back(new SpecificArrayElement<T>(t));
+ return t;
+ }
+
+ // Donate all objects in this pool to another pool. Donating to self is a no-op.
+ void DonateAllTo(AutoReleasePool* dst) {
+ if (dst == this) return;  // Would take lock_ twice (SpinLock presumably non-reentrant).
+ base::SpinLockHolder l(&lock_);
+ base::SpinLockHolder l_them(&dst->lock_);
+ dst->objects_.reserve(dst->objects_.size() + objects_.size());
+ dst->objects_.insert(dst->objects_.end(), objects_.begin(), objects_.end());
+ objects_.clear();
+ }
+
+ private:
+ struct GenericElement {
+ virtual ~GenericElement() {}
+ };
+
+ template <class T>
+ struct SpecificElement : GenericElement {
+ explicit SpecificElement(T *t): t(t) {}
+ ~SpecificElement() {
+ delete t;
+ }
+
+ T *t;
+ };
+
+ template <class T>
+ struct SpecificArrayElement : GenericElement {
+ explicit SpecificArrayElement(T *t): t(t) {}
+ ~SpecificArrayElement() {
+ delete [] t;
+ }
+
+ T *t;
+ };
+
+ typedef std::vector<GenericElement *> ElementVector;
+ ElementVector objects_;
+ base::SpinLock lock_;
+};
+
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/barrier.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/barrier.h b/be/src/kudu/util/barrier.h
new file mode 100644
index 0000000..88e5682
--- /dev/null
+++ b/be/src/kudu/util/barrier.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread_restrictions.h"
+
+namespace kudu {
+
+// Implementation of pthread-style Barriers.
+class Barrier {
+ public:
+ // Initialize the barrier with the given initial count.
+ explicit Barrier(int count) :
+ cond_(&mutex_),
+ count_(count),
+ initial_count_(count) {
+ DCHECK_GT(count, 0);
+ }
+
+ ~Barrier() {
+ }
+
+ // Wait until all threads have reached the barrier.
+ // Once all threads have reached the barrier, the barrier is reset
+ // to the initial count.
+ void Wait() {
+ ThreadRestrictions::AssertWaitAllowed();
+ MutexLock l(mutex_);
+ if (--count_ == 0) {
+ count_ = initial_count_;
+ cycle_count_++;
+ cond_.Broadcast();
+ return;
+ }
+
+ const uint32_t initial_cycle = cycle_count_;  // Same type as cycle_count_.
+ while (cycle_count_ == initial_cycle) {
+ cond_.Wait();
+ }
+ }
+
+ private:
+ Mutex mutex_;
+ ConditionVariable cond_;
+ int count_;
+ uint32_t cycle_count_ = 0;
+ const int initial_count_;
+ DISALLOW_COPY_AND_ASSIGN(Barrier);
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/bit-stream-utils.h b/be/src/kudu/util/bit-stream-utils.h
new file mode 100644
index 0000000..c6aeb01
--- /dev/null
+++ b/be/src/kudu/util/bit-stream-utils.h
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef IMPALA_UTIL_BIT_STREAM_UTILS_H
+#define IMPALA_UTIL_BIT_STREAM_UTILS_H
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/bit-util.h"
+#include "kudu/util/faststring.h"
+
+namespace kudu {
+
+// Utility class to write bit/byte streams. This class can write data to either be
+// bit packed or byte aligned (and a single stream that has a mix of both).
+class BitWriter {
+ public:
+ // buffer: buffer to write bits to.
+ explicit BitWriter(faststring *buffer)
+ : buffer_(buffer) {
+ Clear();
+ }
+
+ void Clear() {
+ buffered_values_ = 0;
+ byte_offset_ = 0;
+ bit_offset_ = 0;
+ buffer_->clear();
+ }
+
+ // Returns a pointer to the underlying buffer
+ faststring *buffer() const { return buffer_; }
+
+ // The number of current bytes written, including the current byte (i.e. may include a
+ // fraction of a byte). Includes buffered values.
+ int bytes_written() const { return byte_offset_ + BitUtil::Ceil(bit_offset_, 8); }
+
+ // Writes a value to buffered_values_, flushing to buffer_ if necessary. This is bit
+ // packed. num_bits must be <= 32. If 'v' is larger than 'num_bits' bits, the higher
+ // bits are ignored.
+ void PutValue(uint64_t v, int num_bits);
+
+ // Writes v to the next aligned byte using num_bits. If T is larger than num_bits, the
+ // extra high-order bits will be ignored.
+ template<typename T>
+ void PutAligned(T v, int num_bits);
+
+ // Write a Vlq encoded int to the buffer. The value is written byte aligned.
+ // For more details on vlq: en.wikipedia.org/wiki/Variable-length_quantity
+ void PutVlqInt(int32_t v);
+
+ // Get the index to the next aligned byte and advance the underlying buffer by num_bytes.
+ size_t GetByteIndexAndAdvance(int num_bytes) {
+ uint8_t* ptr = GetNextBytePtr(num_bytes);
+ return ptr - buffer_->data();
+ }
+
+ // Get a pointer to the next aligned byte and advance the underlying buffer by num_bytes.
+ uint8_t* GetNextBytePtr(int num_bytes);
+
+ // Flushes all buffered values to the buffer. Call this when done writing to the buffer.
+ // If 'align' is true, buffered_values_ is reset and any future writes will be written
+ // to the next byte boundary.
+ void Flush(bool align = false);
+
+ private:
+ // Bit-packed values are initially written to this variable before being memcpy'd to
+ // buffer_. This is faster than writing values byte by byte directly to buffer_.
+ uint64_t buffered_values_;
+
+ faststring *buffer_;
+ int byte_offset_; // Offset in buffer_
+ int bit_offset_; // Offset in buffered_values_
+};
+
+// Utility class to read bit/byte stream. This class can read bits or bytes
+// that are either byte aligned or not. It also has utilities to read multiple
+// bytes in one read (e.g. encoded int).
+class BitReader {
+ public:
+ // 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
+ BitReader(const uint8_t* buffer, int buffer_len);
+
+ BitReader() : buffer_(NULL), max_bytes_(0), buffered_values_(0), byte_offset_(0), bit_offset_(0) {}
+
+ // Gets the next value from the buffer. Returns true if 'v' could be read or false if
+ // there are not enough bytes left. num_bits must be <= 32.
+ template<typename T>
+ bool GetValue(int num_bits, T* v);
+
+ // Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
+ // little-endian native type and big enough to store 'num_bytes'. The value is assumed
+ // to be byte-aligned so the stream will be advanced to the start of the next byte
+ // before 'v' is read. Returns false if there are not enough bytes left.
+ template<typename T>
+ bool GetAligned(int num_bytes, T* v);
+
+ // Reads a vlq encoded int from the stream. The encoded int must start at the
+ // beginning of a byte. Return false if there were not enough bytes in the buffer.
+ bool GetVlqInt(int32_t* v);
+
+ // Returns the number of bytes left in the stream, not including the current byte (i.e.,
+ // there may be an additional fraction of a byte).
+ int bytes_left() const { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+
+ // Current position in the stream, by bit.
+ int position() const { return byte_offset_ * 8 + bit_offset_; }
+
+ // Rewind the stream by 'num_bits' bits
+ void Rewind(int num_bits);
+
+ // Seek to a specific bit in the buffer
+ void SeekToBit(uint stream_position);
+
+ // Maximum byte length of a vlq encoded int
+ static const int MAX_VLQ_BYTE_LEN = 5;
+
+ bool is_initialized() const { return buffer_ != NULL; }
+
+ private:
+ // Used by SeekToBit() and GetValue() to fetch the next word from
+ // buffer_ (presumably into buffered_values_; confirm against the definition).
+ void BufferValues();
+
+ const uint8_t* buffer_;
+ int max_bytes_;
+
+ // Bytes are memcpy'd from buffer_ and values are read from this variable. This is
+ // faster than reading values byte by byte directly from buffer_.
+ uint64_t buffered_values_;
+
+ int byte_offset_; // Offset in buffer_
+ int bit_offset_; // Offset in buffered_values_
+};
+
+} // namespace kudu
+
+#endif