You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:30 UTC

[18/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/interval_tree-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/interval_tree-test.cc b/be/src/kudu/util/interval_tree-test.cc
new file mode 100644
index 0000000..df143d3
--- /dev/null
+++ b/be/src/kudu/util/interval_tree-test.cc
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// All rights reserved.
+
+#include <algorithm>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <tuple>  // IWYU pragma: keep
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/interval_tree.h"
+#include "kudu/util/interval_tree-inl.h"
+#include "kudu/util/test_util.h"
+
+using std::pair;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Test harness.
+// Intentionally empty: these tests need nothing beyond the KuduTest base setup.
+class TestIntervalTree : public KuduTest {};
+
+// Simple interval class for integer intervals.
+struct IntInterval {
+  // Builds the closed interval [left, right]. 'id' is an arbitrary tag used to
+  // tell intervals apart in test output; it defaults to -1.
+  IntInterval(int left, int right, int id = -1)
+      : left(left), right(right), id(id) {}
+
+  // True when this closed interval and 'other' share at least one point.
+  bool Intersects(const IntInterval &other) const {
+    return other.left <= right && left <= other.right;
+  }
+
+  // Renders as "[left, right](id) " (note the trailing space).
+  string ToString() const {
+    return strings::Substitute("[$0, $1]($2) ", left, right, id);
+  }
+
+  int left;
+  int right;
+  int id;
+};
+
+// A wrapper around an int which can be compared with IntTraits::compare()
+// but also can keep a counter of how many times it has been compared. Used
+// for TestBigO below.
+struct CountingQueryPoint {
+  // Wraps query value 'v' together with a fresh comparison counter set to 0.
+  // make_shared allocates the counter and its control block in one go.
+  explicit CountingQueryPoint(int v)
+      : val(v),
+        count(std::make_shared<int>(0)) {
+  }
+
+  int val;
+  // Shared so that every copy of this query point (e.g. copies held inside a
+  // sorted query container) updates one common tally.
+  std::shared_ptr<int> count;
+};
+
+// Traits definition for intervals made up of ints on either end.
+struct IntTraits {
+  using point_type = int;
+  using interval_type = IntInterval;
+
+  // Bound accessors required by IntervalTree.
+  static point_type get_left(const IntInterval &x) { return x.left; }
+  static point_type get_right(const IntInterval &x) { return x.right; }
+
+  // Three-way comparison: -1, 0 or 1 as 'a' is less than, equal to, or
+  // greater than 'b'.
+  static int compare(int a, int b) {
+    return (a > b) - (a < b);
+  }
+
+  // Mixed-type overloads so the tree can be queried with CountingQueryPoint.
+  // Either overload bumps the query point's shared counter exactly once.
+  static int compare(const CountingQueryPoint& q, int b) {
+    ++*q.count;
+    return compare(q.val, b);
+  }
+  static int compare(int a, const CountingQueryPoint& b) {
+    return -compare(b, a);
+  }
+};
+
+// Compare intervals in an arbitrary but consistent way - this is only
+// used for verifying that the two algorithms come up with the same results.
+// It's not necessary to define this to use an interval tree.
+static bool CompareIntervals(const IntInterval &a, const IntInterval &b) {
+  // Lexicographic ordering on (left, right, id).
+  return std::tie(a.left, a.right, a.id) < std::tie(b.left, b.right, b.id);
+}
+
+// Stringify a list of int intervals, for easy test error reporting.
+static string Stringify(const vector<IntInterval> &intervals) {
+  // Concatenates each interval's ToString(), separating consecutive entries
+  // with ",". An empty list yields an empty string.
+  string ret;
+  bool first = true;
+  for (const IntInterval &interval : intervals) {
+    if (!first) {
+      ret.append(",");
+    }
+    // Clear the flag so every later entry is preceded by the separator.
+    first = false;
+    ret.append(interval.ToString());
+  }
+  return ret;
+}
+
+// Find any intervals in 'intervals' which contain 'query_point' by brute force.
+static void FindContainingBruteForce(const vector<IntInterval> &intervals,
+                                     int query_point,
+                                     vector<IntInterval> *results) {
+  // Linear scan in input order: a closed interval contains the point when
+  // left <= point <= right.
+  for (const IntInterval &candidate : intervals) {
+    const bool contains = candidate.left <= query_point && query_point <= candidate.right;
+    if (!contains) continue;
+    results->push_back(candidate);
+  }
+}
+
+
+// Find any intervals in 'intervals' which intersect 'query_interval' by brute force.
+static void FindIntersectingBruteForce(const vector<IntInterval> &intervals,
+                                       IntInterval query_interval,
+                                       vector<IntInterval> *results) {
+  // Linear scan in input order, appending every interval overlapping the query.
+  for (auto it = intervals.begin(); it != intervals.end(); ++it) {
+    if (!query_interval.Intersects(*it)) continue;
+    results->push_back(*it);
+  }
+}
+
+
+// Verify that IntervalTree::FindContainingPoint yields the same results as the naive
+// brute-force O(n) algorithm.
+static void VerifyFindContainingPoint(const vector<IntInterval> &all_intervals,
+                                      const IntervalTree<IntTraits> &tree,
+                                      int query_point) {
+  // 'all_intervals' is taken by const reference: it is only read, and the
+  // callers invoke this once per query point.
+  vector<IntInterval> results;
+  tree.FindContainingPoint(query_point, &results);
+  std::sort(results.begin(), results.end(), CompareIntervals);
+
+  vector<IntInterval> brute_force;
+  FindContainingBruteForce(all_intervals, query_point, &brute_force);
+  std::sort(brute_force.begin(), brute_force.end(), CompareIntervals);
+
+  // Both lists are sorted the same way so their string forms are comparable.
+  SCOPED_TRACE(Stringify(all_intervals) + StringPrintf(" (q=%d)", query_point));
+  EXPECT_EQ(Stringify(brute_force), Stringify(results));
+}
+
+// Verify that IntervalTree::FindIntersectingInterval yields the same results as the naive
+// brute-force O(n) algorithm.
+static void VerifyFindIntersectingInterval(const vector<IntInterval> &all_intervals,
+                                           const IntervalTree<IntTraits> &tree,
+                                           const IntInterval &query_interval) {
+  // 'all_intervals' is taken by const reference: it is only read, and the
+  // callers invoke this once per query interval.
+  vector<IntInterval> results;
+  tree.FindIntersectingInterval(query_interval, &results);
+  std::sort(results.begin(), results.end(), CompareIntervals);
+
+  vector<IntInterval> brute_force;
+  FindIntersectingBruteForce(all_intervals, query_interval, &brute_force);
+  std::sort(brute_force.begin(), brute_force.end(), CompareIntervals);
+
+  // Both lists are sorted the same way so their string forms are comparable.
+  SCOPED_TRACE(Stringify(all_intervals) +
+               StringPrintf(" (q=[%d,%d])", query_interval.left, query_interval.right));
+  EXPECT_EQ(Stringify(brute_force), Stringify(results));
+}
+
+// Builds 'n' random intervals tagged with ids 0..n-1. Each left bound lies in
+// [0, 99] and each right bound is at most 19 past its left bound.
+static vector<IntInterval> CreateRandomIntervals(int n = 100) {
+  vector<IntInterval> intervals;
+  for (int id = 0; id < n; ++id) {
+    const int left = rand() % 100; // NOLINT(runtime/threadsafe_fn)
+    const int width = rand() % 20; // NOLINT(runtime/threadsafe_fn)
+    intervals.emplace_back(left, left + width, id);
+  }
+  return intervals;
+}
+
+TEST_F(TestIntervalTree, TestBasic) {
+  // Three small overlapping intervals; every point and every sub-range of
+  // [0, 5] is checked against the brute-force answer.
+  const vector<IntInterval> intervals = {
+    IntInterval(1, 2, 1),
+    IntInterval(3, 4, 2),
+    IntInterval(1, 4, 3),
+  };
+  IntervalTree<IntTraits> tree(intervals);
+
+  for (int lo = 0; lo <= 5; ++lo) {
+    VerifyFindContainingPoint(intervals, tree, lo);
+    for (int hi = lo; hi <= 5; ++hi) {
+      VerifyFindIntersectingInterval(intervals, tree, IntInterval(lo, hi, 0));
+    }
+  }
+}
+
+TEST_F(TestIntervalTree, TestRandomized) {
+  SeedRandom();
+
+  // Generate 100 random intervals (left bounds in [0, 99], each at most 19
+  // points wide, so all bounds lie in [0, 118]) and build a tree from them.
+  vector<IntInterval> intervals = CreateRandomIntervals();
+  IntervalTree<IntTraits> t(intervals);
+
+  // Query every point from just below the smallest possible bound to well
+  // past the largest one, so misses on both ends are exercised too.
+  for (int i = -1; i < 201; i++) {
+    VerifyFindContainingPoint(intervals, t, i);
+  }
+
+  // Test that we get the correct result for 100 random query intervals.
+  for (int i = 0; i < 100; i++) {
+    int l = rand() % 100; // NOLINT(runtime/threadsafe_fn)
+    int r = l + rand() % 100; // NOLINT(runtime/threadsafe_fn)
+    VerifyFindIntersectingInterval(intervals, t, IntInterval(l, r));
+  }
+}
+
+TEST_F(TestIntervalTree, TestEmpty) {
+  // A tree built from no intervals must agree with brute force (no hits).
+  const vector<IntInterval> no_intervals;
+  IntervalTree<IntTraits> tree(no_intervals);
+
+  VerifyFindContainingPoint(no_intervals, tree, 1);
+  VerifyFindIntersectingInterval(no_intervals, tree, IntInterval(1, 2, 0));
+}
+
+TEST_F(TestIntervalTree, TestBigO) {
+#ifndef NDEBUG
+  LOG(WARNING) << "big-O results are not valid if DCHECK is enabled";
+  return;
+#endif
+  SeedRandom();
+
+  // For growing numbers of intervals and query points, log how many
+  // comparisons one-by-one queries ("simple") and the batch API ("batch")
+  // perform. The only hard check is that both find the same number of hits.
+  LOG(INFO) << "num_int\tnum_q\tresults\tsimple\tbatch";
+  for (int num_intervals = 1; num_intervals < 2000; num_intervals *= 2) {
+    vector<IntInterval> intervals = CreateRandomIntervals(num_intervals);
+    IntervalTree<IntTraits> t(intervals);
+    for (int num_queries = 1; num_queries < 2000; num_queries *= 2) {
+      vector<CountingQueryPoint> queries;
+      queries.reserve(num_queries);
+      for (int i = 0; i < num_queries; i++) {
+        queries.emplace_back(rand() % 100); // NOLINT(runtime/threadsafe_fn)
+      }
+      // The batch API requires its query points to be sorted.
+      std::sort(queries.begin(), queries.end(),
+                [](const CountingQueryPoint& a,
+                   const CountingQueryPoint& b) {
+                  return a.val < b.val;
+                });
+
+      // Test using batch algorithm. The callback takes the query point by
+      // const reference so no shared_ptr counter is copied per result.
+      int num_results_batch = 0;
+      t.ForEachIntervalContainingPoints(
+          queries,
+          [&](const CountingQueryPoint& /* query_point */,
+              const IntInterval& /* interval */) {
+            num_results_batch++;
+          });
+      int num_comparisons_batch = 0;
+      for (const auto& q : queries) {
+        num_comparisons_batch += *q.count;
+        *q.count = 0;  // Reset so the simple pass below is counted separately.
+      }
+
+      // Test using one-by-one queries.
+      int num_results_simple = 0;
+      for (const auto& q : queries) {
+        vector<IntInterval> matches;
+        t.FindContainingPoint(q, &matches);
+        num_results_simple += static_cast<int>(matches.size());
+      }
+      int num_comparisons_simple = 0;
+      for (const auto& q : queries) {
+        num_comparisons_simple += *q.count;
+      }
+      ASSERT_EQ(num_results_simple, num_results_batch);
+
+      LOG(INFO) << num_intervals << "\t" << num_queries << "\t" << num_results_simple << "\t"
+                << num_comparisons_simple << "\t" << num_comparisons_batch;
+    }
+  }
+}
+
+TEST_F(TestIntervalTree, TestMultiQuery) {
+  SeedRandom();
+  // Use enough query points that several of them typically hit the same
+  // interval; with a single query the grouping checks below are vacuous.
+  const int kNumQueries = 100;
+  vector<IntInterval> intervals = CreateRandomIntervals(10);
+  IntervalTree<IntTraits> t(intervals);
+
+  // Generate random queries. The batch API requires them to be sorted.
+  vector<int> queries;
+  queries.reserve(kNumQueries);
+  for (int i = 0; i < kNumQueries; i++) {
+    queries.push_back(rand() % 100); // NOLINT(runtime/threadsafe_fn)
+  }
+  std::sort(queries.begin(), queries.end());
+
+  // Reference results: one FindContainingPoint() call per query point.
+  vector<pair<string, int>> results_simple;
+  for (int q : queries) {
+    vector<IntInterval> matches;
+    t.FindContainingPoint(q, &matches);
+    for (const auto& interval : matches) {
+      results_simple.emplace_back(interval.ToString(), q);
+    }
+  }
+
+  vector<pair<string, int>> results_batch;
+  t.ForEachIntervalContainingPoints(
+      queries,
+      [&](int query_point, const IntInterval& interval) {
+        results_batch.emplace_back(interval.ToString(), query_point);
+      });
+
+  // Check the property that, when the batch query points are in sorted order,
+  // the results are grouped by interval, and within each interval, sorted by
+  // query point. Each interval may have at most two groups.
+  boost::optional<pair<string, int>> prev = boost::none;
+  std::map<string, int> groups_seen;
+  for (const auto& cur : results_batch) {
+    if (prev && prev->first == cur.first) {
+      // Same interval as the previous result, i.e. within the current group:
+      // query points must be non-decreasing.
+      EXPECT_GE(cur.second, prev->second) << prev->first;
+    } else {
+      // Start of a new group for this interval. No interval may start more
+      // than two groups.
+      EXPECT_LE(++groups_seen[cur.first], 2)
+          << "Saw more than two groups for interval " << cur.first;
+    }
+    prev = cur;
+  }
+
+  std::sort(results_simple.begin(), results_simple.end());
+  std::sort(results_batch.begin(), results_batch.end());
+  ASSERT_EQ(results_simple, results_batch);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/interval_tree.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/interval_tree.h b/be/src/kudu/util/interval_tree.h
new file mode 100644
index 0000000..a677528
--- /dev/null
+++ b/be/src/kudu/util/interval_tree.h
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Implements an Interval Tree. See http://en.wikipedia.org/wiki/Interval_tree
+// or CLRS for a full description of the data structure.
+//
+// Callers of this class should also include interval_tree-inl.h for function
+// definitions.
+#ifndef KUDU_UTIL_INTERVAL_TREE_H
+#define KUDU_UTIL_INTERVAL_TREE_H
+
+#include <glog/logging.h>
+
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+namespace interval_tree_internal {
+template<class Traits>
+class ITNode;
+}
+
+// Implements an Interval Tree.
+//
+// An Interval Tree is a data structure which stores a set of intervals and supports
+// efficient searches to determine which intervals in that set overlap a query
+// point or interval. These operations are O(lg n + k) where 'n' is the number of
+// intervals in the tree and 'k' is the number of results returned for a given query.
+//
+// This particular implementation is a static tree -- intervals may not be added or
+// removed once the tree is instantiated.
+//
+// This class also assumes that all intervals are "closed" intervals -- the intervals
+// are inclusive of their start and end points.
+//
+// The Traits class should have the following members:
+//   Traits::point_type
+//     a typedef for what a "point" in the range is
+//
+//   Traits::interval_type
+//     a typedef for an interval
+//
+//   static point_type get_left(const interval_type &)
+//   static point_type get_right(const interval_type &)
+//     accessors which fetch the left and right bound of the interval, respectively.
+//
+//   static int compare(const point_type &a, const point_type &b)
+//     return < 0 if a < b, 0 if a == b, > 0 if a > b
+//
+// Point queries may also use a query type other than point_type, provided
+// Traits::compare() has overloads mixing that type with point_type (the
+// example traits in the test provide both argument orders).
+//
+// See interval_tree-test.cc for an example Traits class for 'int' ranges.
+template<class Traits>
+class IntervalTree {
+ private:
+  // Import types from the traits class to make code more readable.
+  using interval_type = typename Traits::interval_type;
+  using point_type = typename Traits::point_type;
+
+  // And some convenience types.
+  using IntervalVector = std::vector<interval_type>;
+  using node_type = interval_tree_internal::ITNode<Traits>;
+
+ public:
+  // Construct an Interval Tree containing the given set of intervals.
+  explicit IntervalTree(const IntervalVector &intervals);
+
+  ~IntervalTree();
+
+  // Find all intervals in the tree which contain the query point.
+  // The resulting intervals are added to the 'results' vector.
+  // The vector is not cleared first.
+  //
+  // NOTE: 'QueryPointType' is usually point_type, but can be any other
+  // type for which the appropriate Traits::compare(...) overloads exist.
+  template<class QueryPointType>
+  void FindContainingPoint(const QueryPointType &query,
+                           IntervalVector *results) const;
+
+  // For each of the query points in the STL container 'queries', find all
+  // intervals in the tree which contain those points. Calls 'cb(point, interval)'
+  // for each such (point, interval) pair.
+  //
+  // The points in the query container must be comparable to 'point_type'
+  // using Traits::compare().
+  //
+  // The implementation sequences the calls to 'cb' with the following guarantees:
+  // 1) all of the results corresponding to a given interval will be yielded in at
+  //    most two "groups" of calls (i.e. sub-sequences of calls with the same interval).
+  // 2) within each "group" of calls, the query points will be in ascending order.
+  //
+  // For example, the callback sequence may be:
+  //
+  //  cb(q1, interval_1) -
+  //  cb(q2, interval_1)  | first group of interval_1
+  //  cb(q6, interval_1)  |
+  //  cb(q7, interval_1) -
+  //
+  //  cb(q2, interval_2) -
+  //  cb(q3, interval_2)  | first group of interval_2
+  //  cb(q4, interval_2) -
+  //
+  //  cb(q3, interval_1) -
+  //  cb(q4, interval_1)  | second group of interval_1
+  //  cb(q5, interval_1) -
+  //
+  //  cb(q2, interval_3) -
+  //  cb(q3, interval_3)  | first group of interval_3
+  //  cb(q4, interval_3) -
+  //
+  //  cb(q5, interval_2) -
+  //  cb(q6, interval_2)  | second group of interval_2
+  //  cb(q7, interval_2) -
+  //
+  // REQUIRES: The input points must be pre-sorted in ascending order or else
+  // this will return invalid results.
+  template<class Callback, class QueryContainer>
+  void ForEachIntervalContainingPoints(const QueryContainer& queries,
+                                       const Callback& cb) const;
+
+  // Find all intervals in the tree which intersect the given interval.
+  // The resulting intervals are added to the 'results' vector.
+  // The vector is not cleared first.
+  void FindIntersectingInterval(const interval_type &query,
+                                IntervalVector *results) const;
+ private:
+  // Splits 'in' around a chosen split point (written to 'split_point') into
+  // intervals left of it, overlapping it, and right of it. Defined in
+  // interval_tree-inl.h.
+  static void Partition(const IntervalVector &in,
+                        point_type *split_point,
+                        IntervalVector *left,
+                        IntervalVector *overlapping,
+                        IntervalVector *right);
+
+  // Create a node containing the given intervals, recursively splitting down the tree.
+  static node_type *CreateNode(const IntervalVector &intervals);
+
+  // Root of the tree. Presumably owned and released by ~IntervalTree() in
+  // interval_tree-inl.h -- TODO confirm there.
+  node_type *root_;
+
+  DISALLOW_COPY_AND_ASSIGN(IntervalTree);
+};
+
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonreader-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonreader-test.cc b/be/src/kudu/util/jsonreader-test.cc
new file mode 100644
index 0000000..9f62c31
--- /dev/null
+++ b/be/src/kudu/util/jsonreader-test.cc
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/jsonreader.h"
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+using rapidjson::Value;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+TEST(JsonReaderTest, Corrupt) {
+  // An empty document is not valid JSON, so Init() must report Corruption.
+  JsonReader reader("");
+  const Status s = reader.Init();
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_STR_CONTAINS(
+      s.ToString(), "JSON text is corrupt: Text only contains white space(s)");
+}
+
+TEST(JsonReaderTest, Empty) {
+  // Both an empty object and an empty array are valid documents.
+  JsonReader r("{}");
+  ASSERT_OK(r.Init());
+  JsonReader r2("[]");
+  ASSERT_OK(r2.Init());
+
+  // Not found.
+  ASSERT_TRUE(r.ExtractBool(r.root(), "foo", nullptr).IsNotFound());
+  ASSERT_TRUE(r.ExtractInt32(r.root(), "foo", nullptr).IsNotFound());
+  ASSERT_TRUE(r.ExtractInt64(r.root(), "foo", nullptr).IsNotFound());
+  ASSERT_TRUE(r.ExtractString(r.root(), "foo", nullptr).IsNotFound());
+  ASSERT_TRUE(r.ExtractObject(r.root(), "foo", nullptr).IsNotFound());
+  ASSERT_TRUE(r.ExtractObjectArray(r.root(), "foo", nullptr).IsNotFound());
+
+  // The empty top-level array extracts as an (empty) object array, but is
+  // rejected as an object.
+  vector<const Value*> objs;
+  ASSERT_OK(r2.ExtractObjectArray(r2.root(), nullptr, &objs));
+  ASSERT_TRUE(objs.empty());
+  ASSERT_TRUE(r2.ExtractObject(r2.root(), nullptr, nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, Basic) {
+  // A single string-valued field.
+  JsonReader r("{ \"foo\" : \"bar\" }");
+  ASSERT_OK(r.Init());
+  const Value* root = r.root();
+
+  string foo;
+  ASSERT_OK(r.ExtractString(root, "foo", &foo));
+  ASSERT_EQ("bar", foo);
+
+  // Bad types: every non-string extractor rejects the field.
+  ASSERT_TRUE(r.ExtractBool(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "foo", nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, LessBasic) {
+  // Fields: a small int, an int64 at its maximum, a null, an empty string and
+  // a bool.
+  const string doc = Substitute(
+      "{ \"small\" : 1, \"big\" : $0, \"null\" : null, \"empty\" : \"\", \"bool\" : true }",
+      kint64max);
+  JsonReader r(doc);
+  ASSERT_OK(r.Init());
+  const Value* root = r.root();
+
+  // Each field extracts as its natural type (null reads as an empty string).
+  int32_t small;
+  ASSERT_OK(r.ExtractInt32(root, "small", &small));
+  ASSERT_EQ(1, small);
+  int64_t big;
+  ASSERT_OK(r.ExtractInt64(root, "big", &big));
+  ASSERT_EQ(kint64max, big);
+  string str;
+  ASSERT_OK(r.ExtractString(root, "null", &str));
+  ASSERT_EQ("", str);
+  ASSERT_OK(r.ExtractString(root, "empty", &str));
+  ASSERT_EQ("", str);
+  bool b;
+  ASSERT_OK(r.ExtractBool(root, "bool", &b));
+  ASSERT_TRUE(b);
+
+  // Bad types: each mismatched extractor reports InvalidArgument.
+  ASSERT_TRUE(r.ExtractBool(root, "small", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractString(root, "small", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "small", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "small", nullptr).IsInvalidArgument());
+
+  ASSERT_TRUE(r.ExtractBool(root, "big", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, "big", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractString(root, "big", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "big", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "big", nullptr).IsInvalidArgument());
+
+  ASSERT_TRUE(r.ExtractBool(root, "null", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, "null", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, "null", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "null", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "null", nullptr).IsInvalidArgument());
+
+  ASSERT_TRUE(r.ExtractBool(root, "empty", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, "empty", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, "empty", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "empty", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "empty", nullptr).IsInvalidArgument());
+
+  ASSERT_TRUE(r.ExtractInt32(root, "bool", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, "bool", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractString(root, "bool", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "bool", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "bool", nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, Objects) {
+  // One nested object holding a single int field.
+  JsonReader r("{ \"foo\" : { \"1\" : 1 } }");
+  ASSERT_OK(r.Init());
+  const Value* root = r.root();
+
+  const Value* foo = nullptr;
+  ASSERT_OK(r.ExtractObject(root, "foo", &foo));
+  ASSERT_TRUE(foo != nullptr);
+
+  int32_t one;
+  ASSERT_OK(r.ExtractInt32(foo, "1", &one));
+  ASSERT_EQ(1, one);
+
+  // Bad types: the object field is rejected by every non-object extractor.
+  ASSERT_TRUE(r.ExtractBool(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractString(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObjectArray(root, "foo", nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, TopLevelArray) {
+  // The document root is itself an array, so extractors use a null field name.
+  JsonReader r("[ { \"name\" : \"foo\" }, { \"name\" : \"bar\" } ]");
+  ASSERT_OK(r.Init());
+  const Value* root = r.root();
+
+  vector<const Value*> objs;
+  ASSERT_OK(r.ExtractObjectArray(root, nullptr, &objs));
+  ASSERT_EQ(2, objs.size());
+  const char* const kExpectedNames[] = { "foo", "bar" };
+  string name;
+  for (int idx = 0; idx < 2; ++idx) {
+    ASSERT_OK(r.ExtractString(objs[idx], "name", &name));
+    ASSERT_EQ(kExpectedNames[idx], name);
+  }
+
+  // Bad types: the array root is rejected by every non-array extractor.
+  ASSERT_TRUE(r.ExtractBool(root, nullptr, nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, nullptr, nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, nullptr, nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractString(root, nullptr, nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, nullptr, nullptr).IsInvalidArgument());
+}
+
+TEST(JsonReaderTest, NestedArray) {
+  // An array field whose elements carry "val" = 0, 1, 2 in order.
+  JsonReader r("{ \"foo\" : [ { \"val\" : 0 }, { \"val\" : 1 }, { \"val\" : 2 } ] }");
+  ASSERT_OK(r.Init());
+  const Value* root = r.root();
+
+  vector<const Value*> foo;
+  ASSERT_OK(r.ExtractObjectArray(root, "foo", &foo));
+  ASSERT_EQ(3, foo.size());
+  int expected = 0;
+  for (const Value* element : foo) {
+    int32_t number;
+    ASSERT_OK(r.ExtractInt32(element, "val", &number));
+    ASSERT_EQ(expected, number);
+    ++expected;
+  }
+
+  // Bad types: the array field is rejected by every non-array extractor.
+  ASSERT_TRUE(r.ExtractBool(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt32(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractInt64(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractString(root, "foo", nullptr).IsInvalidArgument());
+  ASSERT_TRUE(r.ExtractObject(root, "foo", nullptr).IsInvalidArgument());
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonreader.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonreader.cc b/be/src/kudu/util/jsonreader.cc
new file mode 100644
index 0000000..acbc869
--- /dev/null
+++ b/be/src/kudu/util/jsonreader.cc
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/jsonreader.h"
+
+#include <utility>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using rapidjson::Value;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Takes ownership of the JSON text; parsing is deferred to Init().
+JsonReader::JsonReader(string text)
+    : text_(std::move(text)) {
+}
+
+JsonReader::~JsonReader() = default;
+
+Status JsonReader::Init() {
+  // Parse the owned text with default flags; a parse failure is reported as
+  // Corruption with rapidjson's parse error as the detail.
+  document_.Parse<0>(text_.c_str());
+  return document_.HasParseError()
+      ? Status::Corruption("JSON text is corrupt", document_.GetParseError())
+      : Status::OK();
+}
+
+Status JsonReader::ExtractBool(const Value* object,
+                               const char* field,
+                               bool* result) const {
+  const Value* value;
+  RETURN_NOT_OK(ExtractField(object, field, &value));
+  if (PREDICT_TRUE(value->IsBool())) {
+    *result = value->GetBool();
+    return Status::OK();
+  }
+  return Status::InvalidArgument(Substitute(
+      "Wrong type during field extraction: expected bool but got $0",
+      value->GetType()));
+}
+
+Status JsonReader::ExtractInt32(const Value* object,
+                                const char* field,
+                                int32_t* result) const {
+  const Value* val;
+  RETURN_NOT_OK(ExtractField(object, field, &val));
+  if (PREDICT_FALSE(!val->IsInt())) {
+    return Status::InvalidArgument(Substitute(
+        "Wrong type during field extraction: expected int32 but got $0",
+        val->GetType()));
+  }
+  // IsInt() admits negative numbers, so read the value back as a signed int.
+  // GetUint() is only valid for values that also satisfy IsUint().
+  *result = val->GetInt();
+  return Status::OK();
+}
+
+Status JsonReader::ExtractInt64(const Value* object,
+                                const char* field,
+                                int64_t* result) const {
+  const Value* val;
+  RETURN_NOT_OK(ExtractField(object, field, &val));
+  if (PREDICT_FALSE(!val->IsInt64())) {
+    return Status::InvalidArgument(Substitute(
+        "Wrong type during field extraction: expected int64 but got $0",
+        val->GetType()));
+  }
+  // IsInt64() admits negative numbers, so read the value back as signed.
+  // GetUint64() is only valid for values that also satisfy IsUint64().
+  *result = val->GetInt64();
+  return Status::OK();
+}
+
+Status JsonReader::ExtractString(const Value* object,
+                                 const char* field,
+                                 string* result) const {
+  const Value* val;
+  RETURN_NOT_OK(ExtractField(object, field, &val));
+  // A JSON null is accepted and yields an empty string.
+  if (val->IsNull()) {
+    *result = "";
+    return Status::OK();
+  }
+  if (PREDICT_FALSE(!val->IsString())) {
+    return Status::InvalidArgument(Substitute(
+        "Wrong type during field extraction: expected string but got $0",
+        val->GetType()));
+  }
+  result->assign(val->GetString());
+  return Status::OK();
+}
+
+Status JsonReader::ExtractObject(const Value* object,
+                                 const char* field,
+                                 const Value** result) const {
+  const Value* value;
+  RETURN_NOT_OK(ExtractField(object, field, &value));
+  if (PREDICT_TRUE(value->IsObject())) {
+    // Hand back a pointer into the parsed document rather than a copy.
+    *result = value;
+    return Status::OK();
+  }
+  return Status::InvalidArgument(Substitute(
+      "Wrong type during field extraction: expected object but got $0",
+      value->GetType()));
+}
+
+Status JsonReader::ExtractObjectArray(const Value* object,
+                                      const char* field,
+                                      vector<const Value*>* result) const {
+  const Value* value;
+  RETURN_NOT_OK(ExtractField(object, field, &value));
+  if (PREDICT_TRUE(value->IsArray())) {
+    // Append a pointer to each element, in order. Element types are not
+    // checked here.
+    const Value::ConstValueIterator end = value->End();
+    for (Value::ConstValueIterator elem = value->Begin(); elem != end; ++elem) {
+      result->push_back(elem);
+    }
+    return Status::OK();
+  }
+  return Status::InvalidArgument(Substitute(
+      "Wrong type during field extraction: expected object array but got $0",
+      value->GetType()));
+}
+
+Status JsonReader::ExtractField(const Value* object,
+                                const char* field,
+                                const Value** result) const {
+  if (!field) {
+    // No field name: the caller wants 'object' itself.
+    *result = object;
+    return Status::OK();
+  }
+  // Member lookup is only defined on JSON objects; rapidjson's HasMember()
+  // asserts on anything else (e.g. an array root), so reject it explicitly.
+  if (PREDICT_FALSE(!object->IsObject())) {
+    return Status::InvalidArgument(Substitute(
+        "Wrong type during lookup of field '$0': expected object but got $1",
+        field, object->GetType()));
+  }
+  if (PREDICT_FALSE(!object->HasMember(field))) {
+    return Status::NotFound("Missing field", field);
+  }
+  *result = &(*object)[field];
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonreader.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonreader.h b/be/src/kudu/util/jsonreader.h
new file mode 100644
index 0000000..e389b57
--- /dev/null
+++ b/be/src/kudu/util/jsonreader.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_JSONREADER_H_
+#define KUDU_UTIL_JSONREADER_H_
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <rapidjson/document.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Wraps the JSON parsing functionality of rapidjson::Document.
+//
+// Unlike JsonWriter, this class does not hide rapidjson internals from
+// clients. That's because there's just no easy way to implement object and
+// array parsing otherwise. At most, this class aspires to be a simpler
+// error-handling wrapper for reading and parsing.
+//
+// Instances cannot be copied or assigned.
+class JsonReader {
+ public:
+  // Constructs a reader over the JSON document in 'text' (taken by value).
+  explicit JsonReader(std::string text);
+  ~JsonReader();
+
+  // NOTE(review): implementation not shown here; presumably parses 'text'
+  // into the underlying document and must succeed before root() or any
+  // Extract*() call is meaningful -- confirm against jsonreader.cc.
+  Status Init();
+
+  // Extractor methods.
+  //
+  // If 'field' is not NULL, will look for a field with that name in the
+  // given object, returning Status::NotFound if it cannot be found. If
+  // 'field' is NULL, will try to convert 'object' directly into the
+  // desired type.
+  //
+  // ExtractString(), ExtractObject() and ExtractObjectArray() return
+  // Status::InvalidArgument when the resolved value has the wrong JSON type.
+
+  Status ExtractBool(const rapidjson::Value* object,
+                     const char* field,
+                     bool* result) const;
+
+  Status ExtractInt32(const rapidjson::Value* object,
+                      const char* field,
+                      int32_t* result) const;
+
+  Status ExtractInt64(const rapidjson::Value* object,
+                      const char* field,
+                      int64_t* result) const;
+
+  // Replaces the contents of 'result' with the string value.
+  Status ExtractString(const rapidjson::Value* object,
+                       const char* field,
+                       std::string* result) const;
+
+  // 'result' is only valid for as long as JsonReader is alive.
+  Status ExtractObject(const rapidjson::Value* object,
+                       const char* field,
+                       const rapidjson::Value** result) const;
+
+  // 'result' is only valid for as long as JsonReader is alive.
+  // Element pointers are appended to 'result' (existing entries are kept);
+  // the elements themselves are not checked to be objects.
+  Status ExtractObjectArray(const rapidjson::Value* object,
+                            const char* field,
+                            std::vector<const rapidjson::Value*>* result) const;
+
+  // Returns the top-level JSON value of the document.
+  const rapidjson::Value* root() const { return &document_; }
+
+ private:
+  // Shared lookup used by the Extract*() methods: resolves 'field' within
+  // 'object', or returns 'object' itself when 'field' is NULL.
+  Status ExtractField(const rapidjson::Value* object,
+                      const char* field,
+                      const rapidjson::Value** result) const;
+
+  // The raw JSON text supplied at construction.
+  std::string text_;
+  // The parsed document; values returned by the extractors point into it.
+  rapidjson::Document document_;
+
+  DISALLOW_COPY_AND_ASSIGN(JsonReader);
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_JSONREADER_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter-test.cc b/be/src/kudu/util/jsonwriter-test.cc
new file mode 100644
index 0000000..6f9b10d
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter-test.cc
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <stdint.h>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/jsonwriter_test.pb.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+namespace google { namespace protobuf { class Message; } }
+
+using google::protobuf::Message;
+using jsonwriter_test::TestAllTypes;
+
+namespace kudu {
+
+// Test fixture for JsonWriter: a throughput benchmark helper plus a builder
+// for a populated sample message.
+class TestJsonWriter : public KuduTest {
+ protected:
+  // Serializes 'pb' repeatedly in COMPACT mode and logs the throughput.
+  void DoBenchmark(const Message& pb);
+
+  // Returns a TestAllTypes with the singular scalar, string and enum fields
+  // set (including the REDACT-annotated 'optional_redacted_string'). The
+  // bytes fields, the nested message and all repeated fields are left unset.
+  TestAllTypes MakeAllTypesPB() {
+    TestAllTypes pb;
+    pb.set_optional_int32(1);
+    pb.set_optional_int64(2);
+    pb.set_optional_uint32(3);
+    pb.set_optional_uint64(4);
+    pb.set_optional_sint32(5);
+    pb.set_optional_sint64(6);
+    pb.set_optional_fixed32(7);
+    pb.set_optional_fixed64(8);
+    pb.set_optional_sfixed32(9);
+    pb.set_optional_sfixed64(10);
+    pb.set_optional_float(11);
+    pb.set_optional_double(12);
+    pb.set_optional_bool(true);
+    pb.set_optional_string("hello world");
+    pb.set_optional_redacted_string("secret!");
+    pb.set_optional_nested_enum(TestAllTypes::FOO);
+    return pb;
+  }
+
+};
+
+// A message with no fields set serializes to an empty JSON object.
+TEST_F(TestJsonWriter, TestPBEmpty) {
+  TestAllTypes pb;
+  ASSERT_EQ("{}", JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+}
+
+// Every singular field set by MakeAllTypesPB() is emitted in field-number
+// order in both PRETTY and COMPACT modes. Float/double values print without a
+// fractional part, the enum prints by name, and the REDACT-annotated string
+// is masked as "<redacted>".
+TEST_F(TestJsonWriter, TestPBAllFieldTypes) {
+  // Turn on log redaction so (kudu.REDACT) fields are masked in the output.
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+  TestAllTypes pb = MakeAllTypesPB();
+
+  ASSERT_EQ("{\n"
+            "    \"optional_int32\": 1,\n"
+            "    \"optional_int64\": 2,\n"
+            "    \"optional_uint32\": 3,\n"
+            "    \"optional_uint64\": 4,\n"
+            "    \"optional_sint32\": 5,\n"
+            "    \"optional_sint64\": 6,\n"
+            "    \"optional_fixed32\": 7,\n"
+            "    \"optional_fixed64\": 8,\n"
+            "    \"optional_sfixed32\": 9,\n"
+            "    \"optional_sfixed64\": 10,\n"
+            "    \"optional_float\": 11,\n"
+            "    \"optional_double\": 12,\n"
+            "    \"optional_bool\": true,\n"
+            "    \"optional_string\": \"hello world\",\n"
+            "    \"optional_redacted_string\": \"<redacted>\",\n"
+            "    \"optional_nested_enum\": \"FOO\"\n"
+            "}", JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+  ASSERT_EQ("{"
+            "\"optional_int32\":1,"
+            "\"optional_int64\":2,"
+            "\"optional_uint32\":3,"
+            "\"optional_uint64\":4,"
+            "\"optional_sint32\":5,"
+            "\"optional_sint64\":6,"
+            "\"optional_fixed32\":7,"
+            "\"optional_fixed64\":8,"
+            "\"optional_sfixed32\":9,"
+            "\"optional_sfixed64\":10,"
+            "\"optional_float\":11,"
+            "\"optional_double\":12,"
+            "\"optional_bool\":true,"
+            "\"optional_string\":\"hello world\","
+            "\"optional_redacted_string\":\"<redacted>\","
+            "\"optional_nested_enum\":\"FOO\""
+            "}", JsonWriter::ToJson(pb, JsonWriter::COMPACT));
+
+}
+
+// Repeated fields serialize as JSON arrays, and redaction applies to every
+// element of a REDACT-annotated repeated field.
+TEST_F(TestJsonWriter, TestPBRepeatedPrimitives) {
+  // Turn on log redaction so (kudu.REDACT) fields are masked in the output.
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+  TestAllTypes pb;
+  // Four elements (0..3) in each repeated field.
+  for (int i = 0; i <= 3; i++) {
+    pb.add_repeated_int32(i);
+    pb.add_repeated_string(strings::Substitute("hi $0", i));
+    pb.add_repeated_redacted_string("secret!");
+    pb.add_repeated_redacted_bytes("secret!");
+  }
+  ASSERT_EQ("{\n"
+            "    \"repeated_int32\": [\n"
+            "        0,\n"
+            "        1,\n"
+            "        2,\n"
+            "        3\n"
+            "    ],\n"
+            "    \"repeated_string\": [\n"
+            "        \"hi 0\",\n"
+            "        \"hi 1\",\n"
+            "        \"hi 2\",\n"
+            "        \"hi 3\"\n"
+            "    ],\n"
+            "    \"repeated_redacted_string\": [\n"
+            "        \"<redacted>\",\n"
+            "        \"<redacted>\",\n"
+            "        \"<redacted>\",\n"
+            "        \"<redacted>\"\n"
+            "    ],\n"
+            "    \"repeated_redacted_bytes\": [\n"
+            "        \"<redacted>\",\n"
+            "        \"<redacted>\",\n"
+            "        \"<redacted>\",\n"
+            "        \"<redacted>\"\n"
+            "    ]\n"
+            "}", JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+  ASSERT_EQ("{\"repeated_int32\":[0,1,2,3],"
+            "\"repeated_string\":[\"hi 0\",\"hi 1\",\"hi 2\",\"hi 3\"],"
+            "\"repeated_redacted_string\":[\"<redacted>\",\"<redacted>\","
+            "\"<redacted>\",\"<redacted>\"],"
+            "\"repeated_redacted_bytes\":[\"<redacted>\",\"<redacted>\","
+            "\"<redacted>\",\"<redacted>\"]}",
+            JsonWriter::ToJson(pb, JsonWriter::COMPACT));
+}
+
+// Nested messages serialize as nested JSON objects. Output follows field
+// number (optional_nested_message = 18 before repeated_nested_message = 48),
+// not the order in which the fields were populated.
+TEST_F(TestJsonWriter, TestPBNestedMessage) {
+  TestAllTypes pb;
+  pb.add_repeated_nested_message()->set_int_field(12345);
+  pb.mutable_optional_nested_message()->set_int_field(54321);
+  ASSERT_EQ("{\n"
+            "    \"optional_nested_message\": {\n"
+            "        \"int_field\": 54321\n"
+            "    },\n"
+            "    \"repeated_nested_message\": [\n"
+            "        {\n"
+            "            \"int_field\": 12345\n"
+            "        }\n"
+            "    ]\n"
+            "}", JsonWriter::ToJson(pb, JsonWriter::PRETTY));
+  ASSERT_EQ("{\"optional_nested_message\":{\"int_field\":54321},"
+            "\"repeated_nested_message\":"
+            "[{\"int_field\":12345}]}",
+            JsonWriter::ToJson(pb, JsonWriter::COMPACT));
+}
+
+// Repeatedly serializes 'pb' (10000 copies per pass, wrapped in one JSON
+// array, COMPACT mode) until at least five seconds of wall time have elapsed,
+// then logs the serialized volume per second of user CPU time.
+void TestJsonWriter::DoBenchmark(const Message& pb) {
+  int64_t total_len = 0;
+  Stopwatch sw;
+  sw.start();
+  while (sw.elapsed().wall_seconds() < 5) {
+    std::ostringstream str;
+    JsonWriter jw(&str, JsonWriter::COMPACT);
+    jw.StartArray();
+    for (int i = 0; i < 10000; i++) {
+      jw.Protobuf(pb);
+    }
+    jw.EndArray();
+    // The put position equals the number of bytes written so far. Unlike
+    // str.str().size(), reading it does not copy the entire buffer, so the
+    // timed region measures JSON serialization rather than a string copy.
+    total_len += static_cast<int64_t>(str.tellp());
+  }
+  sw.stop();
+  // NOTE: the loop is bounded by wall time, but throughput is computed from
+  // user CPU time, and the divisor is 2^20 bytes (i.e. MiB).
+  double mbps = total_len / 1024.0 / 1024.0 / sw.elapsed().user_cpu_seconds();
+  LOG(INFO) << "Throughput: " << mbps << "MB/sec";
+}
+
+// Throughput of a flat message with all singular scalar/string/enum fields set.
+TEST_F(TestJsonWriter, BenchmarkAllTypes) {
+  const TestAllTypes all_types = MakeAllTypesPB();
+  DoBenchmark(all_types);
+}
+
+// Throughput of a message holding one singular and one repeated nested message.
+TEST_F(TestJsonWriter, BenchmarkNestedMessage) {
+  TestAllTypes msg;
+  msg.mutable_optional_nested_message()->set_int_field(54321);
+  msg.add_repeated_nested_message()->set_int_field(12345);
+  DoBenchmark(msg);
+}
+
+// Throughput of a single repeated int64 field holding the values 0..9999.
+TEST_F(TestJsonWriter, BenchmarkRepeatedInt64) {
+  constexpr int kNumValues = 10000;
+  TestAllTypes msg;
+  int64_t next = 0;
+  while (next < kNumValues) {
+    msg.add_repeated_int64(next++);
+  }
+  DoBenchmark(msg);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter.cc b/be/src/kudu/util/jsonwriter.cc
new file mode 100644
index 0000000..3a5580c
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter.cc
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/jsonwriter.h"
+
+#include <new>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/message.h>
+#include <rapidjson/writer.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/pb_util.pb.h"
+
+using google::protobuf::FieldDescriptor;
+using google::protobuf::Message;
+using google::protobuf::Reflection;
+
+using std::ostringstream;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Adapter to allow RapidJSON to write directly to a stringstream.
+// Since Squeasel exposes a stringstream as its interface, this is needed to avoid overcopying.
+//
+// Bytes passed to Put() accumulate in a local buffer and are written to the
+// target stream only by Flush(); the destructor DCHECKs that nothing is left
+// unflushed.
+class UTF8StringStreamBuffer {
+ public:
+  // 'out' must be non-NULL (DCHECKed) and must outlive this buffer.
+  explicit UTF8StringStreamBuffer(std::ostringstream* out);
+  ~UTF8StringStreamBuffer();
+  // Output-stream hook used by the rapidjson writers: appends one byte.
+  void Put(rapidjson::UTF8<>::Ch c);
+
+  // Writes all buffered bytes to 'out_' and empties the buffer.
+  void Flush();
+
+ private:
+  // Bytes written via Put() but not yet flushed.
+  faststring buf_;
+  // Destination stream; not owned.
+  std::ostringstream* out_;
+};
+
+// Abstract writer interface. rapidjson's PrettyWriter and Writer share no
+// common base class, so JsonWriterImpl<T> (below) adapts each of them to
+// this pure virtual interface, letting JsonWriter pick one at runtime.
+class JsonWriterIf {
+ public:
+  virtual ~JsonWriterIf() = default;
+
+  // Scalar values.
+  virtual void Null() = 0;
+  virtual void Bool(bool b) = 0;
+  virtual void Int(int i) = 0;
+  virtual void Uint(unsigned u) = 0;
+  virtual void Int64(int64_t i64) = 0;
+  virtual void Uint64(uint64_t u64) = 0;
+  virtual void Double(double d) = 0;
+
+  // Strings: length-delimited, NUL-terminated, or std::string.
+  virtual void String(const char* str, size_t length) = 0;
+  virtual void String(const char* str) = 0;
+  virtual void String(const std::string& str) = 0;
+
+  // Structural delimiters.
+  virtual void StartObject() = 0;
+  virtual void EndObject() = 0;
+  virtual void StartArray() = 0;
+  virtual void EndArray() = 0;
+};
+
+// Adapts the different rapidjson Writer implementations to our virtual
+// interface above.
+//
+// 'T' is the rapidjson writer type (PrettyWriterClass or CompactWriterClass);
+// it writes into 'stream_', which is flushed to the caller's ostringstream
+// whenever an object or array is closed.
+template<class T>
+class JsonWriterImpl : public JsonWriterIf {
+ public:
+  explicit JsonWriterImpl(ostringstream* out);
+
+  virtual void Null() OVERRIDE;
+  virtual void Bool(bool b) OVERRIDE;
+  virtual void Int(int i) OVERRIDE;
+  virtual void Uint(unsigned u) OVERRIDE;
+  virtual void Int64(int64_t i64) OVERRIDE;
+  virtual void Uint64(uint64_t u64) OVERRIDE;
+  virtual void Double(double d) OVERRIDE;
+  virtual void String(const char* str, size_t length) OVERRIDE;
+  virtual void String(const char* str) OVERRIDE;
+  virtual void String(const std::string& str) OVERRIDE;
+
+  virtual void StartObject() OVERRIDE;
+  virtual void EndObject() OVERRIDE;
+  virtual void StartArray() OVERRIDE;
+  virtual void EndArray() OVERRIDE;
+
+ private:
+  // Declaration order matters: the constructor initializes 'writer_' from
+  // 'stream_', and members are initialized in declaration order, so
+  // 'stream_' must come first.
+  UTF8StringStreamBuffer stream_;
+  T writer_;
+  DISALLOW_COPY_AND_ASSIGN(JsonWriterImpl);
+};
+
+//
+// JsonWriter
+//
+
+// The two concrete rapidjson writers, both emitting into UTF8StringStreamBuffer:
+// indented output for PRETTY mode and whitespace-free output for COMPACT mode.
+using PrettyWriterClass = rapidjson::PrettyWriter<UTF8StringStreamBuffer>;
+using CompactWriterClass = rapidjson::Writer<UTF8StringStreamBuffer>;
+
+// Selects the concrete rapidjson writer for 'm' and hides it behind the
+// JsonWriterIf interface. 'out' must be non-NULL (DCHECKed).
+JsonWriter::JsonWriter(ostringstream* out, Mode m) {
+  if (m == PRETTY) {
+    impl_.reset(new JsonWriterImpl<PrettyWriterClass>(DCHECK_NOTNULL(out)));
+  } else if (m == COMPACT) {
+    impl_.reset(new JsonWriterImpl<CompactWriterClass>(DCHECK_NOTNULL(out)));
+  }
+}
+
+// Defined here, where JsonWriterIf is a complete type, so that the
+// unique_ptr member can be destroyed.
+JsonWriter::~JsonWriter() = default;
+
+// Each public writer call simply delegates to the selected implementation.
+void JsonWriter::Null() {
+  impl_->Null();
+}
+void JsonWriter::Bool(bool b) {
+  impl_->Bool(b);
+}
+void JsonWriter::Int(int i) {
+  impl_->Int(i);
+}
+void JsonWriter::Uint(unsigned u) {
+  impl_->Uint(u);
+}
+void JsonWriter::Int64(int64_t i64) {
+  impl_->Int64(i64);
+}
+void JsonWriter::Uint64(uint64_t u64) {
+  impl_->Uint64(u64);
+}
+void JsonWriter::Double(double d) {
+  impl_->Double(d);
+}
+void JsonWriter::String(const char* str, size_t length) {
+  impl_->String(str, length);
+}
+void JsonWriter::String(const char* str) {
+  impl_->String(str);
+}
+void JsonWriter::String(const string& str) {
+  impl_->String(str);
+}
+void JsonWriter::StartObject() {
+  impl_->StartObject();
+}
+void JsonWriter::EndObject() {
+  impl_->EndObject();
+}
+void JsonWriter::StartArray() {
+  impl_->StartArray();
+}
+void JsonWriter::EndArray() {
+  impl_->EndArray();
+}
+
+// Explicit specializations of JsonWriter::Value<T>() for the primitive types
+// commonly used as metric values; each forwards to the matching writer call.
+template<> void JsonWriter::Value(const bool& val) { Bool(val); }
+template<> void JsonWriter::Value(const int32_t& val) { Int(val); }
+template<> void JsonWriter::Value(const uint32_t& val) { Uint(val); }
+template<> void JsonWriter::Value(const int64_t& val) { Int64(val); }
+template<> void JsonWriter::Value(const uint64_t& val) { Uint64(val); }
+template<> void JsonWriter::Value(const double& val) { Double(val); }
+template<> void JsonWriter::Value(const string& val) { String(val); }
+
+#if defined(__APPLE__)
+// Presumably needed because size_t and uint64_t are distinct types on Apple
+// platforms (elsewhere this would duplicate the uint64_t specialization).
+template<> void JsonWriter::Value(const size_t& val) { Uint64(val); }
+#endif
+
+// Writes 'pb' as a JSON object keyed by field name, covering only the fields
+// reported by Reflection::ListFields(). Repeated fields become JSON arrays;
+// singular fields are written through ProtobufField().
+void JsonWriter::Protobuf(const Message& pb) {
+  const Reflection* refl = pb.GetReflection();
+  vector<const FieldDescriptor*> set_fields;
+  refl->ListFields(pb, &set_fields);
+
+  StartObject();
+  for (const FieldDescriptor* fd : set_fields) {
+    String(fd->name());
+    if (!fd->is_repeated()) {
+      ProtobufField(pb, refl, fd);
+      continue;
+    }
+    StartArray();
+    const int count = refl->FieldSize(pb, fd);
+    for (int idx = 0; idx < count; ++idx) {
+      ProtobufRepeatedField(pb, refl, fd, idx);
+    }
+    EndArray();
+  }
+  EndObject();
+}
+
+// Writes the value of the singular field 'field' of 'pb' as one JSON value,
+// dispatching on the field's C++ type. Unknown types are fatal.
+void JsonWriter::ProtobufField(const Message& pb, const Reflection* reflection,
+                               const FieldDescriptor* field) {
+  switch (field->cpp_type()) {
+    case FieldDescriptor::CPPTYPE_INT32:
+      Int(reflection->GetInt32(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_INT64:
+      Int64(reflection->GetInt64(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_UINT32:
+      Uint(reflection->GetUInt32(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_UINT64:
+      Uint64(reflection->GetUInt64(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_DOUBLE:
+      Double(reflection->GetDouble(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_FLOAT:
+      // JSON has one number type; floats are widened to double.
+      Double(reflection->GetFloat(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_BOOL:
+      Bool(reflection->GetBool(pb, field));
+      break;
+    case FieldDescriptor::CPPTYPE_ENUM:
+      // Enums are written by value name (e.g. "FOO"), not by number.
+      String(reflection->GetEnum(pb, field)->name());
+      break;
+    case FieldDescriptor::CPPTYPE_STRING:
+      // Covers both 'string' and 'bytes' fields. Fields annotated with the
+      // REDACT option go through KUDU_MAYBE_REDACT_IF.
+      // NOTE(review): unredacted 'bytes' content is written as-is and may not
+      // be valid UTF-8.
+      String(KUDU_MAYBE_REDACT_IF(field->options().GetExtension(REDACT),
+                                  reflection->GetString(pb, field)));
+      break;
+    case FieldDescriptor::CPPTYPE_MESSAGE:
+      // Nested messages recurse into a nested JSON object.
+      Protobuf(reflection->GetMessage(pb, field));
+      break;
+    default:
+      LOG(FATAL) << "Unknown cpp_type: " << field->cpp_type();
+  }
+}
+
+// Writes element 'index' of the repeated field 'field' of 'pb' as one JSON
+// value. Mirrors ProtobufField() case-for-case, using the GetRepeated*()
+// accessors. Unknown types are fatal.
+void JsonWriter::ProtobufRepeatedField(const Message& pb, const Reflection* reflection,
+                                       const FieldDescriptor* field, int index) {
+  switch (field->cpp_type()) {
+    case FieldDescriptor::CPPTYPE_INT32:
+      Int(reflection->GetRepeatedInt32(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_INT64:
+      Int64(reflection->GetRepeatedInt64(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_UINT32:
+      Uint(reflection->GetRepeatedUInt32(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_UINT64:
+      Uint64(reflection->GetRepeatedUInt64(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_DOUBLE:
+      Double(reflection->GetRepeatedDouble(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_FLOAT:
+      // Floats are widened to double.
+      Double(reflection->GetRepeatedFloat(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_BOOL:
+      Bool(reflection->GetRepeatedBool(pb, field, index));
+      break;
+    case FieldDescriptor::CPPTYPE_ENUM:
+      // Enums are written by value name.
+      String(reflection->GetRepeatedEnum(pb, field, index)->name());
+      break;
+    case FieldDescriptor::CPPTYPE_STRING:
+      // 'string' and 'bytes' elements; REDACT-annotated fields are redacted
+      // per element.
+      String(KUDU_MAYBE_REDACT_IF(field->options().GetExtension(REDACT),
+                                  reflection->GetRepeatedString(pb, field, index)));
+      break;
+    case FieldDescriptor::CPPTYPE_MESSAGE:
+      Protobuf(reflection->GetRepeatedMessage(pb, field, index));
+      break;
+    default:
+      LOG(FATAL) << "Unknown cpp_type: " << field->cpp_type();
+  }
+}
+
+// Convenience wrapper: serializes 'pb' in the given 'mode' and returns the
+// resulting JSON text.
+string JsonWriter::ToJson(const Message& pb, Mode mode) {
+  ostringstream buffer;
+  {
+    JsonWriter json(&buffer, mode);
+    json.Protobuf(pb);
+  }
+  return buffer.str();
+}
+
+//
+// UTF8StringStreamBuffer
+//
+
+UTF8StringStreamBuffer::UTF8StringStreamBuffer(std::ostringstream* out)
+    : out_(DCHECK_NOTNULL(out)) {}
+
+UTF8StringStreamBuffer::~UTF8StringStreamBuffer() {
+  // Every byte handed to Put() must already have been written by Flush().
+  DCHECK_EQ(buf_.size(), 0) << "Forgot to flush!";
+}
+
+void UTF8StringStreamBuffer::Put(rapidjson::UTF8<>::Ch c) {
+  // Accumulate locally; Flush() moves everything to 'out_' in one write.
+  buf_.push_back(c);
+}
+
+void UTF8StringStreamBuffer::Flush() {
+  const char* pending = reinterpret_cast<const char*>(buf_.data());
+  out_->write(pending, buf_.size());
+  buf_.clear();
+}
+
+//
+// JsonWriterImpl: simply forward to the underlying implementation.
+//
+
+template<class T>
+JsonWriterImpl<T>::JsonWriterImpl(ostringstream* out)
+    : stream_(DCHECK_NOTNULL(out)),
+      writer_(stream_) {}
+
+template<class T>
+void JsonWriterImpl<T>::Null() {
+  writer_.Null();
+}
+
+template<class T>
+void JsonWriterImpl<T>::Bool(bool b) {
+  writer_.Bool(b);
+}
+
+template<class T>
+void JsonWriterImpl<T>::Int(int i) {
+  writer_.Int(i);
+}
+
+template<class T>
+void JsonWriterImpl<T>::Uint(unsigned u) {
+  writer_.Uint(u);
+}
+
+template<class T>
+void JsonWriterImpl<T>::Int64(int64_t i64) {
+  writer_.Int64(i64);
+}
+
+template<class T>
+void JsonWriterImpl<T>::Uint64(uint64_t u64) {
+  writer_.Uint64(u64);
+}
+
+template<class T>
+void JsonWriterImpl<T>::Double(double d) {
+  writer_.Double(d);
+}
+
+template<class T>
+void JsonWriterImpl<T>::String(const char* str, size_t length) {
+  writer_.String(str, length);
+}
+
+template<class T>
+void JsonWriterImpl<T>::String(const char* str) {
+  writer_.String(str);
+}
+
+template<class T>
+void JsonWriterImpl<T>::String(const string& str) {
+  writer_.String(str.data(), str.size());
+}
+
+template<class T>
+void JsonWriterImpl<T>::StartObject() {
+  writer_.StartObject();
+}
+
+// Closing an object or array pushes the buffered bytes out to the caller's
+// stream.
+template<class T>
+void JsonWriterImpl<T>::EndObject() {
+  writer_.EndObject();
+  stream_.Flush();
+}
+
+template<class T>
+void JsonWriterImpl<T>::StartArray() {
+  writer_.StartArray();
+}
+
+template<class T>
+void JsonWriterImpl<T>::EndArray() {
+  writer_.EndArray();
+  stream_.Flush();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter.h b/be/src/kudu/util/jsonwriter.h
new file mode 100644
index 0000000..24b4575
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_JSONWRITER_H
+#define KUDU_UTIL_JSONWRITER_H
+
+#include <cstddef>
+#include <cstdint>
+#include <iosfwd>
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+
+namespace google {
+namespace protobuf {
+class FieldDescriptor;
+class Message;
+class Reflection;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class JsonWriterIf;
+
+// Acts as a pimpl for rapidjson so that not all metrics users must bring in the
+// rapidjson library, which is template-based and therefore hard to forward-declare.
+//
+// This class mirrors the value-writing methods of rapidjson::Writer (Null,
+// Bool, Int, ..., StartArray/EndArray), plus an additional convenience method
+// for String(std::string).
+//
+// We take a std::ostringstream in the constructor because Mongoose / Squeasel
+// uses std::stringstream for output buffering. Output is buffered internally
+// and written to that stream whenever an object or array is closed.
+class JsonWriter {
+ public:
+  enum Mode {
+    // Pretty-print the JSON, with nice indentation, newlines, etc.
+    PRETTY,
+    // Print the JSON as compactly as possible.
+    COMPACT
+  };
+
+  // 'out' must be non-NULL and must outlive this writer.
+  JsonWriter(std::ostringstream* out, Mode mode);
+  ~JsonWriter();
+
+  void Null();
+  void Bool(bool b);
+  void Int(int i);
+  void Uint(unsigned u);
+  void Int64(int64_t i64);
+  void Uint64(uint64_t u64);
+  void Double(double d);
+  void String(const char* str, size_t length);
+  void String(const char* str);
+  void String(const std::string& str);
+
+  // Convert the given protobuf message to JSON.
+  // The output respects redaction for 'string' and 'bytes' fields.
+  // Only fields that are set are emitted, in field-number order.
+  void Protobuf(const google::protobuf::Message& message);
+
+  // Writes 'val' using the writer call matching its type. Only the explicit
+  // specializations defined in jsonwriter.cc exist (bool, int32_t, uint32_t,
+  // int64_t, uint64_t, double, std::string, and size_t on Apple); other
+  // types have no definition and will fail to link.
+  template<typename T>
+  void Value(const T& val);
+
+  void StartObject();
+  void EndObject();
+  void StartArray();
+  void EndArray();
+
+  // Convert the given protobuf to JSON format.
+  static std::string ToJson(const google::protobuf::Message& pb,
+                            Mode mode);
+
+ private:
+  // Writes one singular field of 'pb' as a JSON value.
+  void ProtobufField(const google::protobuf::Message& pb,
+                     const google::protobuf::Reflection* reflection,
+                     const google::protobuf::FieldDescriptor* field);
+  // Writes element 'index' of a repeated field of 'pb' as a JSON value.
+  void ProtobufRepeatedField(const google::protobuf::Message& pb,
+                             const google::protobuf::Reflection* reflection,
+                             const google::protobuf::FieldDescriptor* field,
+                             int index);
+
+  // Pretty or compact rapidjson writer selected in the constructor.
+  std::unique_ptr<JsonWriterIf> impl_;
+  DISALLOW_COPY_AND_ASSIGN(JsonWriter);
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_JSONWRITER_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/jsonwriter_test.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/jsonwriter_test.proto b/be/src/kudu/util/jsonwriter_test.proto
new file mode 100644
index 0000000..b6f0300
--- /dev/null
+++ b/be/src/kudu/util/jsonwriter_test.proto
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package jsonwriter_test;
+
+import "kudu/util/pb_util.proto";
+
+// This proto includes every type of field in both singular and repeated
+// forms. This is mostly copied from 'unittest.proto' in the protobuf source
+// (hence the odd field numbers which skip some). Fields annotated with
+// (kudu.REDACT) exercise redaction in the JSON writer.
+message TestAllTypes {
+  message NestedMessage {
+    optional int32 int_field = 1;
+  }
+
+  enum NestedEnum {
+    FOO = 1;
+    BAR = 2;
+    BAZ = 3;
+  }
+
+  // Singular
+  optional    int32 optional_int32    =  1;
+  optional    int64 optional_int64    =  2;
+  optional   uint32 optional_uint32   =  3;
+  optional   uint64 optional_uint64   =  4;
+  optional   sint32 optional_sint32   =  5;
+  optional   sint64 optional_sint64   =  6;
+  optional  fixed32 optional_fixed32  =  7;
+  optional  fixed64 optional_fixed64  =  8;
+  optional sfixed32 optional_sfixed32 =  9;
+  optional sfixed64 optional_sfixed64 = 10;
+  optional    float optional_float    = 11;
+  optional   double optional_double   = 12;
+  optional     bool optional_bool     = 13;
+  optional   string optional_string   = 14;
+  optional   string optional_redacted_string = 15 [ (kudu.REDACT) = true ];
+  optional    bytes optional_bytes    = 16;
+  optional    bytes optional_redacted_bytes = 17 [ (kudu.REDACT) = true ];
+
+  optional NestedMessage optional_nested_message  = 18;
+  optional NestedEnum optional_nested_enum     = 21;
+
+  // Repeated
+  repeated    int32 repeated_int32    = 31;
+  repeated    int64 repeated_int64    = 32;
+  repeated   uint32 repeated_uint32   = 33;
+  repeated   uint64 repeated_uint64   = 34;
+  repeated   sint32 repeated_sint32   = 35;
+  repeated   sint64 repeated_sint64   = 36;
+  repeated  fixed32 repeated_fixed32  = 37;
+  repeated  fixed64 repeated_fixed64  = 38;
+  repeated sfixed32 repeated_sfixed32 = 39;
+  repeated sfixed64 repeated_sfixed64 = 40;
+  repeated    float repeated_float    = 41;
+  repeated   double repeated_double   = 42;
+  repeated     bool repeated_bool     = 43;
+  repeated   string repeated_string   = 44;
+  repeated    bytes repeated_bytes    = 45;
+  repeated   string repeated_redacted_string = 46 [ (kudu.REDACT) = true ];
+  // Declared as 'bytes' (matching its name and optional_redacted_bytes) so
+  // that redaction of repeated bytes fields is actually exercised.
+  repeated    bytes repeated_redacted_bytes = 47 [ (kudu.REDACT) = true ];
+
+  repeated NestedMessage repeated_nested_message = 48;
+  repeated NestedEnum repeated_nested_enum = 51;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/kernel_stack_watchdog.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/kernel_stack_watchdog.cc b/be/src/kudu/util/kernel_stack_watchdog.cc
new file mode 100644
index 0000000..27a259c
--- /dev/null
+++ b/be/src/kudu/util/kernel_stack_watchdog.cc
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/kernel_stack_watchdog.h"
+
+#include <cstdint>
+#include <cstring>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/env.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/os-util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+// Period, in milliseconds, at which the watchdog thread (RunThread()) wakes up
+// to look for threads that have exceeded their SCOPED_WATCH_STACK threshold.
+DEFINE_int32(hung_task_check_interval_ms, 200,
+             "Number of milliseconds in between checks for hung threads");
+TAG_FLAG(hung_task_check_interval_ms, hidden);
+
+// Hidden fault-injection knob: GetKernelStack() injects this much latency (via
+// MAYBE_INJECT_FIXED_LATENCY) before reading /proc/<tid>/stack. Presumably the
+// default of 0 injects nothing — confirm against fault_injection.h.
+DEFINE_int32(inject_latency_on_kernel_stack_lookup_ms, 0,
+             "Number of milliseconds of latency to inject when reading a thread's "
+             "kernel stack");
+TAG_FLAG(inject_latency_on_kernel_stack_lookup_ms, hidden);
+
+using std::lock_guard;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Definition of the per-thread pointer to the calling thread's TLS. It starts
+// out null on every thread (static storage is zero-initialized), is set by
+// CreateAndRegisterTLS(), and is reset to nullptr by Unregister() at thread exit.
+__thread KernelStackWatchdog::TLS* KernelStackWatchdog::tls_;
+
+KernelStackWatchdog::KernelStackWatchdog()
+  : log_collector_(nullptr),
+    finish_(1) {
+  // The watchdog thread itself is started with NO_STACK_WATCHDOG. Without that
+  // flag, 'StartThread' would try to initialize the stack watchdog while we are
+  // still inside its constructor, and would self-deadlock.
+  CHECK_OK(Thread::CreateWithFlags(
+      "kernel-watchdog", "kernel-watcher",
+      [this]() { RunThread(); },
+      Thread::NO_STACK_WATCHDOG,
+      &thread_));
+}
+
+KernelStackWatchdog::~KernelStackWatchdog() {
+  // Release the latch so RunThread() leaves its loop, then wait for the
+  // watchdog thread to terminate.
+  finish_.CountDown();
+  ThreadJoiner joiner(thread_.get());
+  CHECK_OK(joiner.Join());
+}
+
+void KernelStackWatchdog::SaveLogsForTests(bool save_logs) {
+  // Either install a fresh (empty) collector, dropping any earlier messages,
+  // or remove the collector so warnings go back to glog.
+  lock_guard<simple_spinlock> guard(log_lock_);
+  log_collector_.reset(save_logs ? new std::vector<string>() : nullptr);
+}
+
+std::vector<string> KernelStackWatchdog::LoggedMessagesForTests() const {
+  // Returns a copy of the collected messages, taken while holding 'log_lock_'.
+  lock_guard<simple_spinlock> guard(log_lock_);
+  CHECK(log_collector_) << "Must call SaveLogsForTests(true) first";
+  const std::vector<string>& messages = *log_collector_;
+  return messages;
+}
+
+void KernelStackWatchdog::Register(TLS* tls) {
+  // Resolve the caller's thread id before taking the spinlock so the critical
+  // section only covers the map insertion.
+  const int64_t tid = Thread::CurrentThreadId();
+  lock_guard<simple_spinlock> guard(tls_lock_);
+  InsertOrDie(&tls_by_tid_, tid, tls);
+}
+
+// Removes the calling thread's entry from 'tls_by_tid_' and releases its TLS.
+// Invoked from ThreadExiting() when a thread that registered a TLS exits.
+// CHECK-fails if the calling thread was not registered.
+void KernelStackWatchdog::Unregister() {
+  int64_t tid = Thread::CurrentThreadId();
+
+  // Take ownership of this thread's TLS. Unless it is handed off to
+  // 'pending_delete_' below, it is deleted when this function returns, which
+  // is after both locks below have been released.
+  std::unique_ptr<TLS> tls(tls_);
+  {
+    // Only *try* to take 'unregister_lock_', so an exiting thread never blocks
+    // on the watchdog. RunThread() holds this lock while it works with copies
+    // of other threads' TLS pointers. Lock order: 'unregister_lock_' before
+    // 'tls_lock_'.
+    std::unique_lock<Mutex> l(unregister_lock_, std::try_to_lock);
+    lock_guard<simple_spinlock> l2(tls_lock_);
+    CHECK(tls_by_tid_.erase(tid));
+    if (!l.owns_lock()) {
+      // The watchdog is in the middle of running and might be accessing
+      // 'tls', so just enqueue it for later deletion. Otherwise it
+      // will go out of scope at the end of this function and get
+      // deleted here.
+      pending_delete_.emplace_back(std::move(tls));
+    }
+  }
+  tls_ = nullptr;
+}
+
+// Reads the kernel-mode stack of thread 'p' from /proc/<p>/stack into 'ret'.
+// Returns the error from ReadFileToString() if the file cannot be read.
+Status GetKernelStack(pid_t p, string* ret) {
+  MAYBE_INJECT_FIXED_LATENCY(FLAGS_inject_latency_on_kernel_stack_lookup_ms);
+  const string stack_path = Substitute("/proc/$0/stack", p);
+  faststring contents;
+  RETURN_NOT_OK(ReadFileToString(Env::Default(), stack_path, &contents));
+  *ret = contents.ToString();
+  return Status::OK();
+}
+
+// Body of the watchdog thread. Every FLAGS_hung_task_check_interval_ms it
+// snapshots every registered thread's TLS and, for each active frame that has
+// been open longer than its threshold, logs that thread's kernel and user
+// stacks (to glog, or to 'log_collector_' when tests enabled it). The loop
+// exits once 'finish_' is counted down by the destructor.
+void KernelStackWatchdog::RunThread() {
+  while (true) {
+    MonoDelta delta = MonoDelta::FromMilliseconds(FLAGS_hung_task_check_interval_ms);
+    if (finish_.WaitFor(delta)) {
+      // Watchdog exiting.
+      break;
+    }
+
+    // Don't send signals while the debugger is running, since it makes it hard to
+    // use.
+    if (IsBeingDebugged()) {
+      continue;
+    }
+
+    // Prevent threads from deleting their TLS objects between the snapshot loop and the sending of
+    // signals. This makes it safe for us to access their TLS.
+    //
+    // NOTE: it's still possible that the thread will have exited in between grabbing its pointer
+    // and sending a signal, but DumpThreadStack() already is safe about not sending a signal
+    // to some other non-Kudu thread.
+    MutexLock unregister_guard(unregister_lock_);
+
+    // Take the snapshot of the thread information under a short lock.
+    //
+    // 'tls_lock_' prevents new threads from starting, so we don't want to do any lengthy work
+    // (such as gathering stack traces) under this lock.
+    TLSMap tls_map_copy;
+    vector<unique_ptr<TLS>> to_delete;
+    {
+      lock_guard<simple_spinlock> tls_guard(tls_lock_);
+      to_delete.swap(pending_delete_);
+      tls_map_copy = tls_by_tid_;
+    }
+    // Actually delete the no-longer-used TLS entries outside of the lock.
+    to_delete.clear();
+
+    MicrosecondsInt64 now = GetMonoTimeMicros();
+    for (const auto& entry : tls_map_copy) {
+      pid_t p = entry.first;
+      TLS::Data* tls = &entry.second->data_;
+      TLS::Data tls_copy;
+      tls->SnapshotCopy(&tls_copy);
+
+      // Defensive: never index past the end of 'frames_', even if the snapshot
+      // reports a nesting depth larger than the array can hold.
+      const int depth =
+          tls_copy.depth_ < TLS::kMaxDepth ? tls_copy.depth_ : TLS::kMaxDepth;
+      for (int i = 0; i < depth; i++) {
+        const TLS::Frame* frame = &tls_copy.frames_[i];
+
+        int paused_ms = (now - frame->start_time_) / 1000;
+        if (paused_ms > frame->threshold_ms_) {
+          string kernel_stack;
+          Status s = GetKernelStack(p, &kernel_stack);
+          if (!s.ok()) {
+            // Can't read the kernel stack of the pid, just ignore it.
+            kernel_stack = "(could not read kernel stack)";
+          }
+
+          string user_stack = DumpThreadStack(p);
+
+          // If the thread exited the frame we're looking at in between when we started
+          // grabbing the stack and now, then our stack isn't correct. We shouldn't log it.
+          //
+          // We just use unprotected reads here since this is a somewhat best-effort
+          // check.
+          if (ANNOTATE_UNPROTECTED_READ(tls->depth_) < tls_copy.depth_ ||
+              ANNOTATE_UNPROTECTED_READ(tls->frames_[i].start_time_) != frame->start_time_) {
+            break;
+          }
+
+          lock_guard<simple_spinlock> log_guard(log_lock_);
+          LOG_STRING(WARNING, log_collector_.get())
+              << "Thread " << p << " stuck at " << frame->status_
+              << " for " << paused_ms << "ms" << ":\n"
+              << "Kernel stack:\n" << kernel_stack << "\n"
+              << "User stack:\n" << user_stack;
+        }
+      }
+    }
+  }
+}
+
+// Thread-exit hook installed by CreateAndRegisterTLS(); unregisters the exiting
+// thread from the singleton watchdog. The argument is not used.
+void KernelStackWatchdog::ThreadExiting(void* /* unused */) {
+  KernelStackWatchdog* watchdog = KernelStackWatchdog::GetInstance();
+  watchdog->Unregister();
+}
+
+void KernelStackWatchdog::CreateAndRegisterTLS() {
+  DCHECK(!tls_);
+  // LSAN sometimes reports false-positive leaks for objects reachable only from
+  // thread-locals (https://github.com/google/sanitizers/issues/757), so keep the
+  // leak checker disabled while the TLS object is allocated and published.
+  debug::ScopedLeakCheckDisabler leak_check_disabler;
+  TLS* new_tls = new TLS();
+  GetInstance()->Register(new_tls);
+  tls_ = new_tls;
+  // Arrange for ThreadExiting() to unregister (and free) the TLS at thread exit.
+  kudu::threadlocal::internal::AddDestructor(&ThreadExiting, nullptr);
+}
+
+// Zero the data so the thread starts with no active frames (depth_ == 0) and an
+// even, i.e. unlocked, sequence counter.
+KernelStackWatchdog::TLS::TLS() {
+  memset(&data_, 0, sizeof data_);
+}
+
+KernelStackWatchdog::TLS::~TLS() = default;
+
+// Optimistic concurrency control approach to snapshot the value of another
+// thread's TLS, even though that thread might be changing it.
+//
+// Called by the watchdog thread to see if a target thread is currently in the
+// middle of a watched section.
+//
+// This is the reader side of the sequence lock described on 'seq_lock_': read
+// the counter, copy the whole struct, re-read the counter, and retry until the
+// counter was even and unchanged across the copy. May busy-wait indefinitely
+// while the owning thread keeps the counter odd.
+void KernelStackWatchdog::TLS::Data::SnapshotCopy(Data* copy) const {
+  while (true) {
+    Atomic32 v_0 = base::subtle::Acquire_Load(&seq_lock_);
+    if (v_0 & 1) {
+      // If the value is odd, then the thread is in the middle of modifying
+      // its TLS, and we have to spin.
+      base::subtle::PauseCPU();
+      continue;
+    }
+    // This copy races with the owning thread's writes by design, so the reads
+    // are annotated to be ignored by dynamic race detection. A torn copy is
+    // detected by the counter re-check below and retried.
+    ANNOTATE_IGNORE_READS_BEGIN();
+    memcpy(copy, this, sizeof(*copy));
+    ANNOTATE_IGNORE_READS_END();
+    // Re-read the counter after the copy. Release_Load is used so the copy's
+    // reads are not reordered after this load. NOTE(review): this relies on
+    // gutil's Release_Load issuing a barrier before the load; confirm in
+    // atomicops.h.
+    Atomic32 v_1 = base::subtle::Release_Load(&seq_lock_);
+
+    // If the value hasn't changed since we started the copy, then
+    // we know that the copy was a consistent snapshot.
+    if (v_1 == v_0) break;
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/kernel_stack_watchdog.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/kernel_stack_watchdog.h b/be/src/kudu/util/kernel_stack_watchdog.h
new file mode 100644
index 0000000..6ec7b50
--- /dev/null
+++ b/be/src/kudu/util/kernel_stack_watchdog.h
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This class defines a singleton thread which manages a map of other thread IDs to
+// watch. Before performing some operation which may stall (e.g. I/O) or which we expect
+// should be short (e.g. a callback on a critical thread that should not block), threads
+// may mark themselves as "watched", with a threshold beyond which they would like
+// warnings to be emitted including their stack trace at that time.
+//
+// In the background, a separate watchdog thread periodically wakes up, and if a thread
+// has been marked longer than its provided threshold, it will dump the stack trace
+// of that thread (both kernel-mode and user-mode stacks).
+//
+// This can be useful for diagnosing I/O stalls coming from the kernel, for example.
+//
+// Users will typically use the macro SCOPED_WATCH_STACK. Example usage:
+//
+//   // We expect the Write() to return in <100ms. If it takes longer than that
+//   // we'll see warnings indicating why it is stalled.
+//   {
+//     SCOPED_WATCH_STACK(100);
+//     file->Write(...);
+//   }
+//
+// If the Write call takes too long, a stack trace will be logged at WARNING level.
+// Note that the threshold time parameter is not a guarantee that a stall will be
+// caught by the watchdog thread. The watchdog only wakes up periodically to look
+// for threads that have been stalled too long. For example, if the threshold is 10ms
+// and the thread blocks for only 20ms, it's quite likely that the watchdog will
+// have missed the event.
+//
+// The SCOPED_WATCH_STACK macro is designed to have minimal overhead: approximately
+// equivalent to a clock_gettime() and a single 'mfence' instruction. Micro-benchmarks
+// measure the cost at about 50ns per call. Thus, it may safely be used in hot code
+// paths.
+//
+// Scopes with SCOPED_WATCH_STACK may be nested, but only up to a hard-coded maximum depth
+// (currently 8).
+#ifndef KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
+#define KUDU_UTIL_KERNEL_STACK_WATCHDOG_H
+
+#include <ctime>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/singleton.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/threadlocal.h"
+
+// Watches the enclosing scope: if it stays active longer than 'threshold_ms',
+// the watchdog logs a warning labeled with this "file:line". Expands to a local
+// variable named '_stack_watcher', so it can appear at most once per C++ scope.
+#define SCOPED_WATCH_STACK(threshold_ms) \
+  ScopedWatchKernelStack _stack_watcher(__FILE__ ":" AS_STRING(__LINE__), threshold_ms)
+
+namespace kudu {
+
+class Thread;
+
+// Singleton thread which implements the watchdog.
+class KernelStackWatchdog {
+ public:
+  // Returns the process-wide watchdog instance managed by Singleton<>. The
+  // constructor starts the background watchdog thread.
+  static KernelStackWatchdog* GetInstance() {
+    return Singleton<KernelStackWatchdog>::get();
+  }
+
+  // Instead of logging through glog, log warning messages into a vector.
+  //
+  // If 'save_logs' is true, will start saving to the vector, and forget any
+  // previously logged messages.
+  // If 'save_logs' is false, disables this functionality.
+  // Guarded by 'log_lock_'.
+  void SaveLogsForTests(bool save_logs);
+
+  // Return any log messages saved since the last call to SaveLogsForTests(true).
+  // Returns a copy. CHECK-fails if saving was not enabled.
+  std::vector<std::string> LoggedMessagesForTests() const;
+
+ private:
+  friend class Singleton<KernelStackWatchdog>;
+  friend class ScopedWatchKernelStack;
+
+  // The thread-local state which captures whether a thread should be watched by
+  // the watchdog. An instance is heap-allocated by CreateAndRegisterTLS() the
+  // first time a thread needs one (see GetTLS()). It is then registered in
+  // 'tls_by_tid_' and published through the thread-local 'tls_' pointer. When
+  // the thread exits, ThreadExiting() calls Unregister(). Unregister() either
+  // deletes the object immediately or queues it in 'pending_delete_' for the
+  // watchdog thread to delete.
+  //
+  // See 'seq_lock_' below for details on thread-safe operation.
+  struct TLS {
+    TLS();
+    ~TLS();
+
+    enum Constants {
+      // The maximum nesting depth of SCOPED_WATCH_STACK() macros.
+      kMaxDepth = 8
+    };
+
+    // Because we support nested SCOPED_WATCH_STACK() macros, we need to capture
+    // multiple active frames within the TLS.
+    struct Frame {
+      // The time at which this frame entered the SCOPED_WATCH_STACK section
+      // (from GetMonoTimeMicros()).
+      // We use MicrosecondsInt64 instead of MonoTime because it inlines a bit
+      // better.
+      MicrosecondsInt64 start_time_;
+      // The threshold of time beyond which the watchdog should emit warnings.
+      int threshold_ms_;
+      // A string explaining the state that the thread is in (typically a file:line
+      // string). This is expected to be static storage and is not freed.
+      const char* status_;
+    };
+
+    // The data within the TLS. This is a POD type so that the watchdog can easily
+    // copy data out of a thread's TLS (SnapshotCopy() memcpy's it).
+    struct Data {
+      Frame frames_[kMaxDepth];
+      // Number of entries of 'frames_' currently in use, i.e. how many watched
+      // scopes the owning thread is nested inside.
+      Atomic32 depth_;
+
+      // Counter implementing a simple "sequence lock".
+      //
+      // Before modifying any data inside its TLS, the watched thread increments this value so it is
+      // odd. When the modifications are complete, it increments it again, making it even.
+      //
+      // To read the TLS data from a target thread, the watchdog thread waits for the value
+      // to become even, indicating that no write is in progress. Then, it does a potentially
+      // racy copy of the entire 'Data' structure. Then, it validates the value again.
+      // If it has not changed, then the snapshot is guaranteed to be consistent.
+      //
+      // We use this type of locking to ensure that the watched thread is as fast as possible,
+      // allowing us to use SCOPED_WATCH_STACK even in hot code paths. In particular,
+      // the watched thread is wait-free, since it doesn't need to loop or retry. In addition, the
+      // memory is only written by that thread, eliminating any cache-line bouncing. The watchdog
+      // thread may have to loop multiple times to see a consistent snapshot, but we're OK delaying
+      // the watchdog arbitrarily since it isn't on any critical path.
+      Atomic32 seq_lock_;
+
+      // Take a consistent snapshot of this data into 'dst'. This may spin while the target
+      // thread is currently modifying its TLS.
+      void SnapshotCopy(Data* dst) const;
+    };
+    Data data_;
+  };
+
+  // Starts the watchdog thread. Only Singleton<> constructs this class.
+  KernelStackWatchdog();
+  // Signals 'finish_' and joins the watchdog thread.
+  ~KernelStackWatchdog();
+
+  // Get or create the TLS for the current thread.
+  static TLS* GetTLS() {
+    if (PREDICT_FALSE(!tls_)) {
+      CreateAndRegisterTLS();
+    }
+    return tls_;
+  }
+
+  // Create a new TLS for the current thread, and register it with the watchdog.
+  // Installs a callback to automatically unregister the thread upon its exit.
+  static void CreateAndRegisterTLS();
+
+  // Callback which is registered to run at thread-exit time by CreateAndRegisterTLS().
+  // The argument is unused (CreateAndRegisterTLS() passes nullptr).
+  static void ThreadExiting(void* tls_void);
+
+  // Register a new thread's TLS with the watchdog.
+  // Called by any thread the first time it enters a watched section, when its TLS
+  // is constructed. CHECK-fails if the calling thread is already registered.
+  void Register(TLS* tls);
+
+  // Called when a thread is in the process of exiting, and has a registered TLS
+  // object. Removes the entry and deletes the TLS, or defers the deletion to the
+  // watchdog thread if it currently holds 'unregister_lock_'.
+  void Unregister();
+
+  // The actual watchdog loop that the watchdog thread runs.
+  void RunThread();
+
+  // Per-thread pointer to the calling thread's TLS (defined in
+  // kernel_stack_watchdog.cc as a '__thread' variable); null until
+  // CreateAndRegisterTLS() runs on that thread.
+  DECLARE_STATIC_THREAD_LOCAL(TLS, tls_);
+
+  // Registered TLS objects keyed by thread id. NOTE(review): ids come from
+  // Thread::CurrentThreadId() as int64_t and are narrowed to pid_t here.
+  typedef std::unordered_map<pid_t, TLS*> TLSMap;
+  TLSMap tls_by_tid_;
+
+  // If a thread exits while the watchdog is in the middle of accessing the TLS
+  // objects, we can't immediately delete the TLS struct. Instead, the thread
+  // enqueues it here for later deletion by the watchdog thread within RunThread().
+  std::vector<std::unique_ptr<TLS>> pending_delete_;
+
+  // If non-NULL, warnings will be emitted into this vector instead of glog.
+  // Used by tests.
+  gscoped_ptr<std::vector<std::string> > log_collector_;
+
+  // Lock protecting log_collector_.
+  mutable simple_spinlock log_lock_;
+
+  // Lock protecting tls_by_tid_ and pending_delete_.
+  mutable simple_spinlock tls_lock_;
+
+  // Lock which prevents threads from unregistering while the watchdog
+  // sends signals.
+  //
+  // This is used to prevent the watchdog from sending a signal to a pid just
+  // after the pid has actually exited and been reused. Sending a signal to
+  // a non-Kudu thread could have unintended consequences.
+  //
+  // When this lock is held concurrently with 'tls_lock_' or 'log_lock_',
+  // this lock must be acquired first.
+  Mutex unregister_lock_;
+
+  // The watchdog thread itself.
+  scoped_refptr<Thread> thread_;
+
+  // Signal to stop the watchdog. Counted down by the destructor; RunThread()
+  // exits once its WaitFor() observes it.
+  CountDownLatch finish_;
+
+  DISALLOW_COPY_AND_ASSIGN(KernelStackWatchdog);
+};
+
+// Scoped object which marks the current thread for watching.
+//
+// The destructor pops exactly the frame that the constructor pushed. If the
+// constructor pushed nothing, because watching was disabled or the nesting limit
+// was reached, the destructor leaves the thread's TLS untouched. This keeps an
+// unwatched scope from popping an enclosing scope's frame, or driving 'depth_'
+// negative.
+class ScopedWatchKernelStack {
+ public:
+  // If the current scope is active more than 'threshold_ms' milliseconds, the
+  // watchdog thread will log a warning including the message 'label'. 'label'
+  // is not copied or freed.
+  //
+  // A non-positive 'threshold_ms' disables watching for this scope. If the
+  // current thread is already inside kMaxDepth watched scopes, this scope is
+  // not watched either (and a DCHECK fires in debug builds).
+  ScopedWatchKernelStack(const char* label, int threshold_ms) {
+    if (threshold_ms <= 0) return;
+
+    // Rather than just using the lazy GetTLS() method, we'll first try to load
+    // the TLS ourselves. This is usually successful, and avoids us having to inline
+    // the TLS construction path at call sites.
+    KernelStackWatchdog::TLS* tls = KernelStackWatchdog::tls_;
+    if (PREDICT_FALSE(tls == nullptr)) {
+      tls = KernelStackWatchdog::GetTLS();
+    }
+    KernelStackWatchdog::TLS::Data* tls_data = &tls->data_;
+
+    // 'frames_' only has room for kMaxDepth entries. Check before writing, so
+    // that release builds never write past the end of the array.
+    DCHECK_LT(tls_data->depth_, KernelStackWatchdog::TLS::kMaxDepth);
+    if (PREDICT_FALSE(tls_data->depth_ >= KernelStackWatchdog::TLS::kMaxDepth)) return;
+
+    // "Acquire" the sequence lock. While the lock value is odd, readers will block.
+    // TODO: technically this barrier is stronger than we need: we are the only writer
+    // to this data, so it's OK to allow loads from within the critical section to
+    // reorder above this next line. All we need is a "StoreStore" barrier (i.e.
+    // prevent any stores in the critical section from getting reordered above the
+    // increment of the counter). However, atomicops.h doesn't provide such a barrier
+    // as of yet, so we'll do the slightly more expensive one for now.
+    base::subtle::Acquire_Store(&tls_data->seq_lock_, tls_data->seq_lock_ + 1);
+
+    KernelStackWatchdog::TLS::Frame* frame = &tls_data->frames_[tls_data->depth_++];
+    frame->start_time_ = GetMonoTimeMicros();
+    frame->threshold_ms_ = threshold_ms;
+    frame->status_ = label;
+
+    // "Release" the sequence lock. This resets the lock value to be even, so readers
+    // will proceed.
+    base::subtle::Release_Store(&tls_data->seq_lock_, tls_data->seq_lock_ + 1);
+
+    // Record that this scope pushed a frame, so the destructor pops it.
+    tls_data_ = tls_data;
+  }
+
+  ~ScopedWatchKernelStack() {
+    // Nothing was pushed by the constructor: leave the TLS untouched.
+    if (tls_data_ == nullptr) return;
+
+    int d = tls_data_->depth_;
+    DCHECK_GT(d, 0);
+
+    // We don't bother with a lock/unlock, because the change we're making here is atomic.
+    // If we race with the watchdog, either they'll see the old depth_ or the new depth_,
+    // but in either case the underlying data is perfectly valid.
+    base::subtle::NoBarrier_Store(&tls_data_->depth_, d - 1);
+  }
+
+ private:
+  // The current thread's TLS data if this scope pushed a frame, else nullptr.
+  KernelStackWatchdog::TLS::Data* tls_data_ = nullptr;
+
+  DISALLOW_COPY_AND_ASSIGN(ScopedWatchKernelStack);
+};
+
+} // namespace kudu
+#endif /* KUDU_UTIL_KERNEL_STACK_WATCHDOG_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/knapsack_solver-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/knapsack_solver-test.cc b/be/src/kudu/util/knapsack_solver-test.cc
new file mode 100644
index 0000000..9f717c4
--- /dev/null
+++ b/be/src/kudu/util/knapsack_solver-test.cc
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/knapsack_solver.h"
+#include "kudu/util/stopwatch.h"  // IWYU pragma: keep
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture for the knapsack solver tests; adds nothing beyond KuduTest.
+class TestKnapsack : public KuduTest {};
+
+// A minimal value/weight pair for exercising the knapsack solver. The real code
+// solves knapsack over RowSet objects; plain numbers keep these tests standalone.
+struct TestItem {
+  TestItem(double v, int w)
+    : value(v),
+      weight(w) {}
+
+  double value;
+  int weight;
+};
+
+// Traits adapter telling KnapsackSolver how to read a TestItem's weight and value.
+struct TestItemTraits {
+  using item_type = TestItem;
+  using value_type = double;
+
+  static int get_weight(const TestItem& item) { return item.weight; }
+  static value_type get_value(const TestItem& item) { return item.value; }
+};
+
+// Appends 'n_items' pseudo-random items to 'out'. Values fall in [1, 10000];
+// for a positive 'max_weight', weights fall in [0, max_weight).
+static void GenerateRandomItems(int n_items, int max_weight,
+                                vector<TestItem> *out) {
+  for (int remaining = n_items; remaining > 0; --remaining) {
+    // The value is drawn before the weight, so the random() sequence is consumed
+    // in a fixed order.
+    const double value = 10000.0 / (random() % 10000 + 1);
+    const int weight = random() % max_weight;
+    out->emplace_back(value, weight);
+  }
+}
+
+// Join and stringify the given list of ints as a comma-separated string,
+// e.g. {3, 2, 1} -> "3,2,1". An empty list yields "".
+static std::string JoinInts(const std::vector<int>& ints) {
+  std::string ret;
+  for (int value : ints) {
+    // std::to_string never returns an empty string, so a non-empty 'ret' means
+    // at least one number has already been written.
+    if (!ret.empty()) {
+      ret.push_back(',');
+    }
+    ret.append(std::to_string(value));
+  }
+  return ret;
+}
+
+// Hand-checked instances: items are (value, weight) pairs, and the expected
+// chosen indices are listed in the order the solver reports them.
+TEST_F(TestKnapsack, Basics) {
+  KnapsackSolver<TestItemTraits> solver;
+
+  vector<TestItem> in = {
+    TestItem(500, 3),
+    TestItem(110, 1),
+    TestItem(125, 1),
+    TestItem(100, 1),
+  };
+
+  {
+    // Capacity 1: only the most valuable unit-weight item (#2) fits.
+    vector<int> out;
+    double max_val;
+    solver.Solve(in, 1, &out, &max_val);
+    ASSERT_DOUBLE_EQ(125, max_val);
+    ASSERT_EQ("2", JoinInts(out));
+  }
+  {
+    // Capacity 2: the two most valuable unit-weight items (#1 and #2).
+    vector<int> out;
+    double max_val;
+    solver.Solve(in, 2, &out, &max_val);
+    ASSERT_DOUBLE_EQ(110 + 125, max_val);
+    ASSERT_EQ("2,1", JoinInts(out));
+  }
+  {
+    // Capacity 3: the single heavy item (#0) beats any three light ones.
+    vector<int> out;
+    double max_val;
+    solver.Solve(in, 3, &out, &max_val);
+    ASSERT_DOUBLE_EQ(500, max_val);
+    ASSERT_EQ("0", JoinInts(out));
+  }
+  {
+    // Capacity 10: everything fits.
+    vector<int> out;
+    double max_val;
+    solver.Solve(in, 10, &out, &max_val);
+    ASSERT_DOUBLE_EQ(500 + 110 + 125 + 100, max_val);
+    ASSERT_EQ("3,2,1,0", JoinInts(out));
+  }
+}
+
+// Randomized check: for random instances, the reported optimum must equal the
+// total value of the chosen items, and those items must fit in the capacity.
+TEST_F(TestKnapsack, Randomized) {
+  SeedRandom();
+  KnapsackSolver<TestItemTraits> solver;
+
+  const int kNumTrials = AllowSlowTests() ? 200 : 1;
+  const int kMaxWeight = 1000;
+  const int kNumItems = 1000;
+
+  for (int trial = 0; trial < kNumTrials; trial++) {
+    vector<TestItem> in;
+    GenerateRandomItems(kNumItems, kMaxWeight, &in);
+    const int max_weight = random() % kMaxWeight;
+
+    vector<int> out;
+    double max_val;
+    solver.Solve(in, max_weight, &out, &max_val);
+
+    double sum_val = 0;
+    int sum_weight = 0;
+    for (int idx : out) {
+      const TestItem& item = in[idx];
+      sum_val += item.value;
+      sum_weight += item.weight;
+    }
+    ASSERT_NEAR(max_val, sum_val, 0.000001);
+    ASSERT_LE(sum_weight, max_weight);
+  }
+}
+
+#ifdef NDEBUG
+// Times kNumTrials solves over one fixed random instance with random
+// capacities. Compiled only when NDEBUG is defined.
+TEST_F(TestKnapsack, Benchmark) {
+  KnapsackSolver<TestItemTraits> solver;
+
+  const int kNumTrials = 1000;
+  const int kMaxWeight = 1000;
+  const int kNumItems = 1000;
+
+  vector<TestItem> in;
+  GenerateRandomItems(kNumItems, kMaxWeight, &in);
+
+  LOG_TIMING(INFO, "benchmark") {
+    vector<int> out;
+    double max_val;
+    for (int trial = 0; trial < kNumTrials; ++trial) {
+      out.clear();
+      solver.Solve(in, random() % kMaxWeight, &out, &max_val);
+    }
+  }
+}
+#endif
+
+} // namespace kudu