You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:30 UTC
[18/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/interval_tree-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/interval_tree-test.cc b/be/src/kudu/util/interval_tree-test.cc
new file mode 100644
index 0000000..df143d3
--- /dev/null
+++ b/be/src/kudu/util/interval_tree-test.cc
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// All rights reserved.
+
+#include <algorithm>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <tuple> // IWYU pragma: keep
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/interval_tree.h"
+#include "kudu/util/interval_tree-inl.h"
+#include "kudu/util/test_util.h"
+
+using std::pair;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Test harness.
+class TestIntervalTree : public KuduTest {
+};
+
+// Simple interval class for integer intervals.
+struct IntInterval {
+ IntInterval(int left, int right, int id = -1)
+ : left(left),
+ right(right),
+ id(id) {
+ }
+
+ bool Intersects(const IntInterval &other) const {
+ if (other.left > right) return false;
+ if (left > other.right) return false;
+ return true;
+ }
+
+ string ToString() const {
+ return strings::Substitute("[$0, $1]($2) ", left, right, id);
+ }
+
+ int left, right, id;
+};
+
+// A wrapper around an int which can be compared with IntTraits::compare()
+// but also can keep a counter of how many times it has been compared. Used
+// for TestBigO below.
// A query point wrapping an int. IntTraits::compare() increments '*count'
// every time it compares one of these, which lets TestBigO measure how many
// comparisons a query performs. Copies share the same counter.
struct CountingQueryPoint {
  explicit CountingQueryPoint(int v)
      : val(v), count(std::make_shared<int>(0)) {}

  int val;
  std::shared_ptr<int> count;
};
+
+// Traits definition for intervals made up of ints on either end.
+struct IntTraits {
+ typedef int point_type;
+ typedef IntInterval interval_type;
+ static point_type get_left(const IntInterval &x) {
+ return x.left;
+ }
+ static point_type get_right(const IntInterval &x) {
+ return x.right;
+ }
+ static int compare(int a, int b) {
+ if (a < b) return -1;
+ if (a > b) return 1;
+ return 0;
+ }
+
+ static int compare(const CountingQueryPoint& q, int b) {
+ (*q.count)++;
+ return compare(q.val, b);
+ }
+ static int compare(int a, const CountingQueryPoint& b) {
+ return -compare(b, a);
+ }
+
+};
+
+// Compare intervals in an arbitrary but consistent way - this is only
+// used for verifying that the two algorithms come up with the same results.
+// It's not necessary to define this to use an interval tree.
+static bool CompareIntervals(const IntInterval &a, const IntInterval &b) {
+ return std::make_tuple(a.left, a.right, a.id) <
+ std::make_tuple(b.left, b.right, b.id);
+}
+
+// Stringify a list of int intervals, for easy test error reporting.
+static string Stringify(const vector<IntInterval> &intervals) {
+ string ret;
+ bool first = true;
+ for (const IntInterval &interval : intervals) {
+ if (!first) {
+ ret.append(",");
+ }
+ ret.append(interval.ToString());
+ }
+ return ret;
+}
+
+// Find any intervals in 'intervals' which contain 'query_point' by brute force.
+static void FindContainingBruteForce(const vector<IntInterval> &intervals,
+ int query_point,
+ vector<IntInterval> *results) {
+ for (const IntInterval &i : intervals) {
+ if (query_point >= i.left && query_point <= i.right) {
+ results->push_back(i);
+ }
+ }
+}
+
+
+// Find any intervals in 'intervals' which intersect 'query_interval' by brute force.
+static void FindIntersectingBruteForce(const vector<IntInterval> &intervals,
+ IntInterval query_interval,
+ vector<IntInterval> *results) {
+ for (const IntInterval &i : intervals) {
+ if (query_interval.Intersects(i)) {
+ results->push_back(i);
+ }
+ }
+}
+
+
+// Verify that IntervalTree::FindContainingPoint yields the same results as the naive
+// brute-force O(n) algorithm.
+static void VerifyFindContainingPoint(const vector<IntInterval> all_intervals,
+ const IntervalTree<IntTraits> &tree,
+ int query_point) {
+ vector<IntInterval> results;
+ tree.FindContainingPoint(query_point, &results);
+ std::sort(results.begin(), results.end(), CompareIntervals);
+
+ vector<IntInterval> brute_force;
+ FindContainingBruteForce(all_intervals, query_point, &brute_force);
+ std::sort(brute_force.begin(), brute_force.end(), CompareIntervals);
+
+ SCOPED_TRACE(Stringify(all_intervals) + StringPrintf(" (q=%d)", query_point));
+ EXPECT_EQ(Stringify(brute_force), Stringify(results));
+}
+
+// Verify that IntervalTree::FindIntersectingInterval yields the same results as the naive
+// brute-force O(n) algorithm.
+static void VerifyFindIntersectingInterval(const vector<IntInterval> all_intervals,
+ const IntervalTree<IntTraits> &tree,
+ const IntInterval &query_interval) {
+ vector<IntInterval> results;
+ tree.FindIntersectingInterval(query_interval, &results);
+ std::sort(results.begin(), results.end(), CompareIntervals);
+
+ vector<IntInterval> brute_force;
+ FindIntersectingBruteForce(all_intervals, query_interval, &brute_force);
+ std::sort(brute_force.begin(), brute_force.end(), CompareIntervals);
+
+ SCOPED_TRACE(Stringify(all_intervals) +
+ StringPrintf(" (q=[%d,%d])", query_interval.left, query_interval.right));
+ EXPECT_EQ(Stringify(brute_force), Stringify(results));
+}
+
+static vector<IntInterval> CreateRandomIntervals(int n = 100) {
+ vector<IntInterval> intervals;
+ for (int i = 0; i < n; i++) {
+ int l = rand() % 100; // NOLINT(runtime/threadsafe_fn)
+ int r = l + rand() % 20; // NOLINT(runtime/threadsafe_fn)
+ intervals.emplace_back(l, r, i);
+ }
+ return intervals;
+}
+
+TEST_F(TestIntervalTree, TestBasic) {
+ vector<IntInterval> intervals;
+ intervals.emplace_back(1, 2, 1);
+ intervals.emplace_back(3, 4, 2);
+ intervals.emplace_back(1, 4, 3);
+ IntervalTree<IntTraits> t(intervals);
+
+ for (int i = 0; i <= 5; i++) {
+ VerifyFindContainingPoint(intervals, t, i);
+
+ for (int j = i; j <= 5; j++) {
+ VerifyFindIntersectingInterval(intervals, t, IntInterval(i, j, 0));
+ }
+ }
+}
+
+TEST_F(TestIntervalTree, TestRandomized) {
+ SeedRandom();
+
+ // Generate 100 random intervals spanning 0-200 and build an interval tree from them.
+ vector<IntInterval> intervals = CreateRandomIntervals();
+ IntervalTree<IntTraits> t(intervals);
+
+ // Test that we get the correct result on every possible query.
+ for (int i = -1; i < 201; i++) {
+ VerifyFindContainingPoint(intervals, t, i);
+ }
+
+ // Test that we get the correct result for random intervals
+ for (int i = 0; i < 100; i++) {
+ int l = rand() % 100; // NOLINT(runtime/threadsafe_fn)
+ int r = l + rand() % 100; // NOLINT(runtime/threadsafe_fn)
+ VerifyFindIntersectingInterval(intervals, t, IntInterval(l, r));
+ }
+}
+
+TEST_F(TestIntervalTree, TestEmpty) {
+ vector<IntInterval> empty;
+ IntervalTree<IntTraits> t(empty);
+
+ VerifyFindContainingPoint(empty, t, 1);
+ VerifyFindIntersectingInterval(empty, t, IntInterval(1, 2, 0));
+}
+
+TEST_F(TestIntervalTree, TestBigO) {
+#ifndef NDEBUG
+ LOG(WARNING) << "big-O results are not valid if DCHECK is enabled";
+ return;
+#endif
+ SeedRandom();
+
+ LOG(INFO) << "num_int\tnum_q\tresults\tsimple\tbatch";
+ for (int num_intervals = 1; num_intervals < 2000; num_intervals *= 2) {
+ vector<IntInterval> intervals = CreateRandomIntervals(num_intervals);
+ IntervalTree<IntTraits> t(intervals);
+ for (int num_queries = 1; num_queries < 2000; num_queries *= 2) {
+ vector<CountingQueryPoint> queries;
+ for (int i = 0; i < num_queries; i++) {
+ queries.emplace_back(rand() % 100);
+ }
+ std::sort(queries.begin(), queries.end(),
+ [](const CountingQueryPoint& a,
+ const CountingQueryPoint& b) {
+ return a.val < b.val;
+ });
+
+ // Test using batch algorithm.
+ int num_results_batch = 0;
+ t.ForEachIntervalContainingPoints(
+ queries,
+ [&](CountingQueryPoint query_point, const IntInterval& interval) {
+ num_results_batch++;
+ });
+ int num_comparisons_batch = 0;
+ for (const auto& q : queries) {
+ num_comparisons_batch += *q.count;
+ *q.count = 0;
+ }
+
+ // Test using one-by-one queries.
+ int num_results_simple = 0;
+ for (auto& q : queries) {
+ vector<IntInterval> intervals;
+ t.FindContainingPoint(q, &intervals);
+ num_results_simple += intervals.size();
+ }
+ int num_comparisons_simple = 0;
+ for (const auto& q : queries) {
+ num_comparisons_simple += *q.count;
+ }
+ ASSERT_EQ(num_results_simple, num_results_batch);
+
+ LOG(INFO) << num_intervals << "\t" << num_queries << "\t" << num_results_simple << "\t"
+ << num_comparisons_simple << "\t" << num_comparisons_batch;
+ }
+ }
+}
+
+TEST_F(TestIntervalTree, TestMultiQuery) {
+ SeedRandom();
+ const int kNumQueries = 1;
+ vector<IntInterval> intervals = CreateRandomIntervals(10);
+ IntervalTree<IntTraits> t(intervals);
+
+ // Generate random queries.
+ vector<int> queries;
+ for (int i = 0; i < kNumQueries; i++) {
+ queries.push_back(rand() % 100);
+ }
+ std::sort(queries.begin(), queries.end());
+
+ vector<pair<string, int>> results_simple;
+ for (int q : queries) {
+ vector<IntInterval> intervals;
+ t.FindContainingPoint(q, &intervals);
+ for (const auto& interval : intervals) {
+ results_simple.emplace_back(interval.ToString(), q);
+ }
+ }
+
+ vector<pair<string, int>> results_batch;
+ t.ForEachIntervalContainingPoints(
+ queries,
+ [&](int query_point, const IntInterval& interval) {
+ results_batch.emplace_back(interval.ToString(), query_point);
+ });
+
+ // Check the property that, when the batch query points are in sorted order,
+ // the results are grouped by interval, and within each interval, sorted by
+ // query point. Each interval may have at most two groups.
+ boost::optional<pair<string, int>> prev = boost::none;
+ std::map<string, int> intervals_seen;
+ for (int i = 0; i < results_batch.size(); i++) {
+ const auto& cur = results_batch[i];
+ // If it's another query point hitting the same interval,
+ // make sure the query points are returned in order.
+ if (prev && prev->first == cur.first) {
+ EXPECT_GE(cur.second, prev->second) << prev->first;
+ } else {
+ // It's the start of a new interval's data. Make sure that we don't
+ // see the same interval twice.
+ EXPECT_LE(++intervals_seen[cur.first], 2)
+ << "Saw more than two groups for interval " << cur.first;
+ }
+ prev = cur;
+ }
+
+ std::sort(results_simple.begin(), results_simple.end());
+ std::sort(results_batch.begin(), results_batch.end());
+ ASSERT_EQ(results_simple, results_batch);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/interval_tree.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/interval_tree.h b/be/src/kudu/util/interval_tree.h
new file mode 100644
index 0000000..a677528
--- /dev/null
+++ b/be/src/kudu/util/interval_tree.h
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Implements an Interval Tree. See http://en.wikipedia.org/wiki/Interval_tree
+// or CLRS for a full description of the data structure.
+//
+// Callers of this class should also include interval_tree-inl.h for function
+// definitions.
+#ifndef KUDU_UTIL_INTERVAL_TREE_H
+#define KUDU_UTIL_INTERVAL_TREE_H
+
+#include <glog/logging.h>
+
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
// Forward declaration of the node type used by IntervalTree. It is only
// declared here; this header does not define it.
namespace interval_tree_internal {
template <typename Traits>
class ITNode;
}  // namespace interval_tree_internal
+
+// Implements an Interval Tree.
+//
+// An Interval Tree is a data structure which stores a set of intervals and supports
+// efficient searches to determine which intervals in that set overlap a query
+// point or interval. These operations are O(lg n + k) where 'n' is the number of
+// intervals in the tree and 'k' is the number of results returned for a given query.
+//
+// This particular implementation is a static tree -- intervals may not be added or
+// removed once the tree is instantiated.
+//
+// This class also assumes that all intervals are "closed" intervals -- the intervals
+// are inclusive of their start and end points.
+//
+// The Traits class should have the following members:
+// Traits::point_type
+// a typedef for what a "point" in the range is
+//
+// Traits::interval_type
+// a typedef for an interval
+//
+// static point_type get_left(const interval_type &)
+// static point_type get_right(const interval_type &)
+// accessors which fetch the left and right bound of the interval, respectively.
+//
+// static int compare(const point_type &a, const point_type &b)
+// return < 0 if a < b, 0 if a == b, > 0 if a > b
+//
+// See interval_tree-test.cc for an example Traits class for 'int' ranges.
+template<class Traits>
+class IntervalTree {
+ private:
+ // Import types from the traits class to make code more readable.
+ typedef typename Traits::interval_type interval_type;
+ typedef typename Traits::point_type point_type;
+
+ // And some convenience types.
+ typedef std::vector<interval_type> IntervalVector;
+ typedef interval_tree_internal::ITNode<Traits> node_type;
+
+ public:
+ // Construct an Interval Tree containing the given set of intervals.
+ explicit IntervalTree(const IntervalVector &intervals);
+
+ ~IntervalTree();
+
+ // Find all intervals in the tree which contain the query point.
+ // The resulting intervals are added to the 'results' vector.
+ // The vector is not cleared first.
+ //
+ // NOTE: 'QueryPointType' is usually point_type, but can be any other
+ // type for which there exists the appropriate Traits::Compare(...) method.
+ template<class QueryPointType>
+ void FindContainingPoint(const QueryPointType &query,
+ IntervalVector *results) const;
+
+ // For each of the query points in the STL container 'queries', find all
+ // intervals in the tree which may contain those points. Calls 'cb(point, interval)'
+ // for each such interval.
+ //
+ // The points in the query container must be comparable to 'point_type'
+ // using Traits::Compare().
+ //
+ // The implementation sequences the calls to 'cb' with the following guarantees:
+ // 1) all of the results corresponding to a given interval will be yielded in at
+ // most two "groups" of calls (i.e. sub-sequences of calls with the same interval).
+ // 2) within each "group" of calls, the query points will be in ascending order.
+ //
+ // For example, the callback sequence may be:
+ //
+ // cb(q1, interval_1) -
+ // cb(q2, interval_1) | first group of interval_1
+ // cb(q6, interval_1) |
+ // cb(q7, interval_1) -
+ //
+ // cb(q2, interval_2) -
+ // cb(q3, interval_2) | first group of interval_2
+ // cb(q4, interval_2) -
+ //
+ // cb(q3, interval_1) -
+ // cb(q4, interval_1) | second group of interval_1
+ // cb(q5, interval_1) -
+ //
+ // cb(q2, interval_3) -
+ // cb(q3, interval_3) | first group of interval_3
+ // cb(q4, interval_3) -
+ //
+ // cb(q5, interval_2) -
+ // cb(q6, interval_2) | second group of interval_2
+ // cb(q7, interval_2) -
+ //
+ // REQUIRES: The input points must be pre-sorted or else this will return invalid
+ // results.
+ template<class Callback, class QueryContainer>
+ void ForEachIntervalContainingPoints(const QueryContainer& queries,
+ const Callback& cb) const;
+
+ // Find all intervals in the tree which intersect the given interval.
+ // The resulting intervals are added to the 'results' vector.
+ // The vector is not cleared first.
+ void FindIntersectingInterval(const interval_type &query,
+ IntervalVector *results) const;
+ private:
+ static void Partition(const IntervalVector &in,
+ point_type *split_point,
+ IntervalVector *left,
+ IntervalVector *overlapping,
+ IntervalVector *right);
+
+ // Create a node containing the given intervals, recursively splitting down the tree.
+ static node_type *CreateNode(const IntervalVector &intervals);
+
+ node_type *root_;
+
+ DISALLOW_COPY_AND_ASSIGN(IntervalTree);
+};
+
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonreader-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonreader-test.cc b/be/src/kudu/util/jsonreader-test.cc
new file mode 100644
index 0000000..9f62c31
--- /dev/null
+++ b/be/src/kudu/util/jsonreader-test.cc
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/jsonreader.h"
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using rapidjson::Value;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+TEST(JsonReaderTest, Corrupt) {
+ JsonReader r("");
+ Status s = r.Init();
+ ASSERT_TRUE(s.IsCorruption());
+ ASSERT_STR_CONTAINS(
+ s.ToString(), "JSON text is corrupt: Text only contains white space(s)");
+}
+
+TEST(JsonReaderTest, Empty) {
+ JsonReader r("{}");
+ ASSERT_OK(r.Init());
+ JsonReader r2("[]");
+ ASSERT_OK(r2.Init());
+
+ // Not found.
+ ASSERT_TRUE(r.ExtractBool(r.root(), "foo", nullptr).IsNotFound());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "foo", nullptr).IsNotFound());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "foo", nullptr).IsNotFound());
+ ASSERT_TRUE(r.ExtractString(r.root(), "foo", nullptr).IsNotFound());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "foo", nullptr).IsNotFound());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "foo", nullptr).IsNotFound());
+}
+
+TEST(JsonReaderTest, Basic) {
+ JsonReader r("{ \"foo\" : \"bar\" }");
+ ASSERT_OK(r.Init());
+ string foo;
+ ASSERT_OK(r.ExtractString(r.root(), "foo", &foo));
+ ASSERT_EQ("bar", foo);
+
+ // Bad types.
+ ASSERT_TRUE(r.ExtractBool(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "foo", nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, LessBasic) {
+ string doc = Substitute(
+ "{ \"small\" : 1, \"big\" : $0, \"null\" : null, \"empty\" : \"\", \"bool\" : true }",
+ kint64max);
+ JsonReader r(doc);
+ ASSERT_OK(r.Init());
+ int32_t small;
+ ASSERT_OK(r.ExtractInt32(r.root(), "small", &small));
+ ASSERT_EQ(1, small);
+ int64_t big;
+ ASSERT_OK(r.ExtractInt64(r.root(), "big", &big));
+ ASSERT_EQ(kint64max, big);
+ string str;
+ ASSERT_OK(r.ExtractString(r.root(), "null", &str));
+ ASSERT_EQ("", str);
+ ASSERT_OK(r.ExtractString(r.root(), "empty", &str));
+ ASSERT_EQ("", str);
+ bool b;
+ ASSERT_OK(r.ExtractBool(r.root(), "bool", &b));
+ ASSERT_TRUE(b);
+
+ // Bad types.
+ ASSERT_TRUE(r.ExtractBool(r.root(), "small", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractString(r.root(), "small", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "small", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "small", nullptr).IsInvalidArgument());
+
+ ASSERT_TRUE(r.ExtractBool(r.root(), "big", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "big", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractString(r.root(), "big", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "big", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "big", nullptr).IsInvalidArgument());
+
+ ASSERT_TRUE(r.ExtractBool(r.root(), "null", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "null", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "null", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "null", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "null", nullptr).IsInvalidArgument());
+
+ ASSERT_TRUE(r.ExtractBool(r.root(), "empty", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "empty", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "empty", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "empty", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "empty", nullptr).IsInvalidArgument());
+
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "bool", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "bool", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractString(r.root(), "bool", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "bool", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "bool", nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, Objects) {
+ JsonReader r("{ \"foo\" : { \"1\" : 1 } }");
+ ASSERT_OK(r.Init());
+
+ const Value* foo = nullptr;
+ ASSERT_OK(r.ExtractObject(r.root(), "foo", &foo));
+ ASSERT_TRUE(foo);
+
+ int32_t one;
+ ASSERT_OK(r.ExtractInt32(foo, "1", &one));
+ ASSERT_EQ(1, one);
+
+ // Bad types.
+ ASSERT_TRUE(r.ExtractBool(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractString(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObjectArray(r.root(), "foo", nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, TopLevelArray) {
+ JsonReader r("[ { \"name\" : \"foo\" }, { \"name\" : \"bar\" } ]");
+ ASSERT_OK(r.Init());
+
+ vector<const Value*> objs;
+ ASSERT_OK(r.ExtractObjectArray(r.root(), nullptr, &objs));
+ ASSERT_EQ(2, objs.size());
+ string name;
+ ASSERT_OK(r.ExtractString(objs[0], "name", &name));
+ ASSERT_EQ("foo", name);
+ ASSERT_OK(r.ExtractString(objs[1], "name", &name));
+ ASSERT_EQ("bar", name);
+
+ // Bad types.
+ ASSERT_TRUE(r.ExtractBool(r.root(), nullptr, nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), nullptr, nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), nullptr, nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractString(r.root(), nullptr, nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), nullptr, nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, NestedArray) {
+ JsonReader r("{ \"foo\" : [ { \"val\" : 0 }, { \"val\" : 1 }, { \"val\" : 2 } ] }");
+ ASSERT_OK(r.Init());
+
+ vector<const Value*> foo;
+ ASSERT_OK(r.ExtractObjectArray(r.root(), "foo", &foo));
+ ASSERT_EQ(3, foo.size());
+ int i = 0;
+ for (const Value* v : foo) {
+ int32_t number;
+ ASSERT_OK(r.ExtractInt32(v, "val", &number));
+ ASSERT_EQ(i, number);
+ i++;
+ }
+
+ // Bad types.
+ ASSERT_TRUE(r.ExtractBool(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt32(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractInt64(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractString(r.root(), "foo", nullptr).IsInvalidArgument());
+ ASSERT_TRUE(r.ExtractObject(r.root(), "foo", nullptr).IsInvalidArgument());
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonreader.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonreader.cc b/be/src/kudu/util/jsonreader.cc
new file mode 100644
index 0000000..acbc869
--- /dev/null
+++ b/be/src/kudu/util/jsonreader.cc
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/jsonreader.h"
+
+#include <utility>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using rapidjson::Value;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+JsonReader::JsonReader(string text) : text_(std::move(text)) {}
+
+JsonReader::~JsonReader() {
+}
+
+Status JsonReader::Init() {
+ document_.Parse<0>(text_.c_str());
+ if (document_.HasParseError()) {
+ return Status::Corruption("JSON text is corrupt", document_.GetParseError());
+ }
+ return Status::OK();
+}
+
+Status JsonReader::ExtractBool(const Value* object,
+ const char* field,
+ bool* result) const {
+ const Value* val;
+ RETURN_NOT_OK(ExtractField(object, field, &val));
+ if (PREDICT_FALSE(!val->IsBool())) {
+ return Status::InvalidArgument(Substitute(
+ "Wrong type during field extraction: expected bool but got $0",
+ val->GetType()));
+ }
+ *result = val->GetBool();
+ return Status::OK();
+}
+
+Status JsonReader::ExtractInt32(const Value* object,
+ const char* field,
+ int32_t* result) const {
+ const Value* val;
+ RETURN_NOT_OK(ExtractField(object, field, &val));
+ if (PREDICT_FALSE(!val->IsInt())) {
+ return Status::InvalidArgument(Substitute(
+ "Wrong type during field extraction: expected int32 but got $0",
+ val->GetType()));
+ }
+ *result = val->GetUint();
+ return Status::OK();
+}
+
+Status JsonReader::ExtractInt64(const Value* object,
+ const char* field,
+ int64_t* result) const {
+ const Value* val;
+ RETURN_NOT_OK(ExtractField(object, field, &val));
+ if (PREDICT_FALSE(!val->IsInt64())) {
+ return Status::InvalidArgument(Substitute(
+ "Wrong type during field extraction: expected int64 but got $0",
+ val->GetType())); }
+ *result = val->GetUint64();
+ return Status::OK();
+}
+
+Status JsonReader::ExtractString(const Value* object,
+ const char* field,
+ string* result) const {
+ const Value* val;
+ RETURN_NOT_OK(ExtractField(object, field, &val));
+ if (PREDICT_FALSE(!val->IsString())) {
+ if (val->IsNull()) {
+ *result = "";
+ return Status::OK();
+ }
+ return Status::InvalidArgument(Substitute(
+ "Wrong type during field extraction: expected string but got $0",
+ val->GetType())); }
+ result->assign(val->GetString());
+ return Status::OK();
+}
+
+Status JsonReader::ExtractObject(const Value* object,
+ const char* field,
+ const Value** result) const {
+ const Value* val;
+ RETURN_NOT_OK(ExtractField(object, field, &val));
+ if (PREDICT_FALSE(!val->IsObject())) {
+ return Status::InvalidArgument(Substitute(
+ "Wrong type during field extraction: expected object but got $0",
+ val->GetType())); }
+ *result = val;
+ return Status::OK();
+}
+
+Status JsonReader::ExtractObjectArray(const Value* object,
+ const char* field,
+ vector<const Value*>* result) const {
+ const Value* val;
+ RETURN_NOT_OK(ExtractField(object, field, &val));
+ if (PREDICT_FALSE(!val->IsArray())) {
+ return Status::InvalidArgument(Substitute(
+ "Wrong type during field extraction: expected object array but got $0",
+ val->GetType())); }
+ for (Value::ConstValueIterator iter = val->Begin(); iter != val->End(); ++iter) {
+ result->push_back(iter);
+ }
+ return Status::OK();
+}
+
+Status JsonReader::ExtractField(const Value* object,
+ const char* field,
+ const Value** result) const {
+ if (field && PREDICT_FALSE(!object->HasMember(field))) {
+ return Status::NotFound("Missing field", field);
+ }
+ *result = field ? &(*object)[field] : object;
+ return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonreader.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonreader.h b/be/src/kudu/util/jsonreader.h
new file mode 100644
index 0000000..e389b57
--- /dev/null
+++ b/be/src/kudu/util/jsonreader.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_JSONREADER_H_
+#define KUDU_UTIL_JSONREADER_H_
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wraps the JSON parsing functionality of rapidjson::Document.
+//
+// Unlike JsonWriter, this class does not hide rapidjson internals from
+// clients. That's because there's just no easy way to implement object and
+// array parsing otherwise. At most, this class aspires to be a simpler
+// error-handling wrapper for reading and parsing.
+class JsonReader {
+ public:
+  explicit JsonReader(std::string text);
+  ~JsonReader();
+
+  // NOTE(review): presumably parses the text given to the constructor into the
+  // document exposed by root(); call it before any Extract*() method. Confirm
+  // against jsonreader.cc.
+  Status Init();
+
+  // Extractor methods.
+  //
+  // If 'field' is not NULL, will look for a field with that name in the
+  // given object, returning Status::NotFound if it cannot be found. If
+  // 'field' is NULL, will try to convert 'object' directly into the
+  // desired type. ExtractString(), ExtractObject() and ExtractObjectArray()
+  // return Status::InvalidArgument when the value has the wrong JSON type.
+
+  Status ExtractBool(const rapidjson::Value* object,
+                     const char* field,
+                     bool* result) const;
+
+  Status ExtractInt32(const rapidjson::Value* object,
+                      const char* field,
+                      int32_t* result) const;
+
+  Status ExtractInt64(const rapidjson::Value* object,
+                      const char* field,
+                      int64_t* result) const;
+
+  // A JSON null is extracted as the empty string.
+  Status ExtractString(const rapidjson::Value* object,
+                       const char* field,
+                       std::string* result) const;
+
+  // 'result' is only valid for as long as JsonReader is alive.
+  Status ExtractObject(const rapidjson::Value* object,
+                       const char* field,
+                       const rapidjson::Value** result) const;
+
+  // Appends one pointer per array element to 'result'; existing entries are
+  // kept. Each pointer is only valid for as long as JsonReader is alive.
+  Status ExtractObjectArray(const rapidjson::Value* object,
+                            const char* field,
+                            std::vector<const rapidjson::Value*>* result) const;
+
+  // Root value of the parsed document.
+  const rapidjson::Value* root() const { return &document_; }
+
+ private:
+  // Looks up 'field' in 'object', or yields 'object' itself if 'field' is NULL.
+  Status ExtractField(const rapidjson::Value* object,
+                      const char* field,
+                      const rapidjson::Value** result) const;
+
+  std::string text_;
+  rapidjson::Document document_;
+
+  DISALLOW_COPY_AND_ASSIGN(JsonReader);
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_JSONREADER_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter-test.cc b/be/src/kudu/util/jsonwriter-test.cc
new file mode 100644
index 0000000..6f9b10d
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter-test.cc
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <stdint.h>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/jsonwriter_test.pb.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+namespace google { namespace protobuf { class Message; } }
+
+using google::protobuf::Message;
+using jsonwriter_test::TestAllTypes;
+
+namespace kudu {
+
+// Fixture shared by the JsonWriter correctness tests and benchmarks.
+class TestJsonWriter : public KuduTest {
+ protected:
+  // Returns a message with every singular scalar field numbered 1-15 set, plus
+  // the nested enum. The bytes fields and the nested message are left unset.
+  TestAllTypes MakeAllTypesPB() {
+    TestAllTypes msg;
+    msg.set_optional_int32(1);
+    msg.set_optional_int64(2);
+    msg.set_optional_uint32(3);
+    msg.set_optional_uint64(4);
+    msg.set_optional_sint32(5);
+    msg.set_optional_sint64(6);
+    msg.set_optional_fixed32(7);
+    msg.set_optional_fixed64(8);
+    msg.set_optional_sfixed32(9);
+    msg.set_optional_sfixed64(10);
+    msg.set_optional_float(11);
+    msg.set_optional_double(12);
+    msg.set_optional_bool(true);
+    msg.set_optional_string("hello world");
+    msg.set_optional_redacted_string("secret!");
+    msg.set_optional_nested_enum(TestAllTypes::FOO);
+    return msg;
+  }
+
+  // Serializes 'pb' repeatedly for about five seconds and logs the throughput.
+  void DoBenchmark(const Message& pb);
+};
+
+TEST_F(TestJsonWriter, TestPBEmpty) {
+  // A message with no fields set serializes to an empty JSON object.
+  const TestAllTypes empty_pb;
+  const std::string json = JsonWriter::ToJson(empty_pb, JsonWriter::PRETTY);
+  ASSERT_EQ("{}", json);
+}
+
+TEST_F(TestJsonWriter, TestPBAllFieldTypes) {
+  // With --redact=log, fields tagged (kudu.REDACT) are expected to be rendered
+  // as "<redacted>" in the JSON output.
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+  const TestAllTypes pb = MakeAllTypesPB();
+
+  const char* const kExpectedPretty =
+      "{\n"
+      " \"optional_int32\": 1,\n"
+      " \"optional_int64\": 2,\n"
+      " \"optional_uint32\": 3,\n"
+      " \"optional_uint64\": 4,\n"
+      " \"optional_sint32\": 5,\n"
+      " \"optional_sint64\": 6,\n"
+      " \"optional_fixed32\": 7,\n"
+      " \"optional_fixed64\": 8,\n"
+      " \"optional_sfixed32\": 9,\n"
+      " \"optional_sfixed64\": 10,\n"
+      " \"optional_float\": 11,\n"
+      " \"optional_double\": 12,\n"
+      " \"optional_bool\": true,\n"
+      " \"optional_string\": \"hello world\",\n"
+      " \"optional_redacted_string\": \"<redacted>\",\n"
+      " \"optional_nested_enum\": \"FOO\"\n"
+      "}";
+  ASSERT_EQ(kExpectedPretty, JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+
+  const char* const kExpectedCompact =
+      "{"
+      "\"optional_int32\":1,"
+      "\"optional_int64\":2,"
+      "\"optional_uint32\":3,"
+      "\"optional_uint64\":4,"
+      "\"optional_sint32\":5,"
+      "\"optional_sint64\":6,"
+      "\"optional_fixed32\":7,"
+      "\"optional_fixed64\":8,"
+      "\"optional_sfixed32\":9,"
+      "\"optional_sfixed64\":10,"
+      "\"optional_float\":11,"
+      "\"optional_double\":12,"
+      "\"optional_bool\":true,"
+      "\"optional_string\":\"hello world\","
+      "\"optional_redacted_string\":\"<redacted>\","
+      "\"optional_nested_enum\":\"FOO\""
+      "}";
+  ASSERT_EQ(kExpectedCompact, JsonWriter::ToJson(pb, JsonWriter::COMPACT));
+}
+
+TEST_F(TestJsonWriter, TestPBRepeatedPrimitives) {
+  // Repeated fields serialize as JSON arrays; REDACT-tagged elements are
+  // expected to be rendered as "<redacted>" with --redact=log.
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+  TestAllTypes pb;
+  for (int n = 0; n < 4; ++n) {
+    pb.add_repeated_int32(n);
+    pb.add_repeated_string(strings::Substitute("hi $0", n));
+    pb.add_repeated_redacted_string("secret!");
+    pb.add_repeated_redacted_bytes("secret!");
+  }
+
+  const char* const kExpectedPretty =
+      "{\n"
+      " \"repeated_int32\": [\n"
+      " 0,\n"
+      " 1,\n"
+      " 2,\n"
+      " 3\n"
+      " ],\n"
+      " \"repeated_string\": [\n"
+      " \"hi 0\",\n"
+      " \"hi 1\",\n"
+      " \"hi 2\",\n"
+      " \"hi 3\"\n"
+      " ],\n"
+      " \"repeated_redacted_string\": [\n"
+      " \"<redacted>\",\n"
+      " \"<redacted>\",\n"
+      " \"<redacted>\",\n"
+      " \"<redacted>\"\n"
+      " ],\n"
+      " \"repeated_redacted_bytes\": [\n"
+      " \"<redacted>\",\n"
+      " \"<redacted>\",\n"
+      " \"<redacted>\",\n"
+      " \"<redacted>\"\n"
+      " ]\n"
+      "}";
+  ASSERT_EQ(kExpectedPretty, JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+
+  const char* const kExpectedCompact =
+      "{\"repeated_int32\":[0,1,2,3],"
+      "\"repeated_string\":[\"hi 0\",\"hi 1\",\"hi 2\",\"hi 3\"],"
+      "\"repeated_redacted_string\":[\"<redacted>\",\"<redacted>\","
+      "\"<redacted>\",\"<redacted>\"],"
+      "\"repeated_redacted_bytes\":[\"<redacted>\",\"<redacted>\","
+      "\"<redacted>\",\"<redacted>\"]}";
+  ASSERT_EQ(kExpectedCompact, JsonWriter::ToJson(pb, JsonWriter::COMPACT));
+}
+
+TEST_F(TestJsonWriter, TestPBNestedMessage) {
+  // Nested messages serialize as JSON objects, both singular and repeated.
+  TestAllTypes pb;
+  pb.mutable_optional_nested_message()->set_int_field(54321);
+  pb.add_repeated_nested_message()->set_int_field(12345);
+
+  const char* const kExpectedPretty =
+      "{\n"
+      " \"optional_nested_message\": {\n"
+      " \"int_field\": 54321\n"
+      " },\n"
+      " \"repeated_nested_message\": [\n"
+      " {\n"
+      " \"int_field\": 12345\n"
+      " }\n"
+      " ]\n"
+      "}";
+  ASSERT_EQ(kExpectedPretty, JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+
+  const char* const kExpectedCompact =
+      "{\"optional_nested_message\":{\"int_field\":54321},"
+      "\"repeated_nested_message\":"
+      "[{\"int_field\":12345}]}";
+  ASSERT_EQ(kExpectedCompact, JsonWriter::ToJson(pb, JsonWriter::COMPACT));
+}
+
+// Repeatedly writes a compact JSON array of 10000 copies of 'pb' until about
+// five wall-clock seconds have passed. It then logs the bytes produced per
+// second of user CPU time.
+void TestJsonWriter::DoBenchmark(const Message& pb) {
+  Stopwatch sw;
+  sw.start();
+  int64_t bytes_written = 0;
+  while (sw.elapsed().wall_seconds() < 5) {
+    std::ostringstream out;
+    JsonWriter writer(&out, JsonWriter::COMPACT);
+    writer.StartArray();
+    for (int copy = 0; copy < 10000; ++copy) {
+      writer.Protobuf(pb);
+    }
+    writer.EndArray();
+    bytes_written += out.str().size();
+  }
+  sw.stop();
+  const double megabytes = bytes_written / 1024.0 / 1024.0;
+  const double mbps = megabytes / sw.elapsed().user_cpu_seconds();
+  LOG(INFO) << "Throughput: " << mbps << "MB/sec";
+}
+
+TEST_F(TestJsonWriter, BenchmarkAllTypes) {
+  // Benchmark a message with every singular scalar field populated.
+  const TestAllTypes pb = MakeAllTypesPB();
+  DoBenchmark(pb);
+}
+
+TEST_F(TestJsonWriter, BenchmarkNestedMessage) {
+  // Benchmark a message holding one singular and one repeated nested message.
+  TestAllTypes pb;
+  pb.mutable_optional_nested_message()->set_int_field(54321);
+  pb.add_repeated_nested_message()->set_int_field(12345);
+  DoBenchmark(pb);
+}
+
+TEST_F(TestJsonWriter, BenchmarkRepeatedInt64) {
+  // Benchmark a single repeated int64 field holding the values 0..9999.
+  TestAllTypes pb;
+  for (int value = 0; value < 10000; ++value) {
+    pb.add_repeated_int64(value);
+  }
+  DoBenchmark(pb);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter.cc b/be/src/kudu/util/jsonwriter.cc
new file mode 100644
index 0000000..3a5580c
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter.cc
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/jsonwriter.h"
+
+#include <new>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/message.h>
+#include <rapidjson/writer.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/pb_util.pb.h"
+
+using google::protobuf::FieldDescriptor;
+using google::protobuf::Message;
+using google::protobuf::Reflection;
+
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Adapter that lets rapidjson write into a std::ostringstream (Squeasel exposes
+// a stringstream as its output interface). Characters are accumulated in an
+// internal buffer by Put() and appended to the stream only on Flush(). The
+// destructor DCHECKs that nothing was left unflushed.
+class UTF8StringStreamBuffer {
+ public:
+  // Character type, as required by rapidjson's Stream concept (rapidjson's
+  // PutUnsafe()/PutReserve() helpers refer to 'typename Stream::Ch').
+  typedef rapidjson::UTF8<>::Ch Ch;
+
+  // 'out' must be non-NULL and must outlive this buffer.
+  explicit UTF8StringStreamBuffer(std::ostringstream* out);
+  ~UTF8StringStreamBuffer();
+
+  // Appends 'c' to the internal buffer.
+  void Put(Ch c);
+
+  // Writes all buffered characters to 'out' and empties the buffer.
+  void Flush();
+
+ private:
+  faststring buf_;
+  std::ostringstream* out_;
+};
+
+// Virtual interface over rapidjson's writers. rapidjson::Writer and
+// rapidjson::PrettyWriter share no common base class, so JsonWriterImpl<T>
+// adapts each of them to this interface.
+class JsonWriterIf {
+ public:
+  virtual ~JsonWriterIf() = default;
+
+  // Scalar values.
+  virtual void Null() = 0;
+  virtual void Bool(bool b) = 0;
+  virtual void Int(int i) = 0;
+  virtual void Uint(unsigned u) = 0;
+  virtual void Int64(int64_t i64) = 0;
+  virtual void Uint64(uint64_t u64) = 0;
+  virtual void Double(double d) = 0;
+
+  // Strings, given as pointer plus length, NUL-terminated C string, or
+  // std::string.
+  virtual void String(const char* str, size_t length) = 0;
+  virtual void String(const char* str) = 0;
+  virtual void String(const std::string& str) = 0;
+
+  // Containers.
+  virtual void StartObject() = 0;
+  virtual void EndObject() = 0;
+  virtual void StartArray() = 0;
+  virtual void EndArray() = 0;
+};
+
+// Implements JsonWriterIf by forwarding every call to a rapidjson writer of
+// type T (Writer or PrettyWriter) that emits into a UTF8StringStreamBuffer.
+template<class T>
+class JsonWriterImpl : public JsonWriterIf {
+ public:
+  explicit JsonWriterImpl(ostringstream* out);
+
+  void Null() override;
+  void Bool(bool b) override;
+  void Int(int i) override;
+  void Uint(unsigned u) override;
+  void Int64(int64_t i64) override;
+  void Uint64(uint64_t u64) override;
+  void Double(double d) override;
+  void String(const char* str, size_t length) override;
+  void String(const char* str) override;
+  void String(const std::string& str) override;
+
+  void StartObject() override;
+  void EndObject() override;
+  void StartArray() override;
+  void EndArray() override;
+
+ private:
+  // Declared before 'writer_' on purpose: the constructor builds 'writer_'
+  // from 'stream_', so 'stream_' must be initialized first.
+  UTF8StringStreamBuffer stream_;
+  T writer_;
+  DISALLOW_COPY_AND_ASSIGN(JsonWriterImpl);
+};
+
+//
+// JsonWriter
+//
+
+typedef rapidjson::PrettyWriter<UTF8StringStreamBuffer> PrettyWriterClass;
+typedef rapidjson::Writer<UTF8StringStreamBuffer> CompactWriterClass;
+
+// Creates the rapidjson writer matching 'm'. 'out' must be non-NULL and must
+// outlive this JsonWriter, since output is appended to it on every flush.
+JsonWriter::JsonWriter(ostringstream* out, Mode m) {
+  switch (m) {
+    case PRETTY:
+      impl_.reset(new JsonWriterImpl<PrettyWriterClass>(DCHECK_NOTNULL(out)));
+      break;
+    case COMPACT:
+      impl_.reset(new JsonWriterImpl<CompactWriterClass>(DCHECK_NOTNULL(out)));
+      break;
+  }
+  // There is deliberately no 'default' label, so -Wswitch still flags new
+  // enumerators. This CHECK catches out-of-range values (e.g. from a bad cast)
+  // that would otherwise leave impl_ null and crash on first use.
+  CHECK(impl_ != nullptr) << "Unknown JsonWriter::Mode: " << static_cast<int>(m);
+}
+
+// Defined out of line so that unique_ptr<JsonWriterIf> is destroyed where
+// JsonWriterIf is a complete type.
+JsonWriter::~JsonWriter() {
+}
+
+// Every primitive writer call is delegated unchanged to the rapidjson-backed
+// implementation selected in the constructor.
+void JsonWriter::Null() {
+  impl_->Null();
+}
+void JsonWriter::Bool(bool b) {
+  impl_->Bool(b);
+}
+void JsonWriter::Int(int i) {
+  impl_->Int(i);
+}
+void JsonWriter::Uint(unsigned u) {
+  impl_->Uint(u);
+}
+void JsonWriter::Int64(int64_t i64) {
+  impl_->Int64(i64);
+}
+void JsonWriter::Uint64(uint64_t u64) {
+  impl_->Uint64(u64);
+}
+void JsonWriter::Double(double d) {
+  impl_->Double(d);
+}
+void JsonWriter::String(const char* str, size_t length) {
+  impl_->String(str, length);
+}
+void JsonWriter::String(const char* str) {
+  impl_->String(str);
+}
+void JsonWriter::String(const string& str) {
+  impl_->String(str);
+}
+void JsonWriter::StartObject() {
+  impl_->StartObject();
+}
+void JsonWriter::EndObject() {
+  impl_->EndObject();
+}
+void JsonWriter::StartArray() {
+  impl_->StartArray();
+}
+void JsonWriter::EndArray() {
+  impl_->EndArray();
+}
+
+// Value<T>() specializations for the primitive types commonly used by metrics;
+// each one forwards to the matching typed writer method.
+template<> void JsonWriter::Value(const bool& val) { Bool(val); }
+template<> void JsonWriter::Value(const int32_t& val) { Int(val); }
+template<> void JsonWriter::Value(const uint32_t& val) { Uint(val); }
+template<> void JsonWriter::Value(const int64_t& val) { Int64(val); }
+template<> void JsonWriter::Value(const uint64_t& val) { Uint64(val); }
+template<> void JsonWriter::Value(const double& val) { Double(val); }
+template<> void JsonWriter::Value(const string& val) { String(val); }
+
+// On macOS, size_t is a distinct type from uint64_t, so it needs its own
+// specialization.
+#if defined(__APPLE__)
+template<> void JsonWriter::Value(const size_t& val) { Uint64(val); }
+#endif
+
+// Writes 'pb' as a JSON object keyed by field name. Only fields reported as
+// set by Reflection::ListFields() are emitted; repeated fields become arrays.
+void JsonWriter::Protobuf(const Message& pb) {
+  const Reflection* refl = pb.GetReflection();
+  vector<const FieldDescriptor*> set_fields;
+  refl->ListFields(pb, &set_fields);
+
+  StartObject();
+  for (const FieldDescriptor* field : set_fields) {
+    String(field->name());
+    if (!field->is_repeated()) {
+      ProtobufField(pb, refl, field);
+      continue;
+    }
+    StartArray();
+    const int count = refl->FieldSize(pb, field);
+    for (int idx = 0; idx < count; ++idx) {
+      ProtobufRepeatedField(pb, refl, field, idx);
+    }
+    EndArray();
+  }
+  EndObject();
+}
+
+// Writes the value of the non-repeated 'field' of 'pb' using the JSON type
+// that matches the field's C++ type. Enums are written by name; string/bytes
+// values go through KUDU_MAYBE_REDACT_IF when the field carries (kudu.REDACT).
+void JsonWriter::ProtobufField(const Message& pb, const Reflection* reflection,
+                               const FieldDescriptor* field) {
+  switch (field->cpp_type()) {
+    case FieldDescriptor::CPPTYPE_INT32:
+      Int(reflection->GetInt32(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_INT64:
+      Int64(reflection->GetInt64(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_UINT32:
+      Uint(reflection->GetUInt32(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_UINT64:
+      Uint64(reflection->GetUInt64(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_DOUBLE:
+      Double(reflection->GetDouble(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_FLOAT:
+      // There is no float overload, so the value is widened to double.
+      Double(reflection->GetFloat(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_BOOL:
+      Bool(reflection->GetBool(pb, field));
+      return;
+    case FieldDescriptor::CPPTYPE_ENUM:
+      String(reflection->GetEnum(pb, field)->name());
+      return;
+    case FieldDescriptor::CPPTYPE_STRING:
+      String(KUDU_MAYBE_REDACT_IF(field->options().GetExtension(REDACT),
+                                  reflection->GetString(pb, field)));
+      return;
+    case FieldDescriptor::CPPTYPE_MESSAGE:
+      Protobuf(reflection->GetMessage(pb, field));
+      return;
+    default:
+      LOG(FATAL) << "Unknown cpp_type: " << field->cpp_type();
+  }
+}
+
+// Writes element 'index' of the repeated 'field' of 'pb'; the per-type mapping
+// (enum by name, redactable strings, nested messages) mirrors ProtobufField().
+void JsonWriter::ProtobufRepeatedField(const Message& pb, const Reflection* reflection,
+                                       const FieldDescriptor* field, int index) {
+  switch (field->cpp_type()) {
+    case FieldDescriptor::CPPTYPE_INT32:
+      Int(reflection->GetRepeatedInt32(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_INT64:
+      Int64(reflection->GetRepeatedInt64(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_UINT32:
+      Uint(reflection->GetRepeatedUInt32(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_UINT64:
+      Uint64(reflection->GetRepeatedUInt64(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_DOUBLE:
+      Double(reflection->GetRepeatedDouble(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_FLOAT:
+      // There is no float overload, so the value is widened to double.
+      Double(reflection->GetRepeatedFloat(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_BOOL:
+      Bool(reflection->GetRepeatedBool(pb, field, index));
+      return;
+    case FieldDescriptor::CPPTYPE_ENUM:
+      String(reflection->GetRepeatedEnum(pb, field, index)->name());
+      return;
+    case FieldDescriptor::CPPTYPE_STRING:
+      String(KUDU_MAYBE_REDACT_IF(field->options().GetExtension(REDACT),
+                                  reflection->GetRepeatedString(pb, field, index)));
+      return;
+    case FieldDescriptor::CPPTYPE_MESSAGE:
+      Protobuf(reflection->GetRepeatedMessage(pb, field, index));
+      return;
+    default:
+      LOG(FATAL) << "Unknown cpp_type: " << field->cpp_type();
+  }
+}
+
+// Convenience wrapper: serializes 'pb' with a temporary writer in 'mode' and
+// returns the resulting JSON text.
+string JsonWriter::ToJson(const Message& pb, Mode mode) {
+  ostringstream out;
+  {
+    JsonWriter jw(&out, mode);
+    jw.Protobuf(pb);
+  }
+  return out.str();
+}
+
+//
+// UTF8StringStreamBuffer
+//
+
+UTF8StringStreamBuffer::UTF8StringStreamBuffer(std::ostringstream* out)
+    : out_(DCHECK_NOTNULL(out)) {}
+
+// Any characters still buffered here would never reach 'out_'.
+UTF8StringStreamBuffer::~UTF8StringStreamBuffer() {
+  DCHECK_EQ(buf_.size(), 0) << "Forgot to flush!";
+}
+
+void UTF8StringStreamBuffer::Put(rapidjson::UTF8<>::Ch c) {
+  buf_.push_back(c);
+}
+
+// Appends the buffered bytes to the target stream, then resets the buffer.
+void UTF8StringStreamBuffer::Flush() {
+  const char* const bytes = reinterpret_cast<char*>(buf_.data());
+  out_->write(bytes, buf_.size());
+  buf_.clear();
+}
+
+//
+// JsonWriterImpl: simply forward to the underlying implementation.
+//
+
+// 'writer_' is bound to 'stream_', which in turn appends to 'out'.
+template<class T>
+JsonWriterImpl<T>::JsonWriterImpl(ostringstream* out)
+    : stream_(DCHECK_NOTNULL(out)),
+      writer_(stream_) {}
+
+template<class T> void JsonWriterImpl<T>::Null() { writer_.Null(); }
+template<class T> void JsonWriterImpl<T>::Bool(bool b) { writer_.Bool(b); }
+template<class T> void JsonWriterImpl<T>::Int(int i) { writer_.Int(i); }
+template<class T> void JsonWriterImpl<T>::Uint(unsigned u) { writer_.Uint(u); }
+template<class T> void JsonWriterImpl<T>::Int64(int64_t i64) { writer_.Int64(i64); }
+template<class T> void JsonWriterImpl<T>::Uint64(uint64_t u64) { writer_.Uint64(u64); }
+template<class T> void JsonWriterImpl<T>::Double(double d) { writer_.Double(d); }
+
+template<class T>
+void JsonWriterImpl<T>::String(const char* str, size_t length) {
+  writer_.String(str, length);
+}
+template<class T>
+void JsonWriterImpl<T>::String(const char* str) {
+  writer_.String(str);
+}
+template<class T>
+void JsonWriterImpl<T>::String(const string& str) {
+  writer_.String(str.c_str(), str.length());
+}
+
+template<class T> void JsonWriterImpl<T>::StartObject() { writer_.StartObject(); }
+template<class T> void JsonWriterImpl<T>::StartArray() { writer_.StartArray(); }
+
+// Closing a container pushes everything buffered so far out to the stream.
+template<class T>
+void JsonWriterImpl<T>::EndObject() {
+  writer_.EndObject();
+  stream_.Flush();
+}
+template<class T>
+void JsonWriterImpl<T>::EndArray() {
+  writer_.EndArray();
+  stream_.Flush();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter.h b/be/src/kudu/util/jsonwriter.h
new file mode 100644
index 0000000..24b4575
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_JSONWRITER_H
+#define KUDU_UTIL_JSONWRITER_H
+
+#include <cstddef>
+#include <cstdint>
+#include <iosfwd>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+
+namespace google {
+namespace protobuf {
+class FieldDescriptor;
+class Message;
+class Reflection;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class JsonWriterIf;
+
+// Acts as a pimpl for rapidjson so that not all metrics users must bring in the
+// rapidjson library, which is template-based and therefore hard to forward-declare.
+//
+// This class mirrors the value-writing methods of rapidjson::Writer, plus an
+// additional convenience method for String(std::string).
+//
+// We take a std::ostringstream in the constructor because Mongoose / Squeasel
+// uses string streams for output buffering.
+class JsonWriter {
+ public:
+  enum Mode {
+    // Pretty-print the JSON, with nice indentation, newlines, etc.
+    PRETTY,
+    // Print the JSON as compactly as possible.
+    COMPACT
+  };
+
+  // 'out' must be non-NULL and must outlive this writer. Output is appended
+  // to 'out' each time an object or array is closed.
+  JsonWriter(std::ostringstream* out, Mode mode);
+  ~JsonWriter();
+
+  void Null();
+  void Bool(bool b);
+  void Int(int i);
+  void Uint(unsigned u);
+  void Int64(int64_t i64);
+  void Uint64(uint64_t u64);
+  void Double(double d);
+  void String(const char* str, size_t length);
+  void String(const char* str);
+  void String(const std::string& str);
+
+  // Convert the given protobuf message to JSON.
+  // The output respects redaction for 'string' and 'bytes' fields.
+  void Protobuf(const google::protobuf::Message& message);
+
+  // Writes 'val' with the method matching its type. Specializations exist
+  // only for bool, int32_t, uint32_t, int64_t, uint64_t, double and
+  // std::string (plus size_t on macOS); any other T fails to link.
+  template<typename T>
+  void Value(const T& val);
+
+  void StartObject();
+  void EndObject();
+  void StartArray();
+  void EndArray();
+
+  // Convert the given protobuf to JSON format.
+  static std::string ToJson(const google::protobuf::Message& pb,
+                            Mode mode);
+
+ private:
+  // Writes the value of the non-repeated 'field' of 'pb'.
+  void ProtobufField(const google::protobuf::Message& pb,
+                     const google::protobuf::Reflection* reflection,
+                     const google::protobuf::FieldDescriptor* field);
+  // Writes element 'index' of the repeated 'field' of 'pb'.
+  void ProtobufRepeatedField(const google::protobuf::Message& pb,
+                             const google::protobuf::Reflection* reflection,
+                             const google::protobuf::FieldDescriptor* field,
+                             int index);
+
+  std::unique_ptr<JsonWriterIf> impl_;
+  DISALLOW_COPY_AND_ASSIGN(JsonWriter);
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_JSONWRITER_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter_test.proto b/be/src/kudu/util/jsonwriter_test.proto
new file mode 100644
index 0000000..b6f0300
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter_test.proto
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package jsonwriter_test;
+
+import "kudu/util/pb_util.proto";
+
+// This proto includes every type of field in both singular and repeated
+// forms. This is mostly copied from 'unittest.proto' in the protobuf source
+// (hence the odd field numbers which skip some).
+message TestAllTypes {
+  message NestedMessage {
+    optional int32 int_field = 1;
+  }
+
+  enum NestedEnum {
+    FOO = 1;
+    BAR = 2;
+    BAZ = 3;
+  }
+
+  // Singular
+  optional int32 optional_int32 = 1;
+  optional int64 optional_int64 = 2;
+  optional uint32 optional_uint32 = 3;
+  optional uint64 optional_uint64 = 4;
+  optional sint32 optional_sint32 = 5;
+  optional sint64 optional_sint64 = 6;
+  optional fixed32 optional_fixed32 = 7;
+  optional fixed64 optional_fixed64 = 8;
+  optional sfixed32 optional_sfixed32 = 9;
+  optional sfixed64 optional_sfixed64 = 10;
+  optional float optional_float = 11;
+  optional double optional_double = 12;
+  optional bool optional_bool = 13;
+  optional string optional_string = 14;
+  optional string optional_redacted_string = 15 [ (kudu.REDACT) = true ];
+  optional bytes optional_bytes = 16;
+  optional bytes optional_redacted_bytes = 17 [ (kudu.REDACT) = true ];
+
+  optional NestedMessage optional_nested_message = 18;
+  optional NestedEnum optional_nested_enum = 21;
+
+  // Repeated
+  repeated int32 repeated_int32 = 31;
+  repeated int64 repeated_int64 = 32;
+  repeated uint32 repeated_uint32 = 33;
+  repeated uint64 repeated_uint64 = 34;
+  repeated sint32 repeated_sint32 = 35;
+  repeated sint64 repeated_sint64 = 36;
+  repeated fixed32 repeated_fixed32 = 37;
+  repeated fixed64 repeated_fixed64 = 38;
+  repeated sfixed32 repeated_sfixed32 = 39;
+  repeated sfixed64 repeated_sfixed64 = 40;
+  repeated float repeated_float = 41;
+  repeated double repeated_double = 42;
+  repeated bool repeated_bool = 43;
+  repeated string repeated_string = 44;
+  repeated bytes repeated_bytes = 45;
+  repeated string repeated_redacted_string = 46 [ (kudu.REDACT) = true ];
+  // Must be 'bytes' (not 'string') so that redaction of repeated bytes fields
+  // is actually exercised. The generated C++ accessors are the same for both.
+  repeated bytes repeated_redacted_bytes = 47 [ (kudu.REDACT) = true ];
+
+  repeated NestedMessage repeated_nested_message = 48;
+  repeated NestedEnum repeated_nested_enum = 51;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/kernel_stack_watchdog.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/kernel_stack_watchdog.cc b/be/src/kudu/util/kernel_stack_watchdog.cc
new file mode 100644
index 0000000..27a259c
--- /dev/null
+++ b/be/src/kudu/util/kernel_stack_watchdog.cc
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/kernel_stack_watchdog.h"
+
+#include <cstdint>
+#include <cstring>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
// How long the watchdog thread sleeps between scans of the registered threads
// (used as the wait interval in KernelStackWatchdog::RunThread()).
DEFINE_int32(hung_task_check_interval_ms, 200,
             "Number of milliseconds in between checks for hung threads");
TAG_FLAG(hung_task_check_interval_ms, hidden);

// Fault-injection knob: artificial latency added by GetKernelStack() before it
// reads /proc/<tid>/stack (presumably for tests exercising slow stack lookups).
DEFINE_int32(inject_latency_on_kernel_stack_lookup_ms, 0,
             "Number of milliseconds of latency to inject when reading a thread's "
             "kernel stack");
TAG_FLAG(inject_latency_on_kernel_stack_lookup_ms, hidden);
+
+using std::lock_guard;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
// Per-thread pointer to the calling thread's TLS. It stays null until
// CreateAndRegisterTLS() runs on that thread, and is reset to null by Unregister().
__thread KernelStackWatchdog::TLS* KernelStackWatchdog::tls_;
+
+KernelStackWatchdog::KernelStackWatchdog()
+ : log_collector_(nullptr),
+ finish_(1) {
+
+ // During creation of the stack watchdog thread, we need to disable using
+ // the stack watchdog itself. Otherwise, the 'StartThread' function will
+ // try to call back into initializing the stack watchdog, and will self-deadlock.
+ CHECK_OK(Thread::CreateWithFlags(
+ "kernel-watchdog", "kernel-watcher",
+ boost::bind(&KernelStackWatchdog::RunThread, this),
+ Thread::NO_STACK_WATCHDOG,
+ &thread_));
+}
+
+KernelStackWatchdog::~KernelStackWatchdog() {
+ finish_.CountDown();
+ CHECK_OK(ThreadJoiner(thread_.get()).Join());
+}
+
+void KernelStackWatchdog::SaveLogsForTests(bool save_logs) {
+ lock_guard<simple_spinlock> l(log_lock_);
+ if (save_logs) {
+ log_collector_.reset(new std::vector<string>());
+ } else {
+ log_collector_.reset();
+ }
+}
+
+std::vector<string> KernelStackWatchdog::LoggedMessagesForTests() const {
+ lock_guard<simple_spinlock> l(log_lock_);
+ CHECK(log_collector_) << "Must call SaveLogsForTests(true) first";
+ return *log_collector_;
+}
+
+void KernelStackWatchdog::Register(TLS* tls) {
+ int64_t tid = Thread::CurrentThreadId();
+ lock_guard<simple_spinlock> l(tls_lock_);
+ InsertOrDie(&tls_by_tid_, tid, tls);
+}
+
// Removes the calling thread's TLS from the watchdog and releases it. Runs on
// the exiting thread (via ThreadExiting()).
void KernelStackWatchdog::Unregister() {
  int64_t tid = Thread::CurrentThreadId();

  // Take ownership of this thread's TLS; unless handed off below, it is freed
  // when 'tls' goes out of scope at the end of the function.
  std::unique_ptr<TLS> tls(tls_);
  {
    // Only *try* to take 'unregister_lock_': an exiting thread must not block on
    // the watchdog. If the lock is busy, RunThread() holds it and may be reading
    // this TLS right now. Lock order: 'unregister_lock_' before 'tls_lock_'.
    std::unique_lock<Mutex> l(unregister_lock_, std::try_to_lock);
    lock_guard<simple_spinlock> l2(tls_lock_);
    CHECK(tls_by_tid_.erase(tid));
    if (!l.owns_lock()) {
      // The watchdog is in the middle of running and might be accessing
      // 'tls', so just enqueue it for later deletion. Otherwise it
      // will go out of scope at the end of this function and get
      // deleted here.
      pending_delete_.emplace_back(std::move(tls));
    }
  }
  tls_ = nullptr;
}
+
+Status GetKernelStack(pid_t p, string* ret) {
+ MAYBE_INJECT_FIXED_LATENCY(FLAGS_inject_latency_on_kernel_stack_lookup_ms);
+ faststring buf;
+ RETURN_NOT_OK(ReadFileToString(Env::Default(), Substitute("/proc/$0/stack", p), &buf));
+ *ret = buf.ToString();
+ return Status::OK();
+}
+
// Body of the watchdog thread. Every FLAGS_hung_task_check_interval_ms it
// snapshots every registered thread's TLS and logs a WARNING (kernel + user
// stack) for each watched frame that has been active longer than its threshold.
// A frame that stays stuck is reported again on every pass. Returns once the
// destructor counts down 'finish_'.
void KernelStackWatchdog::RunThread() {
  while (true) {
    MonoDelta delta = MonoDelta::FromMilliseconds(FLAGS_hung_task_check_interval_ms);
    if (finish_.WaitFor(delta)) {
      // Watchdog exiting.
      break;
    }

    // Don't send signals while the debugger is running, since it makes it hard to
    // use.
    if (IsBeingDebugged()) {
      continue;
    }

    // Prevent threads from deleting their TLS objects between the snapshot loop and the sending of
    // signals. This makes it safe for us to access their TLS.
    //
    // NOTE: it's still possible that the thread will have exited in between grabbing its pointer
    // and sending a signal, but DumpThreadStack() already is safe about not sending a signal
    // to some other non-Kudu thread.
    MutexLock l(unregister_lock_);

    // Take the snapshot of the thread information under a short lock.
    //
    // 'tls_lock_' prevents new threads from starting, so we don't want to do any lengthy work
    // (such as gathering stack traces) under this lock.
    TLSMap tls_map_copy;
    vector<unique_ptr<TLS>> to_delete;
    {
      lock_guard<simple_spinlock> l(tls_lock_);
      to_delete.swap(pending_delete_);
      tls_map_copy = tls_by_tid_;
    }
    // Actually delete the no-longer-used TLS entries outside of the lock.
    to_delete.clear();

    MicrosecondsInt64 now = GetMonoTimeMicros();
    for (const auto& entry : tls_map_copy) {
      pid_t p = entry.first;
      TLS::Data* tls = &entry.second->data_;
      // Consistent copy of the target's frames (spins while the target is mid-update).
      TLS::Data tls_copy;
      tls->SnapshotCopy(&tls_copy);
      for (int i = 0; i < tls_copy.depth_; i++) {
        const TLS::Frame* frame = &tls_copy.frames_[i];

        int paused_ms = (now - frame->start_time_) / 1000;
        if (paused_ms > frame->threshold_ms_) {
          string kernel_stack;
          Status s = GetKernelStack(p, &kernel_stack);
          if (!s.ok()) {
            // Can't read the kernel stack of the pid, just ignore it.
            kernel_stack = "(could not read kernel stack)";
          }

          string user_stack = DumpThreadStack(p);

          // If the thread exited the frame we're looking at in between when we started
          // grabbing the stack and now, then our stack isn't correct. We shouldn't log it.
          // The 'break' also skips any deeper frames of this thread for this pass.
          //
          // We just use unprotected reads here since this is a somewhat best-effort
          // check.
          if (ANNOTATE_UNPROTECTED_READ(tls->depth_) < tls_copy.depth_ ||
              ANNOTATE_UNPROTECTED_READ(tls->frames_[i].start_time_) != frame->start_time_) {
            break;
          }

          // Logs go to 'log_collector_' when tests installed one, otherwise to glog.
          lock_guard<simple_spinlock> l(log_lock_);
          LOG_STRING(WARNING, log_collector_.get())
              << "Thread " << p << " stuck at " << frame->status_
              << " for " << paused_ms << "ms" << ":\n"
              << "Kernel stack:\n" << kernel_stack << "\n"
              << "User stack:\n" << user_stack;
        }
      }
    }
  }
}
+
+void KernelStackWatchdog::ThreadExiting(void* /* unused */) {
+ KernelStackWatchdog::GetInstance()->Unregister();
+}
+
+void KernelStackWatchdog::CreateAndRegisterTLS() {
+ DCHECK(!tls_);
+ // Disable leak check. LSAN sometimes gets false positives on thread locals.
+ // See: https://github.com/google/sanitizers/issues/757
+ debug::ScopedLeakCheckDisabler d;
+ auto* tls = new TLS();
+ KernelStackWatchdog::GetInstance()->Register(tls);
+ tls_ = tls;
+ kudu::threadlocal::internal::AddDestructor(&ThreadExiting, nullptr);
+}
+
+KernelStackWatchdog::TLS::TLS() {
+ memset(&data_, 0, sizeof(data_));
+}
+
+KernelStackWatchdog::TLS::~TLS() {
+}
+
// Optimistic concurrency control approach to snapshot the value of another
// thread's TLS, even though that thread might be changing it.
//
// Called by the watchdog thread to see if a target thread is currently in the
// middle of a watched section. This is the reader side of the sequence lock
// described on 'seq_lock_': it busy-waits while the counter is odd and retries
// until the counter is unchanged across the copy.
void KernelStackWatchdog::TLS::Data::SnapshotCopy(Data* copy) const {
  while (true) {
    // Acquire ordering: the data reads below cannot be hoisted above this load.
    Atomic32 v_0 = base::subtle::Acquire_Load(&seq_lock_);
    if (v_0 & 1) {
      // If the value is odd, then the thread is in the middle of modifying
      // its TLS, and we have to spin.
      base::subtle::PauseCPU();
      continue;
    }
    // The copy deliberately races with the owning thread; tell race detectors
    // to ignore it. Consistency is validated by re-reading the counter below.
    ANNOTATE_IGNORE_READS_BEGIN();
    memcpy(copy, this, sizeof(*copy));
    ANNOTATE_IGNORE_READS_END();
    // Re-read the counter; the Release_Load keeps the copy ordered before it.
    Atomic32 v_1 = base::subtle::Release_Load(&seq_lock_);

    // If the value hasn't changed since we started the copy, then
    // we know that the copy was a consistent snapshot.
    if (v_1 == v_0) break;
  }
}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/kernel_stack_watchdog.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/kernel_stack_watchdog.h b/be/src/kudu/util/kernel_stack_watchdog.h
new file mode 100644
index 0000000..6ec7b50
--- /dev/null
+++ b/be/src/kudu/util/kernel_stack_watchdog.h
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This class defines a singleton thread which manages a map of other thread IDs to
+// watch. Before performing some operation which may stall (e.g. I/O) or which we expect
+// to be short (e.g. a callback on a critical thread that should not block), threads
+// may mark themselves as "watched", with a threshold beyond which they would like
+// warnings to be emitted including their stack trace at that time.
+//
+// In the background, a separate watchdog thread periodically wakes up, and if a thread
+// has been marked longer than its provided threshold, it will dump the stack trace
+// of that thread (both kernel-mode and user-mode stacks).
+//
+// This can be useful for diagnosing I/O stalls coming from the kernel, for example.
+//
+// Users will typically use the macro SCOPED_WATCH_STACK. Example usage:
+//
+// // We expect the Write() to return in <100ms. If it takes longer than that
+// // we'll see warnings indicating why it is stalled.
+// {
+// SCOPED_WATCH_STACK(100);
+// file->Write(...);
+// }
+//
+// If the Write call takes too long, a stack trace will be logged at WARNING level.
+// Note that the threshold time parameter is not a guarantee that a stall will be
+// caught by the watchdog thread. The watchdog only wakes up periodically to look
+// for threads that have been stalled too long. For example, if the threshold is 10ms
+// and the thread blocks for only 20ms, it's quite likely that the watchdog will
+// have missed the event.
+//
+// The SCOPED_WATCH_STACK macro is designed to have minimal overhead: approximately
+// equivalent to a clock_gettime() and a single 'mfence' instruction. Micro-benchmarks
+// measure the cost at about 50ns per call. Thus, it may safely be used in hot code
+// paths.
+//
+// Scopes with SCOPED_WATCH_STACK may be nested, but only up to a hard-coded limited depth
+// (currently 8).
+#ifndef KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
+#define KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
+
+#include <ctime>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/threadlocal.h"
+
// Watches the rest of the enclosing scope: if it stays active longer than
// 'threshold_ms' milliseconds, the watchdog logs the thread's stacks, labelled
// with "file:line" of the macro invocation. A non-positive threshold disables it.
// The class is fully qualified because this macro is defined outside namespace
// kudu and may be expanded from code in other namespaces.
#define SCOPED_WATCH_STACK(threshold_ms) \
  ::kudu::ScopedWatchKernelStack _stack_watcher(__FILE__ ":" AS_STRING(__LINE__), threshold_ms)
+
+namespace kudu {
+
+class Thread;
+
+// Singleton thread which implements the watchdog.
class KernelStackWatchdog {
 public:
  // Returns the process-wide watchdog instance, managed by Singleton<>.
  static KernelStackWatchdog* GetInstance() {
    return Singleton<KernelStackWatchdog>::get();
  }

  // Instead of logging through glog, log warning messages into a vector.
  //
  // If 'save_logs' is true, will start saving to the vector, and forget any
  // previously logged messages.
  // If 'save_logs' is false, disables this functionality.
  void SaveLogsForTests(bool save_logs);

  // Return any log messages saved since the last call to SaveLogsForTests(true).
  // CHECK-fails if SaveLogsForTests(true) has not been called.
  std::vector<std::string> LoggedMessagesForTests() const;

 private:
  friend class Singleton<KernelStackWatchdog>;
  friend class ScopedWatchKernelStack;

  // The thread-local state which captures whether a thread should be watched by
  // the watchdog. One instance is heap-allocated per thread by
  // CreateAndRegisterTLS() the first time the thread enters a watched section,
  // and is registered with the watchdog at that point. When the thread exits,
  // ThreadExiting() unregisters it. The object is then freed either immediately,
  // or later by the watchdog thread if it might still be reading it
  // (see 'pending_delete_').
  //
  // See 'seq_lock_' below for details on thread-safe operation.
  struct TLS {
    TLS();
    ~TLS();

    enum Constants {
      // The maximum nesting depth of SCOPED_WATCH_STACK() macros.
      kMaxDepth = 8
    };

    // Because we support nested SCOPED_WATCH_STACK() macros, we need to capture
    // multiple active frames within the TLS.
    struct Frame {
      // The time at which this frame entered the SCOPED_WATCH_STACK section.
      // We use MicrosecondsInt64 instead of MonoTime because it inlines a bit
      // better.
      MicrosecondsInt64 start_time_;
      // The threshold of time beyond which the watchdog should emit warnings.
      int threshold_ms_;
      // A string explaining the state that the thread is in (typically a file:line
      // string). This is expected to be static storage and is not freed.
      const char* status_;
    };

    // The data within the TLS. This is a POD type so that the watchdog can easily
    // copy data out of a thread's TLS.
    struct Data {
      // Active frames; only the first 'depth_' entries are meaningful.
      Frame frames_[kMaxDepth];
      // Number of currently active (nested) watched scopes on this thread.
      Atomic32 depth_;

      // Counter implementing a simple "sequence lock".
      //
      // Before modifying any data inside its TLS, the watched thread increments this value so it is
      // odd. When the modifications are complete, it increments it again, making it even.
      //
      // To read the TLS data from a target thread, the watchdog thread waits for the value
      // to become even, indicating that no write is in progress. Then, it does a potentially
      // racy copy of the entire 'Data' structure. Then, it validates the value again.
      // If it has not changed, then the snapshot is guaranteed to be consistent.
      //
      // We use this type of locking to ensure that the watched thread is as fast as possible,
      // allowing us to use SCOPED_WATCH_STACK even in hot code paths. In particular,
      // the watched thread is wait-free, since it doesn't need to loop or retry. In addition, the
      // memory is only written by that thread, eliminating any cache-line bouncing. The watchdog
      // thread may have to loop multiple times to see a consistent snapshot, but we're OK delaying
      // the watchdog arbitrarily since it isn't on any critical path.
      Atomic32 seq_lock_;

      // Take a consistent snapshot of this data into 'dst'. This busy-waits (spins)
      // while the target thread is in the middle of modifying its TLS.
      void SnapshotCopy(Data* dst) const;
    };
    Data data_;
  };

  // Private: only Singleton<> constructs the watchdog. Construction starts the
  // watchdog thread; destruction stops and joins it.
  KernelStackWatchdog();
  ~KernelStackWatchdog();

  // Get or create the TLS for the current thread.
  static TLS* GetTLS() {
    if (PREDICT_FALSE(!tls_)) {
      CreateAndRegisterTLS();
    }
    return tls_;
  }

  // Create a new TLS for the current thread, and register it with the watchdog.
  // Installs a callback to automatically unregister the thread upon its exit.
  static void CreateAndRegisterTLS();

  // Callback which is registered to run at thread-exit time by CreateAndRegisterTLS().
  // The argument is unused (it is registered with nullptr).
  static void ThreadExiting(void* tls_void);

  // Register a new thread's TLS with the watchdog.
  // Called by any thread the first time it enters a watched section, when its TLS
  // is constructed.
  void Register(TLS* tls);

  // Called when a thread is in the process of exiting, and has a registered TLS
  // object.
  void Unregister();

  // The actual watchdog loop that the watchdog thread runs.
  void RunThread();

  // The calling thread's TLS, or null if it has not entered a watched section
  // yet (defined as a __thread variable in the .cc file).
  DECLARE_STATIC_THREAD_LOCAL(TLS, tls_);

  // Registered TLS objects keyed by thread id. The TLS objects are not owned here.
  typedef std::unordered_map<pid_t, TLS*> TLSMap;
  TLSMap tls_by_tid_;

  // If a thread exits while the watchdog is in the middle of accessing the TLS
  // objects, we can't immediately delete the TLS struct. Instead, the thread
  // enqueues it here for later deletion by the watchdog thread within RunThread().
  std::vector<std::unique_ptr<TLS>> pending_delete_;

  // If non-NULL, warnings will be emitted into this vector instead of glog.
  // Used by tests.
  gscoped_ptr<std::vector<std::string> > log_collector_;

  // Lock protecting log_collector_.
  mutable simple_spinlock log_lock_;

  // Lock protecting tls_by_tid_ and pending_delete_.
  mutable simple_spinlock tls_lock_;

  // Lock which prevents threads from unregistering while the watchdog
  // sends signals.
  //
  // This is used to prevent the watchdog from sending a signal to a pid just
  // after the pid has actually exited and been reused. Sending a signal to
  // a non-Kudu thread could have unintended consequences.
  //
  // When this lock is held concurrently with 'tls_lock_' or 'log_lock_',
  // this lock must be acquired first.
  Mutex unregister_lock_;

  // The watchdog thread itself.
  scoped_refptr<Thread> thread_;

  // Signal to stop the watchdog.
  CountDownLatch finish_;

  DISALLOW_COPY_AND_ASSIGN(KernelStackWatchdog);
};
+
+// Scoped object which marks the current thread for watching.
+class ScopedWatchKernelStack {
+ public:
+ // If the current scope is active more than 'threshold_ms' milliseconds, the
+ // watchdog thread will log a warning including the message 'label'. 'label'
+ // is not copied or freed.
+ ScopedWatchKernelStack(const char* label, int threshold_ms) {
+ if (threshold_ms <= 0) return;
+
+ // Rather than just using the lazy GetTLS() method, we'll first try to load
+ // the TLS ourselves. This is usually successful, and avoids us having to inline
+ // the TLS construction path at call sites.
+ KernelStackWatchdog::TLS* tls = KernelStackWatchdog::tls_;
+ if (PREDICT_FALSE(tls == NULL)) {
+ tls = KernelStackWatchdog::GetTLS();
+ }
+ KernelStackWatchdog::TLS::Data* tls_data = &tls->data_;
+
+ // "Acquire" the sequence lock. While the lock value is odd, readers will block.
+ // TODO: technically this barrier is stronger than we need: we are the only writer
+ // to this data, so it's OK to allow loads from within the critical section to
+ // reorder above this next line. All we need is a "StoreStore" barrier (i.e.
+ // prevent any stores in the critical section from getting reordered above the
+ // increment of the counter). However, atomicops.h doesn't provide such a barrier
+ // as of yet, so we'll do the slightly more expensive one for now.
+ base::subtle::Acquire_Store(&tls_data->seq_lock_, tls_data->seq_lock_ + 1);
+
+ KernelStackWatchdog::TLS::Frame* frame = &tls_data->frames_[tls_data->depth_++];
+ DCHECK_LE(tls_data->depth_, KernelStackWatchdog::TLS::kMaxDepth);
+ frame->start_time_ = GetMonoTimeMicros();
+ frame->threshold_ms_ = threshold_ms;
+ frame->status_ = label;
+
+ // "Release" the sequence lock. This resets the lock value to be even, so readers
+ // will proceed.
+ base::subtle::Release_Store(&tls_data->seq_lock_, tls_data->seq_lock_ + 1);
+ }
+
+ ~ScopedWatchKernelStack() {
+ if (!KernelStackWatchdog::tls_) return;
+
+ KernelStackWatchdog::TLS::Data* tls = &KernelStackWatchdog::tls_->data_;
+ int d = tls->depth_;
+ DCHECK_GT(d, 0);
+
+ // We don't bother with a lock/unlock, because the change we're making here is atomic.
+ // If we race with the watchdog, either they'll see the old depth_ or the new depth_,
+ // but in either case the underlying data is perfectly valid.
+ base::subtle::NoBarrier_Store(&tls->depth_, d - 1);
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ScopedWatchKernelStack);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_KERNEL_STACK_WATCHDOG_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/knapsack_solver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/knapsack_solver-test.cc b/be/src/kudu/util/knapsack_solver-test.cc
new file mode 100644
index 0000000..9f717c4
--- /dev/null
+++ b/be/src/kudu/util/knapsack_solver-test.cc
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/knapsack_solver.h"
+#include "kudu/util/stopwatch.h" // IWYU pragma: keep
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+class TestKnapsack : public KuduTest {
+};
+
+// A simple test item for use with the knapsack solver.
+// The real code will be solving knapsack over RowSet objects --
+// using simple value/weight pairs in the tests makes it standalone.
struct TestItem {
  TestItem(double v, int w)
      : value(v),
        weight(w) {}

  double value;  // What the item contributes to the knapsack's total value.
  int weight;    // How much capacity the item consumes.
};

// Traits adapter telling KnapsackSolver how to read a TestItem's weight and value.
struct TestItemTraits {
  using item_type = TestItem;
  using value_type = double;

  static int get_weight(const TestItem& item) { return item.weight; }
  static value_type get_value(const TestItem& item) { return item.value; }
};
+
+// Generate random items into the provided vector.
+static void GenerateRandomItems(int n_items, int max_weight,
+ vector<TestItem> *out) {
+ for (int i = 0; i < n_items; i++) {
+ double value = 10000.0 / (random() % 10000 + 1);
+ int weight = random() % max_weight;
+ out->push_back(TestItem(value, weight));
+ }
+}
+
+// Join and stringify the given list of ints.
// Example: {3, 2, 1} -> "3,2,1"; an empty list yields "".
// The index is a size_t, so the loop condition no longer compares signed with unsigned.
static std::string JoinInts(const std::vector<int>& ints) {
  std::string ret;
  for (size_t i = 0; i < ints.size(); i++) {
    if (i > 0) {
      ret.push_back(',');
    }
    ret.append(std::to_string(ints[i]));
  }
  return ret;
}
+
+TEST_F(TestKnapsack, Basics) {
+ KnapsackSolver<TestItemTraits> solver;
+
+ vector<TestItem> in;
+ in.emplace_back(500, 3);
+ in.emplace_back(110, 1);
+ in.emplace_back(125, 1);
+ in.emplace_back(100, 1);
+
+ vector<int> out;
+ double max_val;
+
+ // For 1 weight, pick item 2
+ solver.Solve(in, 1, &out, &max_val);
+ ASSERT_DOUBLE_EQ(125, max_val);
+ ASSERT_EQ("2", JoinInts(out));
+ out.clear();
+
+ // For 2 weight, pick item 1, 2
+ solver.Solve(in, 2, &out, &max_val);
+ ASSERT_DOUBLE_EQ(110 + 125, max_val);
+ ASSERT_EQ("2,1", JoinInts(out));
+ out.clear();
+
+ // For 3 weight, pick item 0
+ solver.Solve(in, 3, &out, &max_val);
+ ASSERT_DOUBLE_EQ(500, max_val);
+ ASSERT_EQ("0", JoinInts(out));
+ out.clear();
+
+ // For 10 weight, pick all.
+ solver.Solve(in, 10, &out, &max_val);
+ ASSERT_DOUBLE_EQ(500 + 110 + 125 + 100, max_val);
+ ASSERT_EQ("3,2,1,0", JoinInts(out));
+ out.clear();
+}
+
+// Test which generates random knapsack instances and verifies
+// that the result satisfies the constraints.
+TEST_F(TestKnapsack, Randomized) {
+ SeedRandom();
+ KnapsackSolver<TestItemTraits> solver;
+
+ const int kNumTrials = AllowSlowTests() ? 200 : 1;
+ const int kMaxWeight = 1000;
+ const int kNumItems = 1000;
+
+ for (int i = 0; i < kNumTrials; i++) {
+ vector<TestItem> in;
+ vector<int> out;
+ GenerateRandomItems(kNumItems, kMaxWeight, &in);
+ double max_val;
+ int max_weight = random() % kMaxWeight;
+ solver.Solve(in, max_weight, &out, &max_val);
+
+ // Verify that the max_val is equal to the sum of the chosen items' values.
+ double sum_val = 0;
+ int sum_weight = 0;
+ for (int i : out) {
+ sum_val += in[i].value;
+ sum_weight += in[i].weight;
+ }
+ ASSERT_NEAR(max_val, sum_val, 0.000001);
+ ASSERT_LE(sum_weight, max_weight);
+ }
+}
+
#ifdef NDEBUG
// Times kNumTrials solves over one fixed random item set, each with a random
// capacity. Only built in release (NDEBUG) builds.
TEST_F(TestKnapsack, Benchmark) {
  const int kNumTrials = 1000;
  const int kMaxWeight = 1000;
  const int kNumItems = 1000;

  KnapsackSolver<TestItemTraits> solver;
  vector<TestItem> items;
  GenerateRandomItems(kNumItems, kMaxWeight, &items);

  LOG_TIMING(INFO, "benchmark") {
    vector<int> chosen;
    double best_value;
    for (int trial = 0; trial < kNumTrials; trial++) {
      chosen.clear();
      solver.Solve(items, random() % kMaxWeight, &chosen, &best_value);
    }
  }
}
#endif
+
+} // namespace kudu