You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:22 UTC

[10/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/os-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/os-util.h b/be/src/kudu/util/os-util.h
new file mode 100644
index 0000000..7e1bbb6
--- /dev/null
+++ b/be/src/kudu/util/os-util.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Imported from Impala. Changes include:
+// - Namespace + imports.
+// - Fixes for cpplint.
+// - Fixed parsing when thread names have spaces.
+
+#ifndef KUDU_UTIL_OS_UTIL_H
+#define KUDU_UTIL_OS_UTIL_H
+
+#include <cstdint>
+#include <string>
+#include <type_traits> // IWYU pragma: keep
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Utility methods to read interesting values from /proc.
+// TODO: Get stats for parent process.
+
+// Container struct for statistics read from the /proc filesystem for a thread.
+struct ThreadStats {
+  int64_t user_ns = 0;
+  int64_t kernel_ns = 0;
+  int64_t iowait_ns = 0;
+
+  // Every counter defaults to zero, so the struct holds well-defined values
+  // even when GetThreadStats is unable to fill it in.
+  ThreadStats() {}
+};
+
+// Populates ThreadStats object using a given buffer. The buffer is expected to
+// conform to /proc/<pid>/task/<tid>/stat layout; an error will be returned otherwise.
+//
+// If 'name' is supplied, the extracted thread name will be written to it.
+Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats);
+
+// Populates ThreadStats object for a given thread by reading from
+// /proc/<pid>/task/<tid>/stat. Returns OK unless the file cannot be read or is in an
+// unrecognised format, or if the kernel version is not modern enough.
+Status GetThreadStats(int64_t tid, ThreadStats* stats);
+
+// Disable core dumps for this process.
+//
+// This is useful particularly in tests where we have injected failures and don't
+// want to generate a core dump from an "expected" crash.
+void DisableCoreDumps();
+
+// Return true if this process appears to be running under a debugger or strace.
+//
+// This may return false on unsupported (non-Linux) platforms.
+bool IsBeingDebugged();
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_OS_UTIL_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/path_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/path_util-test.cc b/be/src/kudu/util/path_util-test.cc
new file mode 100644
index 0000000..0d617fc
--- /dev/null
+++ b/be/src/kudu/util/path_util-test.cc
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/path_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+TEST(TestPathUtil, BaseNameTest) {
+  ASSERT_EQ(".", BaseName(""));
+  ASSERT_EQ(".", BaseName("."));
+  ASSERT_EQ("..", BaseName(".."));
+  ASSERT_EQ("/", BaseName("/"));
+  ASSERT_EQ("/", BaseName("//"));
+  ASSERT_EQ("a", BaseName("a"));
+  ASSERT_EQ("ab", BaseName("ab"));
+  ASSERT_EQ("ab", BaseName("ab/"));
+  ASSERT_EQ("cd", BaseName("ab/cd"));
+  ASSERT_EQ("ab", BaseName("/ab"));
+  ASSERT_EQ("ab", BaseName("/ab///"));
+  ASSERT_EQ("cd", BaseName("/ab/cd"));
+}
+
+TEST(TestPathUtil, DirNameTest) {
+  ASSERT_EQ(".", DirName(""));
+  ASSERT_EQ(".", DirName("."));
+  ASSERT_EQ(".", DirName(".."));
+  ASSERT_EQ("/", DirName("/"));
+#if defined(__linux__)
+  // On OS X this test case returns "/", while Linux returns "//". On both
+  // platforms dirname(1) returns "/". The difference is unlikely to matter in
+  // practice.
+  ASSERT_EQ("//", DirName("//"));
+#else
+  ASSERT_EQ("/", DirName("//"));
+#endif // defined(__linux__)
+  ASSERT_EQ(".", DirName("a"));
+  ASSERT_EQ(".", DirName("ab"));
+  ASSERT_EQ(".", DirName("ab/"));
+  ASSERT_EQ("ab", DirName("ab/cd"));
+  ASSERT_EQ("/", DirName("/ab"));
+  ASSERT_EQ("/", DirName("/ab///"));
+  ASSERT_EQ("/ab", DirName("/ab/cd"));
+}
+
+TEST(TestPathUtil, SplitPathTest) {
+  typedef vector<string> Vec;
+  ASSERT_EQ(Vec({"/"}), SplitPath("/"));
+  ASSERT_EQ(Vec({"/", "a", "b"}), SplitPath("/a/b"));
+  ASSERT_EQ(Vec({"/", "a", "b"}), SplitPath("/a/b/"));
+  ASSERT_EQ(Vec({"a", "b"}), SplitPath("a/b"));
+  ASSERT_EQ(Vec({"."}), SplitPath("."));
+  ASSERT_EQ(Vec(), SplitPath(""));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/path_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/path_util.cc b/be/src/kudu/util/path_util.cc
new file mode 100644
index 0000000..6d1c4a5
--- /dev/null
+++ b/be/src/kudu/util/path_util.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/path_util.h"
+
+// Use the POSIX version of dirname(3).
+#include <libgen.h>
+
+#include <cstring>
+#if defined(__APPLE__)
+#include <mutex>
+#endif // defined(__APPLE__)
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/util/env.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+
+
+using std::string;
+using std::vector;
+using strings::SkipEmpty;
+using strings::Split;
+
+namespace kudu {
+
+const char kTmpInfix[] = ".kudutmp";
+const char kOldTmpInfix[] = ".tmp";
+
+string JoinPathSegments(const string& a, const string& b) {
+  // Both components are mandatory, and 'b' must not be an absolute path.
+  CHECK(!a.empty()) << "empty first component: " << a;
+  CHECK(!b.empty() && b[0] != '/')
+    << "second path component must be non-empty and relative: " << b;
+  // Only insert a separator when 'a' does not already end with one.
+  string joined = a;
+  if (joined.back() != '/') joined += '/';
+  joined += b;
+  return joined;
+}
+
+vector<string> JoinPathSegmentsV(const vector<string>& v, const string& s) {
+  // Suffix every entry of 'v' with 's', keeping the input order.
+  vector<string> joined;
+  joined.reserve(v.size());
+  for (const auto& prefix : v) joined.push_back(JoinPathSegments(prefix, s));
+  return joined;
+}
+
+vector<string> SplitPath(const string& path) {
+  vector<string> result;
+  if (path.empty()) return result;
+  // An absolute path contributes the root "/" as its own leading segment.
+  if (path.front() == '/') result.emplace_back("/");
+  // SkipEmpty() keeps empty pieces (e.g. from a trailing slash) out of the result.
+  vector<StringPiece> parts = Split(path, "/", SkipEmpty());
+  for (const auto& part : parts) result.emplace_back(part.data(), part.size());
+  return result;
+}
+
+string DirName(const string& path) {
+  gscoped_ptr<char[], FreeDeleter> path_copy(strdup(path.c_str()));
+#if defined(__APPLE__)
+  static std::mutex lock;
+  std::lock_guard<std::mutex> l(lock);
+#endif // defined(__APPLE__)
+  return ::dirname(path_copy.get());
+}
+
+string BaseName(const string& path) {
+  gscoped_ptr<char[], FreeDeleter> path_copy(strdup(path.c_str()));
+  return basename(path_copy.get());
+}
+
+Status FindExecutable(const string& binary,
+                      const vector<string>& search,
+                      string* path) {
+  string found;
+
+  // The caller-supplied directories are consulted first, so that a binary
+  // on the PATH can never shadow one living in a 'search' location.
+  for (const string& dir : search) {
+    found = JoinPathSegments(dir, binary);
+    if (!Env::Default()->FileExists(found)) continue;
+    *path = found;
+    return Status::OK();
+  }
+
+  // Otherwise ask which(1) to resolve the binary through the PATH.
+  const Status which_status = Subprocess::Call({ "which", binary }, "", &found);
+  if (!which_status.ok()) {
+    return Status::NotFound("Unable to find binary", binary);
+  }
+
+  // Drop the trailing newline from the command output before returning it.
+  StripTrailingNewline(&found);
+  *path = found;
+  return Status::OK();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/path_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/path_util.h b/be/src/kudu/util/path_util.h
new file mode 100644
index 0000000..58211a9
--- /dev/null
+++ b/be/src/kudu/util/path_util.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility methods for dealing with file paths.
+#ifndef KUDU_UTIL_PATH_UTIL_H
+#define KUDU_UTIL_PATH_UTIL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+class Status;
+
+// Common tmp infix
+extern const char kTmpInfix[];
+// Infix from versions of Kudu prior to 1.2.
+extern const char kOldTmpInfix[];
+
+// Join two path segments with the appropriate path separator,
+// if necessary.
+std::string JoinPathSegments(const std::string& a,
+                             const std::string& b);
+
+// Join each path segment in a list with a common suffix segment.
+std::vector<std::string> JoinPathSegmentsV(const std::vector<std::string>& v,
+                                           const std::string& s);
+
+// Split a path into segments with the appropriate path separator.
+std::vector<std::string> SplitPath(const std::string& path);
+
+// Return the enclosing directory of path.
+// This is like dirname(3) but for C++ strings.
+std::string DirName(const std::string& path);
+
+// Return the terminal component of a path.
+// This is like basename(3) but for C++ strings.
+std::string BaseName(const std::string& path);
+
+// Attempts to find the path to the executable, searching the provided locations
+// as well as the $PATH environment variable.
+Status FindExecutable(const std::string& binary,
+                      const std::vector<std::string>& search,
+                      std::string* path) WARN_UNUSED_RESULT;
+
+} // namespace kudu
+#endif /* KUDU_UTIL_PATH_UTIL_H */

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util-internal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-internal.cc b/be/src/kudu/util/pb_util-internal.cc
new file mode 100644
index 0000000..380072c
--- /dev/null
+++ b/be/src/kudu/util/pb_util-internal.cc
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/pb_util-internal.h"
+
+#include <ostream>
+#include <string>
+
+namespace kudu {
+namespace pb_util {
+namespace internal {
+
+////////////////////////////////////////////
+// SequentialFileFileInputStream
+////////////////////////////////////////////
+
+bool SequentialFileFileInputStream::Next(const void **data, int *size) {
+  if (PREDICT_FALSE(!status_.ok())) {
+    LOG(WARNING) << "Already failed on a previous read: " << status_.ToString();
+    return false;
+  }
+  // Serve any bytes still pending in the buffer before touching the file.
+  const size_t pending = buffer_used_ - buffer_offset_;
+  if (pending != 0) {
+    *data = buffer_.get() + buffer_offset_;
+    *size = pending;
+    buffer_offset_ = buffer_used_;
+    total_read_ += pending;
+    return true;
+  }
+  // The buffer is drained: refill it with the next chunk of the file.
+  Slice chunk(buffer_.get(), buffer_size_);
+  status_ = rfile_->Read(&chunk);
+  if (!status_.ok()) {
+    LOG(WARNING) << "Read at " << buffer_offset_ << " failed: " << status_.ToString();
+    return false;
+  }
+  // Hand out the whole refilled buffer; a zero-length read makes the call return false.
+  buffer_used_ = chunk.size();
+  buffer_offset_ = buffer_used_;
+  total_read_ += buffer_used_;
+  *data = buffer_.get();
+  *size = buffer_used_;
+  return buffer_used_ > 0;
+}
+
+// Skips 'count' bytes: buffered bytes are consumed first, the rest in the file.
+bool SequentialFileFileInputStream::Skip(int count) {
+  CHECK_GT(count, 0);
+  int avail = (buffer_used_ - buffer_offset_);
+  if (avail > count) {
+    buffer_offset_ += count;
+  } else {
+    // Drop whatever is buffered and skip the remainder directly in the file.
+    buffer_used_ = buffer_offset_ = 0;
+    status_ = rfile_->Skip(count - avail);
+  }
+  total_read_ += count;  // Buffered and file-skipped bytes both count as consumed.
+  return status_.ok();
+}
+
+////////////////////////////////////////////
+// WritableFileOutputStream
+////////////////////////////////////////////
+
+bool WritableFileOutputStream::Next(void **data, int *size) {
+  if (PREDICT_FALSE(!status_.ok())) {
+    LOG(WARNING) << "Already failed on a previous write: " << status_.ToString();
+    return false;
+  }
+  // Hand out whatever part of the buffer has not been given away yet.
+  const size_t unclaimed = buffer_size_ - buffer_offset_;
+  if (unclaimed != 0) {
+    *data = buffer_.get() + buffer_offset_;
+    *size = unclaimed;
+    buffer_offset_ = buffer_size_;
+    return true;
+  }
+
+  // The buffer is fully claimed: write it out, then offer all of it again.
+  if (!Flush()) {
+    return false;
+  }
+  *data = buffer_.get();
+  *size = buffer_size_;
+  buffer_offset_ = buffer_size_;
+  return true;
+}
+
+} // namespace internal
+} // namespace pb_util
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util-internal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-internal.h b/be/src/kudu/util/pb_util-internal.h
new file mode 100644
index 0000000..48a501d
--- /dev/null
+++ b/be/src/kudu/util/pb_util-internal.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Classes used internally by pb_util.h.
+// This header should not be included by anything but pb_util and its tests.
+#ifndef KUDU_UTIL_PB_UTIL_INTERNAL_H
+#define KUDU_UTIL_PB_UTIL_INTERNAL_H
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+#include <glog/logging.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace pb_util {
+namespace internal {
+
+// Input Stream used by ParseFromSequentialFile()
+class SequentialFileFileInputStream : public google::protobuf::io::ZeroCopyInputStream {
+ public:
+  explicit SequentialFileFileInputStream(SequentialFile *rfile,
+                                         size_t buffer_size = kDefaultBufferSize)
+    : buffer_used_(0), buffer_offset_(0),
+      buffer_size_(buffer_size), buffer_(new uint8[buffer_size_]),
+      total_read_(0), rfile_(rfile) {
+    CHECK_GT(buffer_size, 0);
+  }
+
+  ~SequentialFileFileInputStream() {
+  }
+
+  bool Next(const void **data, int *size) OVERRIDE;
+  bool Skip(int count) OVERRIDE;
+
+  void BackUp(int count) OVERRIDE {
+    CHECK_GE(count, 0);
+    CHECK_LE(count, buffer_offset_);
+    buffer_offset_ -= count;
+    total_read_ -= count;
+  }
+
+  int64_t ByteCount() const OVERRIDE {
+    return total_read_;
+  }
+
+  Status status() const {
+    return status_;
+  }
+
+ private:
+  static const size_t kDefaultBufferSize = 8192;
+
+  Status status_;
+
+  size_t buffer_used_;
+  size_t buffer_offset_;
+  const size_t buffer_size_;
+  std::unique_ptr<uint8_t[]> buffer_;
+
+  size_t total_read_;
+  SequentialFile *rfile_;
+};
+
+// Output Stream used by SerializeToWritableFile()
+class WritableFileOutputStream : public google::protobuf::io::ZeroCopyOutputStream {
+ public:
+  explicit WritableFileOutputStream(WritableFile *wfile, size_t buffer_size = kDefaultBufferSize)
+    : buffer_offset_(0), buffer_size_(buffer_size), buffer_(new uint8[buffer_size_]),
+      flushed_(0), wfile_(wfile) {
+    CHECK_GT(buffer_size, 0);
+  }
+
+  ~WritableFileOutputStream() {
+  }
+
+  bool Flush() {
+    if (buffer_offset_ > 0) {
+      Slice data(buffer_.get(), buffer_offset_);
+      status_ = wfile_->Append(data);
+      flushed_ += buffer_offset_;
+      buffer_offset_ = 0;
+    }
+    return status_.ok();
+  }
+
+  bool Next(void **data, int *size) OVERRIDE;
+
+  void BackUp(int count) OVERRIDE {
+    CHECK_GE(count, 0);
+    CHECK_LE(count, buffer_offset_);
+    buffer_offset_ -= count;
+  }
+
+  int64_t ByteCount() const OVERRIDE {
+    return flushed_ + buffer_offset_;
+  }
+
+ private:
+  static const size_t kDefaultBufferSize = 8192;
+
+  Status status_;
+
+  size_t buffer_offset_;
+  const size_t buffer_size_;
+  std::unique_ptr<uint8_t[]> buffer_;
+
+  size_t flushed_;
+  WritableFile *wfile_;
+};
+
+} // namespace internal
+} // namespace pb_util
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-test.cc b/be/src/kudu/util/pb_util-test.cc
new file mode 100644
index 0000000..a942e9a
--- /dev/null
+++ b/be/src/kudu/util/pb_util-test.cc
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/pb_util-internal.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/pb_util_test.pb.h"
+#include "kudu/util/proto_container_test.pb.h"
+#include "kudu/util/proto_container_test2.pb.h"
+#include "kudu/util/proto_container_test3.pb.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace pb_util {
+
+using google::protobuf::FileDescriptorSet;
+using internal::WritableFileOutputStream;
+using std::ostringstream;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+static const char* kTestFileName = "pb_container.meta";
+static const char* kTestKeyvalName = "my-key";
+static const int kTestKeyvalValue = 1;
+static const int kUseDefaultVersion = 0; // Use the default container version (don't set it).
+
+class TestPBUtil : public KuduTest {
+ public:
+  virtual void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    path_ = GetTestPath(kTestFileName);
+  }
+
+ protected:
+  // Create a container file with expected values.
+  // Since this is a unit test class, and we want it to be fast, we do not
+  // fsync by default.
+  Status CreateKnownGoodContainerFile(CreateMode create = OVERWRITE,
+                                      SyncMode sync = NO_SYNC);
+
+  // Create a new Protobuf Container File Writer.
+  // Set version to kUseDefaultVersion to use the default version.
+  Status NewPBCWriter(int version, RWFileOptions opts,
+                      unique_ptr<WritablePBContainerFile>* pb_writer);
+
+  // Same as CreateKnownGoodContainerFile(), but with settable file version.
+  // Set version to kUseDefaultVersion to use the default version.
+  Status CreateKnownGoodContainerFileWithVersion(int version,
+                                                 CreateMode create = OVERWRITE,
+                                                 SyncMode sync = NO_SYNC);
+
+  // Inverts every bit of the data in the specified byte range of the file at the given path.
+  Status BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length);
+
+  void DumpPBCToString(const string& path, ReadablePBContainerFile::Format format, string* ret);
+
+  // Truncate the specified file to the specified length.
+  Status TruncateFile(const string& path, uint64_t size);
+
+  // Output file name for most unit tests.
+  string path_;
+};
+
+// Parameterized test class for running tests across various versions of PB
+// container files.
+class TestPBContainerVersions : public TestPBUtil,
+                                public ::testing::WithParamInterface<int> {
+ public:
+  TestPBContainerVersions()
+      : version_(GetParam()) {
+  }
+
+ protected:
+  const int version_; // The parameterized container version we are testing.
+};
+
+INSTANTIATE_TEST_CASE_P(SupportedVersions, TestPBContainerVersions,
+                        ::testing::Values(1, 2, kUseDefaultVersion));
+
+Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create, SyncMode sync) {
+  ProtoContainerTestPB test_pb;
+  test_pb.set_name(kTestKeyvalName);
+  test_pb.set_value(kTestKeyvalValue);
+  return WritePBContainerToPath(env_, path_, test_pb, create, sync);
+}
+
+Status TestPBUtil::NewPBCWriter(int version, RWFileOptions opts,
+                                unique_ptr<WritablePBContainerFile>* pb_writer) {
+  unique_ptr<RWFile> writer;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path_, &writer));
+  pb_writer->reset(new WritablePBContainerFile(std::move(writer)));
+  if (version != kUseDefaultVersion) {
+    (*pb_writer)->SetVersionForTests(version);
+  }
+  return Status::OK();
+}
+
+Status TestPBUtil::CreateKnownGoodContainerFileWithVersion(int version,
+                                                           CreateMode create,
+                                                           SyncMode sync) {
+  ProtoContainerTestPB test_pb;
+  test_pb.set_name(kTestKeyvalName);
+  test_pb.set_value(kTestKeyvalValue);
+
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  RETURN_NOT_OK(NewPBCWriter(version, RWFileOptions(), &pb_writer));
+  RETURN_NOT_OK(pb_writer->CreateNew(test_pb));
+  RETURN_NOT_OK(pb_writer->Append(test_pb));
+  RETURN_NOT_OK(pb_writer->Close());
+  return Status::OK();
+}
+
+Status TestPBUtil::BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length) {
+  faststring buf;
+  // Read the data from disk.
+  {
+    unique_ptr<RandomAccessFile> file;
+    RETURN_NOT_OK(env_->NewRandomAccessFile(path, &file));
+    uint64_t size;
+    RETURN_NOT_OK(file->Size(&size));
+    faststring scratch;
+    scratch.resize(size);
+    Slice slice(scratch.data(), size);
+    RETURN_NOT_OK(file->Read(0, slice));
+    buf.append(slice.data(), slice.size());
+  }
+
+  // Flip the bits.
+  for (uint64_t i = 0; i < length; i++) {
+    uint8_t* addr = buf.data() + offset + i;
+    *addr = ~*addr;
+  }
+
+  // Write the data back to disk.
+  unique_ptr<WritableFile> file;
+  RETURN_NOT_OK(env_->NewWritableFile(path, &file));
+  RETURN_NOT_OK(file->Append(buf));
+  RETURN_NOT_OK(file->Close());
+
+  return Status::OK();
+}
+
+Status TestPBUtil::TruncateFile(const string& path, uint64_t size) {
+  unique_ptr<RWFile> file;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path, &file));
+  RETURN_NOT_OK(file->Truncate(size));
+  return Status::OK();
+}
+
+TEST_F(TestPBUtil, TestWritableFileOutputStream) {
+  shared_ptr<WritableFile> file;
+  string path = GetTestPath("test.out");
+  ASSERT_OK(env_util::OpenFileForWrite(env_, path, &file));
+
+  WritableFileOutputStream stream(file.get(), 4096);
+
+  void* buf;
+  int size;
+
+  // First call should yield the whole buffer.
+  ASSERT_TRUE(stream.Next(&buf, &size));
+  ASSERT_EQ(4096, size);
+  ASSERT_EQ(4096, stream.ByteCount());
+
+  // Back up by 1000 bytes; the next call should yield those 1000 bytes.
+  stream.BackUp(1000);
+  ASSERT_EQ(3096, stream.ByteCount());
+
+  ASSERT_TRUE(stream.Next(&buf, &size));
+  ASSERT_EQ(1000, size);
+
+  // Another call should flush and yield a new buffer of 4096
+  ASSERT_TRUE(stream.Next(&buf, &size));
+  ASSERT_EQ(4096, size);
+  ASSERT_EQ(8192, stream.ByteCount());
+
+  // Should be able to back up to 7192
+  stream.BackUp(1000);
+  ASSERT_EQ(7192, stream.ByteCount());
+
+  // Flushing shouldn't change written count.
+  ASSERT_TRUE(stream.Flush());
+  ASSERT_EQ(7192, stream.ByteCount());
+
+  // Since we just flushed, we should get another full buffer.
+  ASSERT_TRUE(stream.Next(&buf, &size));
+  ASSERT_EQ(4096, size);
+  ASSERT_EQ(7192 + 4096, stream.ByteCount());
+
+  ASSERT_TRUE(stream.Flush());
+
+  ASSERT_EQ(stream.ByteCount(), file->Size());
+}
+
+// Basic read/write test.
+TEST_F(TestPBUtil, TestPBContainerSimple) {
+  // Exercise both the SYNC and NO_SYNC codepaths, despite the fact that we
+  // aren't able to observe a difference in the test.
+  vector<SyncMode> modes = { SYNC, NO_SYNC };
+  for (SyncMode mode : modes) {
+
+    // Write the file.
+    ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE, mode));
+
+    // Read it back, should validate and contain the expected values.
+    ProtoContainerTestPB test_pb;
+    ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb));
+    ASSERT_EQ(kTestKeyvalName, test_pb.name());
+    ASSERT_EQ(kTestKeyvalValue, test_pb.value());
+
+    // Delete the file.
+    ASSERT_OK(env_->DeleteFile(path_));
+  }
+}
+
+// Corruption / various failure mode test.
+TEST_P(TestPBContainerVersions, TestCorruption) {
+  // Test that we indicate when the file does not exist.
+  ProtoContainerTestPB test_pb;
+  Status s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  ASSERT_TRUE(s.IsNotFound()) << "Should not be found: " << path_ << ": " << s.ToString();
+
+  // Test that an empty file is reported as Incomplete (too small to be valid).
+  {
+    // Create the empty file.
+    unique_ptr<WritableFile> file;
+    ASSERT_OK(env_->NewWritableFile(path_, &file));
+    ASSERT_OK(file->Close());
+  }
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  ASSERT_TRUE(s.IsIncomplete()) << "Should be zero length: " << path_ << ": " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+
+  // Test truncated file.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  uint64_t known_good_size = 0;
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+  } else {
+    ASSERT_TRUE(s.IsIncomplete()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+  }
+  ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+
+  // Test corrupted magic.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, 0, 2));
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  ASSERT_TRUE(s.IsCorruption()) << "Should have invalid magic: " << path_ << ": " << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Invalid magic number");
+
+  // Test corrupted version.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, 8, 2));
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  ASSERT_TRUE(s.IsNotSupported()) << "Should have unsupported version number: " << path_ << ": "
+                                  << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), " Protobuf container has unsupported version");
+
+  // Test corrupted magic+version checksum (only exists in the V2+ format).
+  if (version_ >= 2) {
+    ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+    ASSERT_OK(BitFlipFileByteRange(path_, 12, 2));
+    s = ReadPBContainerFromPath(env_, path_, &test_pb);
+    ASSERT_TRUE(s.IsCorruption()) << "Should have corrupted file header checksum: " << path_ << ": "
+                                    << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "File header checksum does not match");
+  }
+
+  // Test record corruption below.
+  const int kFirstRecordOffset = (version_ == 1) ? 12 : 16;
+
+  // Test corrupted data length.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset, 2));
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  } else {
+    ASSERT_TRUE(s.IsCorruption()) << "Should be invalid data length checksum: "
+                                  << path_ << ": " << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+  }
+
+  // Test corrupted data (looks like bad checksum).
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset + 4, 2));
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
+                                << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+
+  // Test corrupted checksum.
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(BitFlipFileByteRange(path_, known_good_size - 4, 2));
+  s = ReadPBContainerFromPath(env_, path_, &test_pb);
+  ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
+                                << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+}
+
+// Test partial record at end of file.
+//
+// Chops the last two bytes off a known-good container so the final record is
+// short, then checks that each read attempt reports the same error: Corruption
+// for V1 files and Incomplete for V2+ files.
+TEST_P(TestPBContainerVersions, TestPartialRecord) {
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  uint64_t known_good_size = 0;
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+  unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  ReadablePBContainerFile pb_file(std::move(file));
+  ASSERT_OK(pb_file.Open());
+  ProtoContainerTestPB test_pb;
+  // Loop to verify that the same response is repeatably returned. The read has
+  // to be re-issued on every pass: inspecting one saved Status twice would
+  // prove nothing about repeatability.
+  for (int i = 0; i < 2; i++) {
+    SCOPED_TRACE(i);
+    Status s = pb_file.ReadNextPB(&test_pb);
+    if (version_ == 1) {
+      ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    } else {
+      ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+    }
+    ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  }
+  ASSERT_OK(pb_file.Close());
+}
+
+// KUDU-2260: Test handling extra null bytes at the end of file. This can
+// occur, for example, on ext4 in default data=ordered mode when a write
+// increases the filesize but the system crashes before the actual data is
+// persisted.
+//
+// For each padding size the file is extended with NULL bytes, the one good
+// record is read, and the read that lands in the padding is then issued twice
+// to check that the error is stable.
+TEST_P(TestPBContainerVersions, TestExtraNullBytes) {
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  uint64_t known_good_size = 0;
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+  for (const auto extra_bytes : {1, 8, 128}) {
+    SCOPED_TRACE(extra_bytes);
+    // Growing the file via truncate pads it out to the requested size.
+    ASSERT_OK(TruncateFile(path_, known_good_size + extra_bytes));
+
+    unique_ptr<RandomAccessFile> file;
+    ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+    ReadablePBContainerFile pb_file(std::move(file));
+    ASSERT_OK(pb_file.Open());
+    ProtoContainerTestPB test_pb;
+    // Read the first good PB. Trouble starts at the second.
+    ASSERT_OK(pb_file.ReadNextPB(&test_pb));
+    // Loop to verify that the same response is repeatably returned. The failing
+    // read is re-issued on every pass rather than checking one saved Status.
+    for (int i = 0; i < 2; i++) {
+      Status s = pb_file.ReadNextPB(&test_pb);
+      ASSERT_TRUE(version_ == 1 ? s.IsCorruption() : s.IsIncomplete()) << s.ToString();
+      if (extra_bytes < 8) {
+        // Too few bytes to even hold a record's length field (plus, in V2+,
+        // its checksum).
+        ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+      } else if (version_ == 1) {
+        ASSERT_STR_CONTAINS(s.ToString(), "Length and data checksum does not match");
+      } else {
+        ASSERT_STR_CONTAINS(s.ToString(), "rest of file is NULL bytes");
+      }
+    }
+    ASSERT_OK(pb_file.Close());
+  }
+}
+
+// Test that it is possible to append after a partial write if we truncate the
+// partial record. This is only fully supported in V2+.
+//
+// Sequence: create a good file, cut two bytes off its last record, confirm the
+// reader flags the short record, truncate back to the reader's offset, then
+// append a fresh record and read it back through the same reader.
+TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
+  uint64_t known_good_size;
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+  ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+
+  // Open a writer on the existing file before the file is damaged below.
+  unique_ptr<WritablePBContainerFile> writer;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+  ASSERT_OK(writer->OpenExisting());
+
+  // Simulate a partial write by dropping the last two bytes of the file.
+  ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+  unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  ReadablePBContainerFile reader(std::move(file));
+  ASSERT_OK(reader.Open());
+  ProtoContainerTestPB test_pb;
+  Status s = reader.ReadNextPB(&test_pb);
+  // Every version reports the same message; only the status code differs.
+  ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+  if (version_ == 1) {
+    ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+    return; // The rest of the test does not apply to version 1.
+  }
+  ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+
+  // Now truncate cleanly: cut the file at the reader's current offset so the
+  // partial record disappears. The same reader should then see a plain EOF.
+  ASSERT_OK(TruncateFile(path_, reader.offset()));
+  s = reader.ReadNextPB(&test_pb);
+  ASSERT_TRUE(s.IsEndOfFile()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Reached end of file");
+
+  // Reopen the writer to allow appending more records.
+  // Append a record and read it back.
+  ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+  ASSERT_OK(writer->OpenExisting());
+  test_pb.set_name("hello");
+  test_pb.set_value(1);
+  ASSERT_OK(writer->Append(test_pb));
+  // Clear the message so the assertions below can only pass on freshly read data.
+  test_pb.Clear();
+  ASSERT_OK(reader.ReadNextPB(&test_pb));
+  ASSERT_EQ("hello", test_pb.name());
+  ASSERT_EQ(1, test_pb.value());
+}
+
+// Smoke test for every container version: write the known-good file and read
+// its record straight back.
+TEST_P(TestPBContainerVersions, TestSingleMessage) {
+  ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+
+  ProtoContainerTestPB loaded;
+  ASSERT_OK(ReadPBContainerFromPath(env_, path_, &loaded));
+  ASSERT_EQ(kTestKeyvalName, loaded.name());
+  ASSERT_EQ(kTestKeyvalValue, loaded.value());
+}
+
+// Writes ten records that differ only in 'value', then reads them back in
+// order until the reader reports end of file.
+TEST_P(TestPBContainerVersions, TestMultipleMessages) {
+  ProtoContainerTestPB written;
+  written.set_name("foo");
+  written.set_note("bar");
+
+  unique_ptr<WritablePBContainerFile> writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &writer));
+  ASSERT_OK(writer->CreateNew(written));
+
+  for (int value = 0; value < 10; value++) {
+    written.set_value(value);
+    ASSERT_OK(writer->Append(written));
+  }
+  ASSERT_OK(writer->Close());
+
+  unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+  ReadablePBContainerFile container(std::move(file));
+  ASSERT_OK(container.Open());
+
+  // The count of records read so far doubles as the value expected next.
+  int num_read = 0;
+  while (true) {
+    ProtoContainerTestPB record;
+    Status s = container.ReadNextPB(&record);
+    if (s.IsEndOfFile()) {
+      break;
+    }
+    ASSERT_OK(s);
+    ASSERT_EQ(written.name(), record.name());
+    ASSERT_EQ(record.value(), num_read);
+    ASSERT_EQ(written.note(), record.note());
+    num_read++;
+  }
+  ASSERT_EQ(10, num_read);
+  ASSERT_OK(container.Close());
+}
+
+// Checks that a reader opened on a file that is still being written can see
+// each record as soon as the writer has appended it.
+TEST_P(TestPBContainerVersions, TestInterleavedReadWrite) {
+  ProtoContainerTestPB pb;
+  pb.set_name("foo");
+  pb.set_note("bar");
+
+  // Open the file for writing and reading.
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  unique_ptr<RandomAccessFile> reader;
+  ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
+  ReadablePBContainerFile pb_reader(std::move(reader));
+
+  // Write the header (writer) and validate it (reader).
+  ASSERT_OK(pb_writer->CreateNew(pb));
+  ASSERT_OK(pb_reader.Open());
+
+  for (int i = 0; i < 10; i++) {
+    // Tag any failure below with the iteration it happened in.
+    SCOPED_TRACE(i);
+    // Write a message and read it back.
+    pb.set_value(i);
+    ASSERT_OK(pb_writer->Append(pb));
+    ProtoContainerTestPB read_pb;
+    ASSERT_OK(pb_reader.ReadNextPB(&read_pb));
+    ASSERT_EQ(pb.name(), read_pb.name());
+    ASSERT_EQ(read_pb.value(), i);
+    ASSERT_EQ(pb.note(), read_pb.note());
+  }
+
+  // After closing the writer, the reader should be out of data.
+  ASSERT_OK(pb_writer->Close());
+  // A null message is passed here; the test expects EOF to be reported without
+  // the message being touched.
+  ASSERT_TRUE(pb_reader.ReadNextPB(nullptr).IsEndOfFile());
+  ASSERT_OK(pb_reader.Close());
+}
+
+// PopulateDescriptorSet() is expected to emit the message's own .proto file
+// plus one entry per file it depends on, directly or indirectly.
+TEST_F(TestPBUtil, TestPopulateDescriptorSet) {
+  {
+    // A message without dependencies yields exactly one file.
+    ProtoContainerTestPB msg;
+    FileDescriptorSet files;
+    WritablePBContainerFile::PopulateDescriptorSet(msg.GetDescriptor()->file(), &files);
+    ASSERT_EQ(1, files.file_size());
+  }
+  {
+    // One direct dependency adds a second file.
+    ProtoContainerTest2PB msg;
+    FileDescriptorSet files;
+    WritablePBContainerFile::PopulateDescriptorSet(msg.GetDescriptor()->file(), &files);
+    ASSERT_EQ(2, files.file_size());
+  }
+  {
+    // One direct plus one indirect dependency gives three files in total.
+    ProtoContainerTest3PB msg;
+    FileDescriptorSet files;
+    WritablePBContainerFile::PopulateDescriptorSet(msg.GetDescriptor()->file(), &files);
+    ASSERT_EQ(3, files.file_size());
+  }
+}
+
+// Dumps every record of the container file at 'path' in the requested 'format'
+// and stores the resulting text in '*ret'. Failures are reported through gtest
+// assertions, so '*ret' is only written when every step succeeded.
+void TestPBUtil::DumpPBCToString(const string& path,
+                                 ReadablePBContainerFile::Format format,
+                                 string* ret) {
+  unique_ptr<RandomAccessFile> file;
+  ASSERT_OK(env_->NewRandomAccessFile(path, &file));
+  ReadablePBContainerFile container(std::move(file));
+  ASSERT_OK(container.Open());
+
+  ostringstream dumped;
+  ASSERT_OK(container.Dump(&dumped, format));
+  ASSERT_OK(container.Close());
+  *ret = dumped.str();
+}
+
+// Writes two nested messages to a container file and checks the text produced
+// by Dump() in each of the DEFAULT, ONELINE and JSON formats.
+TEST_P(TestPBContainerVersions, TestDumpPBContainer) {
+  // Expected DEFAULT output: a "Message N" banner followed by the multi-line
+  // text form of each record and a blank line.
+  const char* kExpectedOutput =
+      "Message 0\n"
+      "-------\n"
+      "record_one {\n"
+      "  name: \"foo\"\n"
+      "  value: 0\n"
+      "}\n"
+      "record_two {\n"
+      "  record {\n"
+      "    name: \"foo\"\n"
+      "    value: 0\n"
+      "  }\n"
+      "}\n"
+      "\n"
+      "Message 1\n"
+      "-------\n"
+      "record_one {\n"
+      "  name: \"foo\"\n"
+      "  value: 1\n"
+      "}\n"
+      "record_two {\n"
+      "  record {\n"
+      "    name: \"foo\"\n"
+      "    value: 2\n"
+      "  }\n"
+      "}\n\n";
+
+  // Expected ONELINE output: record index, a tab, then the single-line text form.
+  const char* kExpectedOutputShort =
+    "0\trecord_one { name: \"foo\" value: 0 } record_two { record { name: \"foo\" value: 0 } }\n"
+    "1\trecord_one { name: \"foo\" value: 1 } record_two { record { name: \"foo\" value: 2 } }\n";
+
+  // Expected JSON output: one JSON object per line, with camelCase field names.
+  const char* kExpectedOutputJson =
+      "{\"recordOne\":{\"name\":\"foo\",\"value\":0},\"recordTwo\":{\"record\":{\"name\":\"foo\",\"value\":0}}}\n" // NOLINT
+      "{\"recordOne\":{\"name\":\"foo\",\"value\":1},\"recordTwo\":{\"record\":{\"name\":\"foo\",\"value\":2}}}\n"; // NOLINT
+
+  ProtoContainerTest3PB pb;
+  pb.mutable_record_one()->set_name("foo");
+  pb.mutable_record_two()->mutable_record()->set_name("foo");
+
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+  ASSERT_OK(pb_writer->CreateNew(pb));
+
+  // record_one carries i and the nested record carries 2*i, which is why the
+  // expected values above are (0, 0) and (1, 2).
+  for (int i = 0; i < 2; i++) {
+    pb.mutable_record_one()->set_value(i);
+    pb.mutable_record_two()->mutable_record()->set_value(i*2);
+    ASSERT_OK(pb_writer->Append(pb));
+  }
+  ASSERT_OK(pb_writer->Close());
+
+  // DumpPBCToString() uses gtest assertions internally, hence NO_FATALS.
+  string output;
+  NO_FATALS(DumpPBCToString(path_, ReadablePBContainerFile::Format::DEFAULT, &output));
+  ASSERT_STREQ(kExpectedOutput, output.c_str());
+
+  NO_FATALS(DumpPBCToString(path_, ReadablePBContainerFile::Format::ONELINE, &output));
+  ASSERT_STREQ(kExpectedOutputShort, output.c_str());
+
+  NO_FATALS(DumpPBCToString(path_, ReadablePBContainerFile::Format::JSON, &output));
+  ASSERT_STREQ(kExpectedOutputJson, output.c_str());
+}
+
+// Creating the container with NO_OVERWRITE must fail with AlreadyPresent once
+// the file exists, while OVERWRITE must succeed no matter how often it is used.
+TEST_F(TestPBUtil, TestOverwriteExistingPB) {
+  // First creation succeeds; the second attempt finds the file already there.
+  ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE));
+  ASSERT_TRUE(CreateKnownGoodContainerFile(NO_OVERWRITE).IsAlreadyPresent());
+  // Overwriting an existing file is allowed, repeatedly.
+  ASSERT_OK(CreateKnownGoodContainerFile(OVERWRITE));
+  ASSERT_OK(CreateKnownGoodContainerFile(OVERWRITE));
+}
+
+// With the 'redact' flag set to "log", the secure debug-string helpers must
+// hide the values stored in the secure fields while leaving the insecure ones
+// readable. With the flag emptied, the private values show up again.
+TEST_F(TestPBUtil, TestRedaction) {
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+
+  TestSecurePrintingPB pb;
+  pb.set_insecure1("public 1");
+  pb.set_insecure2("public 2");
+  pb.set_secure1("private 1");
+  pb.set_secure2("private 2");
+  pb.add_repeated_secure("private 3");
+  pb.add_repeated_secure("private 4");
+  pb.set_insecure3("public 3");
+
+  // Both the multi-line and the short rendering must apply the redaction.
+  for (const auto& rendered : {SecureDebugString(pb), SecureShortDebugString(pb)}) {
+    ASSERT_EQ(string::npos, rendered.find("private"));
+    ASSERT_STR_CONTAINS(rendered, "<redacted>");
+    ASSERT_STR_CONTAINS(rendered, "public 1");
+    ASSERT_STR_CONTAINS(rendered, "public 2");
+    ASSERT_STR_CONTAINS(rendered, "public 3");
+  }
+
+  // If we disable redaction, we should see the private fields.
+  ASSERT_NE("", gflags::SetCommandLineOption("redact", ""));
+  ASSERT_STR_CONTAINS(SecureDebugString(pb), "private");
+}
+
+} // namespace pb_util
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.cc b/be/src/kudu/util/pb_util.cc
new file mode 100644
index 0000000..edf1222
--- /dev/null
+++ b/be/src/kudu/util/pb_util.cc
@@ -0,0 +1,1088 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Some portions copyright (C) 2008, Google, inc.
+//
+// Utilities for working with protobufs.
+// Some of this code is cribbed from the protobuf source,
+// but modified to work with kudu's 'faststring' instead of STL strings.
+
+#include "kudu/util/pb_util.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <deque>
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/descriptor_database.h>
+#include <google/protobuf/dynamic_message.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/stubs/status.h>
+#include <google/protobuf/text_format.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/coding-inl.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util-internal.h"
+#include "kudu/util/pb_util.pb.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+using google::protobuf::Descriptor;
+using google::protobuf::DescriptorPool;
+using google::protobuf::DynamicMessageFactory;
+using google::protobuf::FieldDescriptor;
+using google::protobuf::FileDescriptor;
+using google::protobuf::FileDescriptorProto;
+using google::protobuf::FileDescriptorSet;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::Message;
+using google::protobuf::MessageLite;
+using google::protobuf::Reflection;
+using google::protobuf::SimpleDescriptorDatabase;
+using google::protobuf::TextFormat;
+using kudu::crc::Crc;
+using kudu::pb_util::internal::SequentialFileFileInputStream;
+using kudu::pb_util::internal::WritableFileOutputStream;
+using std::deque;
+using std::endl;
+using std::initializer_list;
+using std::ostream;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+using strings::Utf8SafeCEscape;
+
+namespace std {
+
+// Streams a FileState as its underlying integer value, which allows the enum
+// to be used with DCHECK_EQ.
+// NOTE(review): this overload lives in namespace std rather than next to
+// FileState in kudu::pb_util; confirm that placement is intentional.
+std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) {
+  return os << static_cast<int>(state);
+}
+
+} // namespace std
+
+namespace kudu {
+namespace pb_util {
+
+// Suffix used when building temporary file names.
+// NOTE(review): presumably a mkstemp-style template; confirm at the use site.
+static const char* const kTmpTemplateSuffix = ".XXXXXX";
+
+// Protobuf container constants.
+
+// Version number that never denotes a real container format (only versions 1
+// and 2 are accepted by IsSupportedContainerVersion()).
+static const uint32_t kPBContainerInvalidVersion = 0;
+// Version reported as the default in the "unsupported version" error message.
+static const uint32_t kPBContainerDefaultVersion = 2;
+// Size in bytes of every checksum stored in a container file.
+static const int kPBContainerChecksumLen = sizeof(uint32_t);
+// Magic string found at the very start of every container file.
+static const char kPBContainerMagic[] = "kuducntr";
+// Length of the magic string, excluding the terminating NUL.
+static const int kPBContainerMagicLen = 8;
+static const int kPBContainerV1HeaderLen =
+    kPBContainerMagicLen + sizeof(uint32_t); // Magic number + version.
+static const int kPBContainerV2HeaderLen =
+    kPBContainerV1HeaderLen + kPBContainerChecksumLen; // Same as V1 plus a checksum.
+
+// Set to the V1 header length, the shorter of the two header layouts.
+// NOTE(review): not 'static', so presumably declared in a header; confirm.
+const int kPBContainerMinimumValidLength = kPBContainerV1HeaderLen;
+
+// Keep the hard-coded magic length in sync with the literal above ("- 1"
+// discounts the terminating NUL counted by arraysize()).
+static_assert(arraysize(kPBContainerMagic) - 1 == kPBContainerMagicLen,
+              "kPBContainerMagic does not match expected length");
+
+namespace {
+
+// When serializing, we first compute the byte size, then serialize the message.
+// If serialization produces a different number of bytes than expected, we
+// call this function, which crashes.  The problem could be due to a bug in the
+// protobuf implementation but is more likely caused by concurrent modification
+// of the message.  This function attempts to distinguish between the two and
+// provide a useful error message.
+//
+// 'byte_size_before_serialization' and 'byte_size_after_serialization' are the
+// sizes computed before and after the serialization attempt;
+// 'bytes_produced_by_serialization' is the number of bytes actually written.
+// This function never returns normally.
+void ByteSizeConsistencyError(int byte_size_before_serialization,
+                              int byte_size_after_serialization,
+                              int bytes_produced_by_serialization) {
+  // A size that changed between the two computations points at a concurrent
+  // writer rather than at the serializer.
+  CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
+      << "Protocol message was modified concurrently during serialization.";
+  // The size was stable, so the mismatch is between the size computation and
+  // the bytes the serializer produced.
+  CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
+      << "Byte size calculation and serialization were inconsistent.  This "
+         "may indicate a bug in protocol buffers or it may be caused by "
+         "concurrent modification of the message.";
+  // Reaching this line means the caller reported an error that did not exist.
+  LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
+}
+
+// Builds the diagnostic shown when 'message' cannot be processed because some
+// of its required fields are unset. 'action' is the verb describing what was
+// being attempted (for example "serialize").
+//
+// The text is assembled with plain string appends; the result reads:
+//   Can't <action> message of type "<type>" because it is missing required
+//   fields: <list of fields>
+string InitializationErrorMessage(const char* action,
+                                  const MessageLite& message) {
+  string text("Can't ");
+  text.append(action);
+  text.append(" message of type \"");
+  text.append(message.GetTypeName());
+  text.append("\" because it is missing required fields: ");
+  text.append(message.InitializationErrorString());
+  return text;
+}
+
+// Tells whether this implementation can handle protobuf container files of
+// the given format 'version'. Only versions 1 and 2 are understood.
+bool IsSupportedContainerVersion(uint32_t version) {
+  return version == 1 || version == 2;
+}
+
+// Reads exactly 'length' bytes starting at '*offset' from the container file
+// into 'result', and advances '*offset' past them on success.
+//
+// The request is first checked against 'file_size': if it would run past the
+// end of the file, Status::Incomplete is returned and nothing is read.
+// Otherwise any error from reader->Read() is returned unchanged.
+// NOTE(review): earlier documentation stated that short reads surface as
+// Status::Corruption; that would have to come from the reader -- confirm.
+//
+// NOTE: the data in 'result' may be modified even in the case of a failed read.
+template<typename ReadableFileType>
+Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size,
+                           uint64_t* offset, uint64_t length,
+                           faststring* result) {
+  const uint64_t read_start = *offset;
+
+  // Refuse reads that would extend beyond the end of the file.
+  if (read_start + length > file_size) {
+    return Status::Incomplete("File size not large enough to be valid",
+                              Substitute("Proto container file $0: "
+                                  "Tried to read $1 bytes at offset "
+                                  "$2 but file size is only $3 bytes",
+                                  reader->filename(), length,
+                                  read_start, file_size));
+  }
+
+  // Size the buffer to the request and fill it in place.
+  result->resize(length);
+  Slice dest(*result);
+  RETURN_NOT_OK(reader->Read(read_start, dest));
+  *offset = read_start + length;
+  return Status::OK();
+}
+
+// Helper macro for use with ParseAndCompareChecksum(). Expands to a
+// Substitute() call producing
+//   "<prefix>: Incorrect checksum in file <filename> at offset <cksum_offset>"
+// where 'cksum_offset' is the file offset of the checksum that failed.
+// Example usage:
+// RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }),
+//    CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset));
+#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \
+  Substitute("$0: Incorrect checksum in file $1 at offset $2", prefix, filename, cksum_offset)
+
+// Decodes the 32-bit checksum stored at 'checksum_buf' and compares it with a
+// CRC32C computed over the bytes of 'slices', fed to the CRC one slice after
+// another in the order given.
+// Returns OK when the two agree and Status::Corruption otherwise.
+Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
+                               const initializer_list<Slice>& slices) {
+  const uint32_t stored = DecodeFixed32(checksum_buf);
+
+  // Accumulate all slices into a single running checksum.
+  uint64_t computed = 0;
+  Crc* crc = crc::GetCrc32cInstance();
+  for (const Slice& piece : slices) {
+    crc->Compute(piece.data(), piece.size(), &computed);
+  }
+
+  if (PREDICT_FALSE(computed != stored)) {
+    return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1",
+                                         stored, computed));
+  }
+  return Status::OK();
+}
+
+// Ensures that 'cached_file_size' holds a value. When it is still empty, the
+// size of the file behind 'reader' is queried and stored; when it already has
+// a value, nothing happens and the reader is not consulted.
+template<typename ReadableFileType>
+Status CacheFileSize(ReadableFileType* reader,
+                     boost::optional<uint64_t>* cached_file_size) {
+  if (!*cached_file_size) {
+    uint64_t size_on_disk;
+    RETURN_NOT_OK(reader->Size(&size_on_disk));
+    *cached_file_size = size_on_disk;
+  }
+  return Status::OK();
+}
+
+// Scans the file from 'offset' up to 'filesize' and reports through
+// '*all_zeros' whether every byte in that range is zero. The range is read in
+// chunks of at most 4 MiB and the scan stops at the first chunk containing a
+// non-zero byte. An empty range counts as all zeros.
+//
+// If a read fails, that error is returned and '*all_zeros' is left untouched.
+template<typename ReadableFileType>
+Status RestOfFileIsAllZeros(ReadableFileType* reader,
+                            uint64_t filesize,
+                            uint64_t offset,
+                            bool* all_zeros) {
+  DCHECK(reader);
+  DCHECK_GE(filesize, offset);
+  DCHECK(all_zeros);
+  constexpr uint64_t kMaxChunkBytes = 4 * 1024 * 1024; // 4 MiB.
+  faststring chunk;
+  while (offset != filesize) {
+    const uint64_t chunk_len = std::min(kMaxChunkBytes, filesize - offset);
+    chunk.resize(chunk_len);
+    RETURN_NOT_OK(reader->Read(offset, Slice(chunk)));
+    offset += chunk_len;
+    if (!IsAllZeros(chunk)) {
+      *all_zeros = false;
+      return Status::OK();
+    }
+  }
+  *all_zeros = true;
+  return Status::OK();
+}
+
+// Read and parse a message of the specified format at the given offset in the
+// format documented in pb_util.h. 'offset' is an in-out parameter and will be
+// updated with the new offset on success. On failure, 'offset' is not modified.
+//
+// 'version' selects the record layout: V1 records carry a single checksum that
+// covers length and body, V2+ records carry one checksum for the length and
+// another for the body. 'cached_file_size' is filled in on first use and
+// reused on later calls.
+//
+// Returns:
+// - EndOfFile if 'offset' equals the (cached) file size.
+// - Incomplete if the file is too short to hold the record, or (V2+ only) if
+//   the length field, its checksum and the rest of the file are NULL bytes.
+// - Corruption if a checksum does not match.
+// - IOError if the body passes its checksum but cannot be parsed into 'msg'.
+// Errors from the underlying reader are passed through.
+template<typename ReadableFileType>
+Status ReadPBStartingAt(ReadableFileType* reader, int version,
+                        boost::optional<uint64_t>* cached_file_size,
+                        uint64_t* offset, Message* msg) {
+  // Work on a copy so that '*offset' stays untouched unless the whole record
+  // is read and parsed successfully.
+  uint64_t tmp_offset = *offset;
+  VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;
+
+  RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
+  uint64_t file_size = cached_file_size->get();
+
+  // Being positioned exactly at the end of the file is a clean EOF, not an error.
+  if (tmp_offset == *cached_file_size) {
+    return Status::EndOfFile("Reached end of file");
+  }
+
+  // Read the data length from the file.
+  // Version 2+ includes a checksum for the length field.
+  uint64_t length_buflen = (version == 1) ? sizeof(uint32_t)
+                                          : sizeof(uint32_t) + kPBContainerChecksumLen;
+  faststring length_and_cksum_buf;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen,
+                                            &length_and_cksum_buf),
+                        Substitute("Could not read data length from proto container file $0 "
+                                   "at offset $1", reader->filename(), *offset));
+  // The first four bytes of the buffer are the encoded length in every version.
+  Slice length(length_and_cksum_buf.data(), sizeof(uint32_t));
+
+  // Versions >= 2 have an individual checksum for the data length.
+  if (version >= 2) {
+    // KUDU-2260: If the length and checksum data are all 0's, and the rest of
+    // the file is all 0's, then it's an incomplete record, not corruption.
+    // This can happen e.g. on ext4 in the default data=ordered mode, when the
+    // filesize metadata is updated but the new data is not persisted.
+    // See https://plus.google.com/+KentonVarda/posts/JDwHfAiLGNQ.
+    if (IsAllZeros(length_and_cksum_buf)) {
+      bool all_zeros;
+      RETURN_NOT_OK(RestOfFileIsAllZeros(reader, file_size, tmp_offset, &all_zeros));
+      if (all_zeros) {
+        return Status::Incomplete("incomplete write of PB: rest of file is NULL bytes");
+      }
+    }
+    // The length checksum directly follows the length field; by now
+    // 'tmp_offset' points just past it, hence the subtraction in the message.
+    Slice length_checksum(length_and_cksum_buf.data() + sizeof(uint32_t), kPBContainerChecksumLen);
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }),
+        CHECKSUM_ERR_MSG("Data length checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  }
+  uint32_t data_length = DecodeFixed32(length.data());
+
+  // Read body and checksum into buffer for checksum & parsing.
+  uint64_t data_and_cksum_buflen = data_length + kPBContainerChecksumLen;
+  faststring body_and_cksum_buf;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen,
+                                            &body_and_cksum_buf),
+                        Substitute("Could not read PB message data from proto container file $0 "
+                                   "at offset $1",
+                                   reader->filename(), tmp_offset));
+  // Split the buffer into the message body and the checksum trailing it.
+  Slice body(body_and_cksum_buf.data(), data_length);
+  Slice record_checksum(body_and_cksum_buf.data() + data_length, kPBContainerChecksumLen);
+
+  // Version 1 has a single checksum for length, body.
+  // Version 2+ has individual checksums for length and body, respectively.
+  if (version == 1) {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }),
+        CHECKSUM_ERR_MSG("Length and data checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  } else {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }),
+        CHECKSUM_ERR_MSG("Data checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  }
+
+  // The checksum is correct. Time to decode the body.
+  //
+  // We could compare pb_type_ against msg.GetTypeName(), but:
+  // 1. pb_type_ is not available when reading the supplemental header,
+  // 2. ParseFromArray() should fail if the data cannot be parsed into the
+  //    provided message type.
+
+  // To permit parsing of very large PB messages, we must parse through a
+  // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
+  // say that 512MB is the shortest theoretical message length that may produce
+  // integer overflow warnings, so that's what we'll use.
+  ArrayInputStream ais(body.data(), body.size());
+  CodedInputStream cis(&ais);
+  cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+  if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
+    return Status::IOError("Unable to parse PB from path", reader->filename());
+  }
+
+  // Only now, with the record fully validated and parsed, publish the new offset.
+  *offset = tmp_offset;
+  return Status::OK();
+}
+
+// Reads one record via ReadPBStartingAt() and adds two policies on top:
+//
+// 1. A V1 file never reports Status::Incomplete; an incomplete V1 record is
+//    turned into Status::Corruption instead.
+// 2. If the read ran out of data (Incomplete or EndOfFile) while relying on a
+//    previously cached file size, the file may have grown since the size was
+//    cached. The cache is dropped and the read is attempted once more with a
+//    freshly queried size.
+template<typename ReadableFileType>
+Status ReadFullPB(ReadableFileType* reader, int version,
+                  boost::optional<uint64_t>* cached_file_size,
+                  uint64_t* offset, Message* msg) {
+  while (true) {
+    // Remember whether this attempt started from a cached (possibly stale) size.
+    const bool size_was_cached = *cached_file_size != boost::none;
+    Status s = ReadPBStartingAt(reader, version, cached_file_size, offset, msg);
+    if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
+      return Status::Corruption("Unrecoverable incomplete record", s.ToString());
+    }
+    const bool ran_out_of_data = s.IsIncomplete() || s.IsEndOfFile();
+    if (!size_was_cached || !ran_out_of_data) {
+      return s;
+    }
+    // Forget the stale size; the next pass starts without a cached value and
+    // therefore returns whatever it finds.
+    *cached_file_size = boost::none;
+  }
+}
+
+// Read and parse the protobuf container file-level header documented in pb_util.h.
+//
+// On success, '*version' receives the container version found in the file and
+// '*offset' is advanced past the header (magic + version for V1; magic +
+// version + checksum for V2+). On failure neither output is modified.
+//
+// Returns:
+// - Incomplete if the file is shorter than a V2 header.
+// - Corruption if the magic number is wrong or the V2+ header checksum fails.
+// - NotSupported if the version is not one this implementation understands.
+// Errors from the underlying reader are passed through.
+template<typename ReadableFileType>
+Status ParsePBFileHeader(ReadableFileType* reader, boost::optional<uint64_t>* cached_file_size,
+                         uint64_t* offset, int* version) {
+  RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
+  uint64_t file_size = cached_file_size->get();
+
+  // We initially read enough data for a V2+ file header. This optimizes for
+  // V2+ and is valid on a V1 file because we don't consider these files valid
+  // unless they contain a record in addition to the file header. The
+  // additional 4 bytes required by a V2+ header (vs V1) is still less than the
+  // minimum number of bytes required for a V1 format data record.
+  uint64_t tmp_offset = *offset;
+  faststring header;
+  RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, kPBContainerV2HeaderLen,
+                                            &header),
+                        Substitute("Could not read header for proto container file $0",
+                                   reader->filename()));
+  // Carve the buffer into the checksummed part (magic + version) and the
+  // checksum that follows it. The checksum slice is only meaningful for V2+.
+  Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t));
+  Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t), kPBContainerChecksumLen);
+
+  // Validate magic number.
+  if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
+    string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
+    return Status::Corruption("Invalid magic number",
+                              Substitute("Expected: $0, found: $1",
+                                         Utf8SafeCEscape(kPBContainerMagic),
+                                         Utf8SafeCEscape(file_magic)));
+  }
+
+  // Validate container file version.
+  uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen);
+  if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) {
+    return Status::NotSupported(
+        Substitute("Protobuf container has unsupported version: $0. Default version: $1",
+                   tmp_version, kPBContainerDefaultVersion));
+  }
+
+  // Versions >= 2 have a checksum after the magic number and encoded version
+  // to ensure the integrity of these fields.
+  if (tmp_version >= 2) {
+    RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }),
+        CHECKSUM_ERR_MSG("File header checksum does not match",
+                         reader->filename(), tmp_offset - kPBContainerChecksumLen));
+  } else {
+    // Version 1 doesn't have a header checksum. Rewind our read offset so this
+    // data will be read again when we next attempt to read a data record.
+    tmp_offset -= kPBContainerChecksumLen;
+  }
+
+  // Publish the results only after every validation step has passed.
+  *offset = tmp_offset;
+  *version = tmp_version;
+  return Status::OK();
+}
+
+// Reads one record from the container file via ReadFullPB(), starting at
+// '*offset', and parses it into 'sup_header'. A failure is returned with the
+// file name, container version and offset prepended to its message.
+template<typename ReadableFileType>
+Status ReadSupplementalHeader(ReadableFileType* reader, int version,
+                              boost::optional<uint64_t>* cached_file_size,
+                              uint64_t* offset,
+                              ContainerSupHeaderPB* sup_header) {
+  const Status read_status = ReadFullPB(reader, version, cached_file_size, offset, sup_header);
+  if (PREDICT_FALSE(!read_status.ok())) {
+    return read_status.CloneAndPrepend(
+        Substitute("Could not read supplemental header from proto container file $0 "
+                   "with version $1 at offset $2",
+                   reader->filename(), version, *offset));
+  }
+  return Status::OK();
+}
+
+} // anonymous namespace
+
+// Appends the serialized bytes of 'msg' to 'output'. Debug builds assert that
+// msg.IsInitialized() holds; the serialization itself is delegated to
+// AppendPartialToString().
+void AppendToString(const MessageLite &msg, faststring *output) {
+  DCHECK(msg.IsInitialized())
+      << InitializationErrorMessage("serialize", msg);
+  AppendPartialToString(msg, output);
+}
+
+// Appends the serialized bytes of 'msg' to 'output' without checking that the
+// message is initialized. The buffer is grown by msg.ByteSize() bytes and the
+// message is serialized directly into the new tail; a mismatch between the
+// size reported up front and the bytes produced is reported through
+// ByteSizeConsistencyError().
+void AppendPartialToString(const MessageLite &msg, faststring* output) {
+  const size_t prior_len = output->size();
+  const int msg_len = msg.ByteSize();
+  // Messages >2G cannot be serialized due to overflow computing ByteSize.
+  DCHECK_GE(msg_len, 0) << "Error computing ByteSize";
+
+  output->resize(prior_len + static_cast<size_t>(msg_len));
+
+  uint8* const begin = &((*output)[prior_len]);
+  uint8* const finish = msg.SerializeWithCachedSizesToArray(begin);
+  const auto produced = finish - begin;
+  if (produced != msg_len) {
+    ByteSizeConsistencyError(msg_len, msg.ByteSize(), produced);
+  }
+}
+
+// Replaces the contents of 'output' with the serialized bytes of 'msg': the
+// buffer is emptied first and the message is then appended to it.
+void SerializeToString(const MessageLite &msg, faststring *output) {
+  output->clear();
+  AppendToString(msg, output);
+}
+
+Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) {
+  SequentialFileFileInputStream input(rfile);
+  if (!msg->ParseFromZeroCopyStream(&input)) {
+    RETURN_NOT_OK(input.status());
+
+    // If it's not a file IO error then it's a parsing error.
+    // Probably, we read wrong or damaged data here.
+    return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
+  }
+  return Status::OK();
+}
+
+// Parses 'msg' from the 'length' bytes starting at 'data'. Returns Corruption
+// if the bytes cannot be parsed.
+Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) {
+  if (msg->ParseFromArray(data, length)) {
+    return Status::OK();
+  }
+  return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
+}
+
+// Serializes 'msg' into a temporary file created from a template derived from
+// 'path', then renames that file to 'path'. With sync == SYNC the temporary
+// file is synced before the rename and the parent directory of 'path' after
+// it. If any step up to and including the rename fails, deletion of the
+// temporary file is attempted.
+Status WritePBToPath(Env* env, const std::string& path,
+                     const MessageLite& msg,
+                     SyncMode sync) {
+  const bool durable = (sync == pb_util::SYNC);
+  const string name_template = path + kTmpInfix + kTmpTemplateSuffix;
+  string tmp_path;
+
+  unique_ptr<WritableFile> tmp_file;
+  RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), name_template,
+                                         &tmp_path, &tmp_file));
+  // Armed until the rename has succeeded.
+  auto remove_tmp = MakeScopedCleanup([&]() {
+    WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
+  });
+
+  WritableFileOutputStream stream(tmp_file.get());
+  if (!msg.SerializeToZeroCopyStream(&stream) || !stream.Flush()) {
+    return Status::IOError("Unable to serialize PB to file");
+  }
+
+  if (durable) {
+    RETURN_NOT_OK_PREPEND(tmp_file->Sync(), "Failed to Sync() " + tmp_path);
+  }
+  RETURN_NOT_OK_PREPEND(tmp_file->Close(), "Failed to Close() " + tmp_path);
+  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path);
+  remove_tmp.cancel();
+
+  if (durable) {
+    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path);
+  }
+  return Status::OK();
+}
+
+// Opens 'path' for sequential reading and parses its contents into 'msg'.
+// Returns the first error from opening or from ParseFromSequentialFile().
+Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
+  shared_ptr<SequentialFile> source;
+  RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &source));
+  return ParseFromSequentialFile(msg, source.get());
+}
+
+// Cuts '*s' down to 'max_len' bytes and appends the marker "<truncated>" when
+// the string is longer than that; shorter strings are left untouched. A
+// negative 'max_len' converts to a very large unsigned value, so it never
+// truncates.
+static void TruncateString(string* s, int max_len) {
+  const size_t limit = static_cast<size_t>(max_len);
+  if (s->size() <= limit) {
+    return;
+  }
+  s->resize(limit);
+  s->append("<truncated>");
+}
+
+// Walks every field listed by Reflection::ListFields() for 'message' and
+// applies TruncateString() with 'max_len' to each CPPTYPE_STRING value,
+// recursing into CPPTYPE_MESSAGE values. Singular and repeated fields are both
+// handled; fields of any other type are left alone.
+void TruncateFields(Message* message, int max_len) {
+  const Reflection* refl = message->GetReflection();
+  vector<const FieldDescriptor*> listed;
+  refl->ListFields(*message, &listed);
+
+  for (const FieldDescriptor* fd : listed) {
+    const bool is_string = fd->cpp_type() == FieldDescriptor::CPPTYPE_STRING;
+    const bool is_message = fd->cpp_type() == FieldDescriptor::CPPTYPE_MESSAGE;
+    if (!is_string && !is_message) {
+      continue;
+    }
+
+    if (!fd->is_repeated()) {
+      if (is_string) {
+        // Reflection only hands out a const reference; the const is cast away
+        // so the value can be shortened in place.
+        const string& value = refl->GetStringReference(*message, fd, nullptr);
+        TruncateString(const_cast<string*>(&value), max_len);
+      } else {
+        TruncateFields(refl->MutableMessage(message, fd), max_len);
+      }
+      continue;
+    }
+
+    for (int idx = 0; idx < refl->FieldSize(*message, fd); ++idx) {
+      if (is_string) {
+        const string& value = refl->GetRepeatedStringReference(*message, fd, idx,
+                                                               nullptr);
+        TruncateString(const_cast<string*>(&value), max_len);
+      } else {
+        TruncateFields(refl->MutableRepeatedMessage(message, fd, idx), max_len);
+      }
+    }
+  }
+}
+
+namespace {
+// Field value printer that passes string/bytes values through KUDU_REDACT()
+// when the field being printed is a string-typed field carrying the REDACT
+// option. PrintFieldName() records whether the field qualifies; the following
+// PrintString()/PrintBytes() call consumes that decision.
+class SecureFieldPrinter : public TextFormat::FieldValuePrinter {
+ public:
+  using super = TextFormat::FieldValuePrinter;
+
+  string PrintFieldName(const Message& message,
+                        const Reflection* reflection,
+                        const FieldDescriptor* field) const override {
+    const bool is_string = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING;
+    hide_next_string_ = is_string && field->options().GetExtension(REDACT);
+    return super::PrintFieldName(message, reflection, field);
+  }
+
+  string PrintString(const string& val) const override {
+    if (!ConsumeHideFlag()) {
+      return super::PrintString(val);
+    }
+    return KUDU_REDACT(super::PrintString(val));
+  }
+
+  string PrintBytes(const string& val) const override {
+    if (!ConsumeHideFlag()) {
+      return super::PrintBytes(val);
+    }
+    return KUDU_REDACT(super::PrintBytes(val));
+  }
+
+  // Set by PrintFieldName(), cleared by the next value print. Mutable because
+  // the printer callbacks are const.
+  mutable bool hide_next_string_ = false;
+
+ private:
+  // Returns the current flag value and resets it to false.
+  bool ConsumeHideFlag() const {
+    const bool hide = hide_next_string_;
+    hide_next_string_ = false;
+    return hide;
+  }
+};
+} // anonymous namespace
+
+// Returns the text-format rendering of 'msg' produced by a printer whose
+// default field value printer is a SecureFieldPrinter.
+string SecureDebugString(const Message& msg) {
+  TextFormat::Printer printer;
+  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
+
+  string text;
+  printer.PrintToString(msg, &text);
+  return text;
+}
+
+// Single-line variant of SecureDebugString(): same SecureFieldPrinter, but the
+// printer runs in single-line mode and one trailing space, if present, is
+// removed from the result.
+string SecureShortDebugString(const Message& msg) {
+  TextFormat::Printer printer;
+  printer.SetSingleLineMode(true);
+  printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
+
+  string text;
+  printer.PrintToString(msg, &text);
+
+  // Single line mode currently might have an extra space at the end.
+  if (!text.empty() && text.back() == ' ') {
+    text.pop_back();
+  }
+  return text;
+}
+
+
+// Takes shared ownership of 'writer'. The object starts NOT_INITIALIZED with
+// a write offset of 0 and the default container version.
+WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
+    : state_(FileState::NOT_INITIALIZED),
+      offset_(0),
+      version_(kPBContainerDefaultVersion),
+      writer_(std::move(writer)) {}
+
+// Closes the file if that has not happened yet; a failure can only be logged
+// from here.
+WritablePBContainerFile::~WritablePBContainerFile() {
+  WARN_NOT_OK(Close(),
+              "Could not Close() when destroying file");
+}
+
+// Overrides the container version used when writing. Must be called while the
+// file is still NOT_INITIALIZED. Returns NotSupported, leaving the version
+// untouched, for a version IsSupportedContainerVersion() rejects.
+Status WritablePBContainerFile::SetVersionForTests(int version) {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+  if (IsSupportedContainerVersion(version)) {
+    version_ = version;
+    return Status::OK();
+  }
+  return Status::NotSupported(Substitute("Version $0 is not supported", version));
+}
+
+// Writes the beginning of a new container file: the file header (magic
+// number, version and, for version >= 2, a checksum of those two), followed by
+// a supplemental header record carrying the descriptor set and type name of
+// 'msg'. Everything is assembled in one buffer and written with a single
+// AppendBytes() call; on success the file becomes OPEN.
+Status WritablePBContainerFile::CreateNew(const Message& msg) {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+
+  // Only versions other than 1 carry a header checksum.
+  uint64_t header_len = kPBContainerV1HeaderLen;
+  if (version_ != 1) {
+    header_len += kPBContainerChecksumLen;
+  }
+
+  faststring buf;
+  buf.resize(header_len);
+  uint8_t* const header = buf.data();
+
+  // Magic number.
+  strings::memcpy_inlined(header, kPBContainerMagic, kPBContainerMagicLen);
+  uint64_t written = kPBContainerMagicLen;
+
+  // Version.
+  InlineEncodeFixed32(header + written, version_);
+  written += sizeof(uint32_t);
+  DCHECK_EQ(kPBContainerV1HeaderLen, written)
+    << "Serialized unexpected number of total bytes";
+
+  // Versions >= 2: checksum over the magic number and the version.
+  if (version_ >= 2) {
+    const uint32_t header_checksum = crc::Crc32c(header, written);
+    InlineEncodeFixed32(header + written, header_checksum);
+    written += sizeof(uint32_t);
+  }
+  DCHECK_EQ(written, header_len);
+
+  // Supplemental header, appended to the same buffer as a regular record.
+  ContainerSupHeaderPB sup_header;
+  PopulateDescriptorSet(msg.GetDescriptor()->file(),
+                        sup_header.mutable_protos());
+  sup_header.set_pb_type(msg.GetTypeName());
+  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf),
+                        "Failed to prepare supplemental header for writing");
+
+  // Hand the whole buffer to the file.
+  RETURN_NOT_OK_PREPEND(AppendBytes(buf),
+                        "Failed to append header to file");
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+// Prepares an existing container file for appending: parses the file header
+// and the supplemental header, then sets the write offset to the cached file
+// size so that new records go to the end. On success the file becomes OPEN.
+Status WritablePBContainerFile::OpenExisting() {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+
+  boost::optional<uint64_t> file_size;
+  RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &file_size, &offset_, &version_));
+
+  // The parsed supplemental header itself is not used here.
+  ContainerSupHeaderPB unused_sup_header;
+  RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &file_size,
+                                       &offset_, &unused_sup_header));
+
+  // Reset the write offset to the end of the file.
+  offset_ = file_size.get();
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+// Writes 'data' at the current write offset and, if the write succeeds,
+// advances the offset by data.size(). Both steps happen under 'offset_lock_'.
+Status WritablePBContainerFile::AppendBytes(const Slice& data) {
+  std::lock_guard<Mutex> guard(offset_lock_);
+  const Status write_status = writer_->Write(offset_, data);
+  if (write_status.ok()) {
+    offset_ += data.size();
+  }
+  return write_status;
+}
+
+// Appends 'msg' to the file as one record. The record is first built in a
+// local buffer and then written with AppendBytes(). Requires the file to be
+// OPEN.
+Status WritablePBContainerFile::Append(const Message& msg) {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  faststring record;
+  RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &record),
+                        "Failed to prepare buffer for writing");
+  RETURN_NOT_OK_PREPEND(AppendBytes(record),
+                        "Failed to append data to file");
+  return Status::OK();
+}
+
+// Issues an asynchronous flush (RWFile::FLUSH_ASYNC) on the underlying file,
+// passing offset 0 and length 0. Requires the file to be OPEN.
+Status WritablePBContainerFile::Flush() {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  // TODO: Flush just the dirty bytes.
+  const Status flush_status = writer_->Flush(RWFile::FLUSH_ASYNC, 0, 0);
+  if (PREDICT_FALSE(!flush_status.ok())) {
+    return flush_status.CloneAndPrepend("Failed to Flush() file");
+  }
+  return Status::OK();
+}
+
+// Calls Sync() on the underlying file. Requires the file to be OPEN.
+Status WritablePBContainerFile::Sync() {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  const Status sync_status = writer_->Sync();
+  if (PREDICT_FALSE(!sync_status.ok())) {
+    return sync_status.CloneAndPrepend("Failed to Sync() file");
+  }
+  return Status::OK();
+}
+
+// Closes the underlying file and releases this object's reference to it.
+// Calling Close() on a file that is already CLOSED is a no-op returning OK.
+// The state becomes CLOSED and the writer is released even when the
+// underlying Close() fails.
+Status WritablePBContainerFile::Close() {
+  if (state_ == FileState::CLOSED) {
+    return Status::OK();
+  }
+
+  state_ = FileState::CLOSED;
+  const Status close_status = writer_->Close();
+  writer_.reset();
+  RETURN_NOT_OK_PREPEND(close_status, "Failed to Close() file");
+  return Status::OK();
+}
+
+// Returns the name of the underlying file. The writer is dereferenced
+// unconditionally, and Close() resets it, so this must not be called after
+// Close().
+const string& WritablePBContainerFile::filename() const {
+  const string& name = writer_->filename();
+  return name;
+}
+
+// Serializes 'msg' as one container record and appends it to 'buf'.
+//
+// Record layout, as written below:
+//   version 1:    <data length> <data> <checksum of data length + data>
+//   version >= 2: <data length> <checksum of data length> <data> <checksum of data>
+// The length and every checksum are fixed-width 32-bit values.
+//
+// Returns IOError if the message does not serialize to exactly the number of
+// bytes that ByteSize() reported.
+Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
+  DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
+  int data_len = msg.ByteSize();
+  // Messages >2G cannot be serialized due to overflow computing ByteSize.
+  DCHECK_GE(data_len, 0) << "Error computing ByteSize";
+  uint64_t record_buflen = sizeof(uint32_t) + data_len + sizeof(uint32_t);
+  if (version_ >= 2) {
+    record_buflen += sizeof(uint32_t); // Additional checksum just for the length.
+  }
+
+  // Grow the buffer to hold the new data.
+  uint64_t record_offset = buf->size();
+  buf->resize(record_offset + record_buflen);
+  uint8_t* dst = buf->data() + record_offset;
+
+  // Serialize the data length.
+  size_t cur_offset = 0;
+  InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len));
+  cur_offset += sizeof(uint32_t);
+
+  // For version >= 2: Serialize the checksum of the data length.
+  if (version_ >= 2) {
+    // Checksum the encoded length bytes that were just written, not the
+    // in-memory 'int': the result must not depend on the host's byte order or
+    // on sizeof(int), because a reader can only checksum what is in the file.
+    uint32_t length_checksum = crc::Crc32c(dst, sizeof(uint32_t));
+    InlineEncodeFixed32(dst + cur_offset, length_checksum);
+    cur_offset += sizeof(uint32_t);
+  }
+
+  // Serialize the data. The call returns a pointer just past the last byte it
+  // wrote, so the produced length is verified against the expected one.
+  uint64_t data_offset = cur_offset;
+  uint8_t* data_start = dst + data_offset;
+  uint8_t* data_end = msg.SerializeWithCachedSizesToArray(data_start);
+  if (PREDICT_FALSE(data_end == nullptr || data_end - data_start != data_len)) {
+    return Status::IOError("Failed to serialize PB to array");
+  }
+  cur_offset += data_len;
+
+  // Calculate and serialize the data checksum.
+  // For version 1, this is the checksum of the len + data.
+  // For version >= 2, this is only the checksum of the data.
+  uint32_t data_checksum;
+  if (version_ == 1) {
+    data_checksum = crc::Crc32c(dst, cur_offset);
+  } else {
+    data_checksum = crc::Crc32c(dst + data_offset, data_len);
+  }
+  InlineEncodeFixed32(dst + cur_offset, data_checksum);
+  cur_offset += sizeof(uint32_t);
+
+  DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes";
+  return Status::OK();
+}
+
+void WritablePBContainerFile::PopulateDescriptorSet(
+    const FileDescriptor* desc, FileDescriptorSet* output) {
+  // Replaces the contents of 'output' with a copy of 'desc' and of every file
+  // it transitively depends on. Protobuf is not compiled with TSAN, so
+  // copying its static descriptors triggers many race reports; they are
+  // suppressed, but TSAN still walks the stack for each one, which makes this
+  // function very slow. Hence TSAN is told to ignore this whole scope.
+  debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+
+  FileDescriptorSet all_descs;  // Built locally, swapped into 'output' at the end.
+
+  // Every schema that has ever been pushed onto 'unemitted'. Is a superset of
+  // 'unemitted' and only ever grows; keeps a file from being queued twice.
+  unordered_set<const FileDescriptor*> processed;
+
+  // Schemas not yet copied to 'all_descs'; always worked on from the front.
+  deque<const FileDescriptor*> unemitted;
+
+  InsertOrDie(&processed, desc);
+  unemitted.push_front(desc);
+  while (!unemitted.empty()) {
+    const FileDescriptor* proto = unemitted.front();
+
+    // Emit the front schema only if none of its dependencies had to be queued
+    // just now. NOTE(review): 'processed' means queued, not emitted; order is not strict.
+    bool emit = true;
+    for (int i = 0; i < proto->dependency_count(); i++) {
+      const FileDescriptor* dep = proto->dependency(i);
+      if (InsertIfNotPresent(&processed, dep)) {
+        unemitted.push_front(dep);  // Handle the new dependency before 'proto'.
+        emit = false;  // 'proto' stays queued and is revisited later.
+      }
+    }
+    if (emit) {
+      unemitted.pop_front();
+      proto->CopyTo(all_descs.mutable_file()->Add());
+    }
+  }
+  all_descs.Swap(output);
+}
+
+// Takes shared ownership of 'reader'. The object starts NOT_INITIALIZED with
+// an invalid version and a read offset of 0 until Open() is called.
+ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
+    : state_(FileState::NOT_INITIALIZED),
+      version_(kPBContainerInvalidVersion),
+      offset_(0),
+      reader_(std::move(reader)) {}
+
+// Releases the reader by closing the file; the returned status is not
+// inspected.
+ReadablePBContainerFile::~ReadablePBContainerFile() {
+  Close();
+}
+
+// Parses the file header and the supplemental header, keeps the embedded
+// descriptor set and entry type name from the latter, and moves the file to
+// the OPEN state. The read offset is advanced by the two parsing helpers.
+Status ReadablePBContainerFile::Open() {
+  DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+
+  RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &cached_file_size_, &offset_, &version_));
+
+  ContainerSupHeaderPB header;
+  RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &cached_file_size_,
+                                       &offset_, &header));
+
+  // Take ownership of the descriptor set instead of copying it.
+  protos_.reset(header.release_protos());
+  pb_type_ = header.pb_type();
+  state_ = FileState::OPEN;
+  return Status::OK();
+}
+
+// Reads the record at the current read offset into 'msg' by delegating to
+// ReadFullPB(), which is handed the offset to update. Requires the file to be
+// OPEN.
+Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
+  DCHECK_EQ(FileState::OPEN, state_);
+  return ReadFullPB(reader_.get(), version_,
+                    &cached_file_size_, &offset_, msg);
+}
+
+// Returns, through '*prototype', a prototype message for the entry type named
+// in the container file, built from the descriptors embedded in the file. The
+// prototype is created on the first successful call and cached together with
+// the database, pool and factory that back it; nothing is cached on failure.
+Status ReadablePBContainerFile::GetPrototype(const Message** prototype) {
+  if (prototype_) {
+    *prototype = prototype_;
+    return Status::OK();
+  }
+
+  // Loading the schemas into a DescriptorDatabase (and not directly into
+  // a DescriptorPool) defers resolution until FindMessageTypeByName()
+  // below, allowing for schemas to be loaded in any order.
+  unique_ptr<SimpleDescriptorDatabase> schema_db(new SimpleDescriptorDatabase());
+  for (int idx = 0; idx < protos()->file_size(); ++idx) {
+    if (!schema_db->Add(protos()->file(idx))) {
+      return Status::Corruption("Descriptor not loaded", Substitute(
+          "Could not load descriptor for PB type $0 referenced in container file",
+          pb_type()));
+    }
+  }
+
+  unique_ptr<DescriptorPool> schema_pool(new DescriptorPool(schema_db.get()));
+  const Descriptor* type_desc = schema_pool->FindMessageTypeByName(pb_type());
+  if (type_desc == nullptr) {
+    return Status::NotFound("Descriptor not found", Substitute(
+        "Could not find descriptor for PB type $0 referenced in container file",
+        pb_type()));
+  }
+
+  unique_ptr<DynamicMessageFactory> msg_factory(new DynamicMessageFactory());
+  const Message* built = msg_factory->GetPrototype(type_desc);
+  if (built == nullptr) {
+    return Status::NotSupported("Descriptor not supported", Substitute(
+        "Descriptor $0 referenced in container file not supported",
+        pb_type()));
+  }
+
+  // Everything succeeded: commit all pieces to the members at once.
+  db_ = std::move(schema_db);
+  descriptor_pool_ = std::move(schema_pool);
+  message_factory_ = std::move(msg_factory);
+  prototype_ = built;
+
+  *prototype = prototype_;
+  return Status::OK();
+}
+
+// Writes every record from the current read offset to the end of the file to
+// 'os' in the requested 'format'. DEBUG additionally prints a file header
+// summary, per-record offsets and lengths, and a diagnostic block if reading
+// stops on anything other than end-of-file. Returns OK when the end of the
+// file is reached, otherwise the error that stopped the dump. Requires the
+// file to be OPEN.
+Status ReadablePBContainerFile::Dump(ostream* os, ReadablePBContainerFile::Format format) {
+  DCHECK_EQ(FileState::OPEN, state_);
+
+  // JSON output goes through the protobuf library, which offers no easy hook
+  // for our redaction support. Since this is only used by CLI tools, simply
+  // refuse to dump JSON while redaction is enabled.
+  if (format == Format::JSON && KUDU_SHOULD_REDACT()) {
+    return Status::NotSupported("cannot dump PBC file in JSON format if redaction is enabled");
+  }
+
+  const char* const kRule = "-------";
+  const bool debug_mode = (format == Format::DEBUG);
+
+  if (debug_mode) {
+    *os << "File header" << endl
+        << kRule << endl
+        << "Protobuf container version: " << version_ << endl
+        << "Total container file size: " << *cached_file_size_ << endl
+        << "Entry PB type: " << pb_type_ << endl
+        << endl;
+  }
+
+  // Build a message of the right dynamic type from the protobuf information
+  // embedded in the container file.
+  const Message* prototype;
+  RETURN_NOT_OK(GetPrototype(&prototype));
+  unique_ptr<Message> record(prototype->New());
+
+  // Dump the records one by one until a read fails.
+  int index = 0;
+  uint64_t record_start = offset_;
+  string json;
+  Status s = ReadNextPB(record.get());
+  while (s.ok()) {
+    switch (format) {
+      case Format::ONELINE:
+        *os << index << "\t" << SecureShortDebugString(*record) << endl;
+        break;
+      case Format::DEFAULT:
+      case Format::DEBUG:
+        *os << "Message " << index << endl;
+        if (debug_mode) {
+          *os << "offset: " << record_start << endl
+              << "length: " << (offset_ - record_start) << endl;
+        }
+        *os << kRule << endl
+            << SecureDebugString(*record) << endl;
+        break;
+      case Format::JSON: {
+        json.clear();
+        const auto& convert_status = google::protobuf::util::MessageToJsonString(
+            *record, &json, google::protobuf::util::JsonPrintOptions());
+        if (!convert_status.ok()) {
+          return Status::RuntimeError("could not convert PB to JSON", convert_status.ToString());
+        }
+        *os << json << endl;
+        break;
+      }
+    }
+
+    record_start = offset_;
+    ++index;
+    s = ReadNextPB(record.get());
+  }
+
+  const bool reached_eof = s.IsEndOfFile();
+  if (debug_mode && !reached_eof) {
+    *os << "Message " << index << endl
+        << "error: failed to parse protobuf message" << endl
+        << "offset: " << record_start << endl
+        << "remaining file length: " << (*cached_file_size_ - record_start) << endl
+        << kRule << endl;
+  }
+  return reached_eof ? Status::OK() : s;
+}
+
+// Marks the file CLOSED and drops this object's reference to the reader.
+// Always returns OK.
+Status ReadablePBContainerFile::Close() {
+  reader_.reset();
+  state_ = FileState::CLOSED;
+  return Status::OK();
+}
+
+// Returns the container format version stored by Open(). Requires the file
+// to be OPEN.
+int ReadablePBContainerFile::version() const {
+  DCHECK_EQ(FileState::OPEN, state_);
+  const int current_version = version_;
+  return current_version;
+}
+
+// Returns the current read offset. Requires the file to be OPEN.
+uint64_t ReadablePBContainerFile::offset() const {
+  DCHECK_EQ(FileState::OPEN, state_);
+  const uint64_t current_offset = offset_;
+  return current_offset;
+}
+
+// Opens the container file at 'path', reads its first record into 'msg' and
+// closes the file again. Returns the first error encountered.
+Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
+  unique_ptr<RandomAccessFile> raw_file;
+  RETURN_NOT_OK(env->NewRandomAccessFile(path, &raw_file));
+
+  ReadablePBContainerFile container(std::move(raw_file));
+  RETURN_NOT_OK(container.Open());
+  RETURN_NOT_OK(container.ReadNextPB(msg));
+  return container.Close();
+}
+
+// Writes 'msg' as a single-record container file at 'path'. The container is
+// built in a temporary file created from a template derived from 'path' and
+// then renamed to 'path'. With create == NO_OVERWRITE, an already existing
+// 'path' yields AlreadyPresent. With sync == SYNC the container is synced
+// before it is closed and the parent directory is synced after the rename.
+// If any step up to and including the rename fails, deletion of the temporary
+// file is attempted.
+Status WritePBContainerToPath(Env* env, const std::string& path,
+                              const Message& msg,
+                              CreateMode create,
+                              SyncMode sync) {
+  TRACE_EVENT2("io", "WritePBContainerToPath",
+               "path", path,
+               "msg_type", msg.GetTypeName());
+
+  if (create == NO_OVERWRITE && env->FileExists(path)) {
+    return Status::AlreadyPresent(Substitute("File $0 already exists", path));
+  }
+
+  const bool durable = (sync == pb_util::SYNC);
+  const string name_template = path + kTmpInfix + kTmpTemplateSuffix;
+  string tmp_path;
+
+  unique_ptr<RWFile> tmp_file;
+  RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), name_template, &tmp_path, &tmp_file));
+  // Armed until the rename has succeeded.
+  auto remove_tmp = MakeScopedCleanup([&]() {
+    WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
+  });
+
+  WritablePBContainerFile container(std::move(tmp_file));
+  RETURN_NOT_OK(container.CreateNew(msg));
+  RETURN_NOT_OK(container.Append(msg));
+  if (durable) {
+    RETURN_NOT_OK(container.Sync());
+  }
+  RETURN_NOT_OK(container.Close());
+
+  RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
+                        "Failed to rename tmp file to " + path);
+  remove_tmp.cancel();
+
+  if (durable) {
+    RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
+                          "Failed to SyncDir() parent of " + path);
+  }
+  return Status::OK();
+}
+
+
+// Returns a ref-counted PbTracer constructed from 'msg'.
+scoped_refptr<debug::ConvertableToTraceFormat> PbTracer::TracePb(const Message& msg) {
+  PbTracer* tracer = new PbTracer(msg);
+  return make_scoped_refptr(tracer);
+}
+
+// Stores a private copy of 'msg': a fresh message is created with msg.New()
+// and then filled with CopyFrom().
+PbTracer::PbTracer(const Message& msg)
+    : msg_(msg.New()) {
+  msg_->CopyFrom(msg);
+}
+
+// Appends a compact JSON rendering of the stored message to 'out'. The stored
+// copy is first passed through TruncateFields() with kMaxFieldLengthToTrace,
+// which modifies it in place.
+void PbTracer::AppendAsTraceFormat(std::string* out) const {
+  pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace);
+
+  std::ostringstream json;
+  JsonWriter writer(&json, JsonWriter::COMPACT);
+  writer.Protobuf(*msg_);
+  out->append(json.str());
+}
+
+} // namespace pb_util
+} // namespace kudu