You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:22 UTC
[10/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/os-util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/os-util.h b/be/src/kudu/util/os-util.h
new file mode 100644
index 0000000..7e1bbb6
--- /dev/null
+++ b/be/src/kudu/util/os-util.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Imported from Impala. Changes include:
+// - Namespace + imports.
+// - Fixes for cpplint.
+// - Fixed parsing when thread names have spaces.
+
+#ifndef KUDU_UTIL_OS_UTIL_H
+#define KUDU_UTIL_OS_UTIL_H
+
+#include <cstdint>
+#include <string>
+#include <type_traits> // IWYU pragma: keep
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+// Utility methods to read interesting values from /proc.
+// TODO: Get stats for parent process.
+
+// Per-thread statistics gathered from the /proc filesystem.
+//
+// Judging by the "_ns" suffix, each field is a time in nanoseconds; the exact
+// /proc columns they come from are decided by ParseStat().
+struct ThreadStats {
+  // Every field starts at zero so that an instance which GetThreadStats()
+  // could not populate still reads as "no time accounted".
+  int64_t user_ns = 0;
+  int64_t kernel_ns = 0;
+  int64_t iowait_ns = 0;
+
+  ThreadStats() = default;
+};
+
+// Fills 'stats' from 'buffer', whose contents must follow the layout of
+// /proc/<pid>/task/<tid>/stat. An error Status is returned when the buffer
+// does not match that layout.
+//
+// If 'name' is supplied, the thread name found in the buffer is stored there.
+Status ParseStat(const std::string& buffer, std::string* name, ThreadStats* stats);
+
+// Fills 'stats' for thread 'tid' by reading /proc/<pid>/task/<tid>/stat.
+// Returns OK unless the file cannot be read, is in an unrecognised format, or
+// the running kernel is not recent enough to provide the data.
+Status GetThreadStats(int64_t tid, ThreadStats* stats);
+
+// Turns off core dump generation for the current process.
+//
+// Mainly useful in tests that inject failures on purpose, where an "expected"
+// crash should not leave a core dump behind.
+void DisableCoreDumps();
+
+// Returns true when the current process appears to be running under a
+// debugger or strace.
+//
+// On unsupported (non-Linux) platforms the result may be false.
+bool IsBeingDebugged();
+
+} // namespace kudu
+
+#endif /* KUDU_UTIL_OS_UTIL_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/path_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/path_util-test.cc b/be/src/kudu/util/path_util-test.cc
new file mode 100644
index 0000000..0d617fc
--- /dev/null
+++ b/be/src/kudu/util/path_util-test.cc
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/path_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+TEST(TestPathUtil, BaseNameTest) {
+  // Table of {input path, expected BaseName() result}, checked in order.
+  struct Case {
+    const char* path;
+    const char* expected;
+  };
+  const Case kCases[] = {
+    { "", "." },
+    { ".", "." },
+    { "..", ".." },
+    { "/", "/" },
+    { "//", "/" },
+    { "a", "a" },
+    { "ab", "ab" },
+    { "ab/", "ab" },
+    { "ab/cd", "cd" },
+    { "/ab", "ab" },
+    { "/ab///", "ab" },
+    { "/ab/cd", "cd" },
+  };
+  for (const Case& c : kCases) {
+    // Tag any failure with the offending input path.
+    SCOPED_TRACE(c.path);
+    ASSERT_EQ(c.expected, BaseName(c.path));
+  }
+}
+
+TEST(TestPathUtil, DirNameTest) {
+  // Table of {input path, expected DirName() result}, checked in order.
+  struct Case {
+    const char* path;
+    const char* expected;
+  };
+  const Case kCases[] = {
+    { "", "." },
+    { ".", "." },
+    { "..", "." },
+    { "/", "/" },
+#if defined(__linux__)
+    // On OS X this case returns "/", while Linux returns "//". On both
+    // platforms dirname(1) returns "/". The difference is unlikely to matter
+    // in practice.
+    { "//", "//" },
+#else
+    { "//", "/" },
+#endif // defined(__linux__)
+    { "a", "." },
+    { "ab", "." },
+    { "ab/", "." },
+    { "ab/cd", "ab" },
+    { "/ab", "/" },
+    { "/ab///", "/" },
+    { "/ab/cd", "/ab" },
+  };
+  for (const Case& c : kCases) {
+    // Tag any failure with the offending input path.
+    SCOPED_TRACE(c.path);
+    ASSERT_EQ(c.expected, DirName(c.path));
+  }
+}
+
+TEST(TestPathUtil, SplitPathTest) {
+  using Vec = vector<string>;
+  // Table of {input path, expected SplitPath() result}, checked in order.
+  struct Case {
+    const char* path;
+    Vec expected;
+  };
+  const Case kCases[] = {
+    { "/", Vec{"/"} },
+    { "/a/b", Vec{"/", "a", "b"} },
+    { "/a/b/", Vec{"/", "a", "b"} },
+    { "a/b", Vec{"a", "b"} },
+    { ".", Vec{"."} },
+    { "", Vec() },
+  };
+  for (const Case& c : kCases) {
+    // Tag any failure with the offending input path.
+    SCOPED_TRACE(c.path);
+    ASSERT_EQ(c.expected, SplitPath(c.path));
+  }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/path_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/path_util.cc b/be/src/kudu/util/path_util.cc
new file mode 100644
index 0000000..6d1c4a5
--- /dev/null
+++ b/be/src/kudu/util/path_util.cc
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/path_util.h"
+
+// Use the POSIX version of dirname(3).
+#include <libgen.h>
+
+#include <cstring>
+#if defined(__APPLE__)
+#include <mutex>
+#endif // defined(__APPLE__)
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/util/env.h"
+#include "kudu/util/status.h"
+#include "kudu/util/subprocess.h"
+
+
+using std::string;
+using std::vector;
+using strings::SkipEmpty;
+using strings::Split;
+
+namespace kudu {
+
+// Common infix used in the names of temporary files. These definitions get
+// external linkage from the 'extern' declarations in path_util.h (included
+// above); without them a namespace-scope const array would be internal.
+const char kTmpInfix[] = ".kudutmp";
+// Temporary-file infix used by Kudu versions prior to 1.2.
+const char kOldTmpInfix[] = ".tmp";
+
+// Joins 'a' and 'b' with a '/' separator, unless 'a' already ends in '/'.
+// 'a' must be non-empty and 'b' must be a non-empty relative segment; either
+// violation is a fatal CHECK failure.
+string JoinPathSegments(const string& a, const string& b) {
+  CHECK(!a.empty()) << "empty first component: " << a;
+  CHECK(!b.empty() && b.front() != '/')
+      << "second path component must be non-empty and relative: "
+      << b;
+  const bool ends_with_separator = (a.back() == '/');
+  return ends_with_separator ? a + b : a + "/" + b;
+}
+
+// Returns JoinPathSegments(path, s) for every 'path' in 'v', in the same
+// order as 'v'. The preconditions of JoinPathSegments() apply to every pair.
+vector<string> JoinPathSegmentsV(const vector<string>& v, const string& s) {
+  vector<string> out;
+  // Exactly one output per input, so allocate once up front.
+  out.reserve(v.size());
+  for (const string& path : v) {
+    out.emplace_back(JoinPathSegments(path, s));
+  }
+  return out;
+}
+
+// Splits 'path' on '/' into its non-empty segments. An absolute path gets
+// "/" as its first element; an empty path yields an empty vector.
+vector<string> SplitPath(const string& path) {
+  vector<string> segments;
+  if (path.empty()) {
+    return segments;
+  }
+  if (path.front() == '/') {
+    segments.emplace_back("/");
+  }
+  // SkipEmpty() drops the empty pieces produced by leading, repeated or
+  // trailing separators.
+  const vector<StringPiece> parts = Split(path, "/", SkipEmpty());
+  for (const StringPiece& part : parts) {
+    segments.emplace_back(part.data(), part.size());
+  }
+  return segments;
+}
+
+// Returns the directory portion of 'path' as computed by POSIX dirname(3).
+string DirName(const string& path) {
+  // POSIX allows dirname(3) to modify its argument, so it works on a private
+  // copy that is released with free().
+  gscoped_ptr<char[], FreeDeleter> scratch(strdup(path.c_str()));
+#if defined(__APPLE__)
+  // Calls are serialized on macOS, presumably because dirname(3) there
+  // returns storage shared between calls.
+  static std::mutex dirname_lock;
+  std::lock_guard<std::mutex> guard(dirname_lock);
+#endif // defined(__APPLE__)
+  const char* dir = ::dirname(scratch.get());
+  // The result is copied into a std::string while the lock (if any) is held.
+  return string(dir);
+}
+
+string BaseName(const string& path) {
+ gscoped_ptr<char[], FreeDeleter> path_copy(strdup(path.c_str()));
+ return basename(path_copy.get());
+}
+
+// Locates 'binary', probing the directories in 'search' in order before
+// falling back to `which` (i.e. $PATH). On success stores the path in '*path'
+// and returns OK; otherwise returns NotFound.
+Status FindExecutable(const string& binary,
+                      const vector<string>& search,
+                      string* path) {
+  // Reused for each candidate location and then for the output of `which`.
+  string candidate;
+
+  // Caller-supplied locations come first so that they take precedence over
+  // any system binary of the same name.
+  for (const string& dir : search) {
+    candidate = JoinPathSegments(dir, binary);
+    if (Env::Default()->FileExists(candidate)) {
+      *path = candidate;
+      return Status::OK();
+    }
+  }
+
+  if (!Subprocess::Call({ "which", binary }, "", &candidate).ok()) {
+    return Status::NotFound("Unable to find binary", binary);
+  }
+  // Drop the trailing newline from the output of `which`.
+  StripTrailingNewline(&candidate);
+  *path = candidate;
+  return Status::OK();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/path_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/path_util.h b/be/src/kudu/util/path_util.h
new file mode 100644
index 0000000..58211a9
--- /dev/null
+++ b/be/src/kudu/util/path_util.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Utility methods for dealing with file paths.
+#ifndef KUDU_UTIL_PATH_UTIL_H
+#define KUDU_UTIL_PATH_UTIL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/port.h"
+
+namespace kudu {
+
+class Status;
+
+// Common infix placed in the names of temporary files.
+extern const char kTmpInfix[];
+// Temporary-file infix used by Kudu versions prior to 1.2.
+extern const char kOldTmpInfix[];
+
+// Joins path segments 'a' and 'b', inserting a '/' separator only when 'a'
+// does not already end with one.
+std::string JoinPathSegments(const std::string& a,
+                             const std::string& b);
+
+// Joins every segment in 'v' with the common trailing segment 's', returning
+// the results in the same order as 'v'.
+std::vector<std::string> JoinPathSegmentsV(const std::vector<std::string>& v,
+                                           const std::string& s);
+
+// Splits 'path' on '/' into its segments. An absolute path gets "/" as its
+// first segment.
+std::vector<std::string> SplitPath(const std::string& path);
+
+// Returns the directory enclosing 'path'; a std::string counterpart of
+// dirname(3).
+std::string DirName(const std::string& path);
+
+// Returns the terminal component of 'path'; a std::string counterpart of
+// basename(3).
+std::string BaseName(const std::string& path);
+
+// Looks for the executable 'binary', first in each location listed in
+// 'search' and then on $PATH. On success the full path is stored in '*path'.
+Status FindExecutable(const std::string& binary,
+                      const std::vector<std::string>& search,
+                      std::string* path) WARN_UNUSED_RESULT;
+
+} // namespace kudu
+#endif /* KUDU_UTIL_PATH_UTIL_H */
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util-internal.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-internal.cc b/be/src/kudu/util/pb_util-internal.cc
new file mode 100644
index 0000000..380072c
--- /dev/null
+++ b/be/src/kudu/util/pb_util-internal.cc
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/pb_util-internal.h"
+
+#include <ostream>
+#include <string>
+
+namespace kudu {
+namespace pb_util {
+namespace internal {
+
+////////////////////////////////////////////
+// SequentialFileFileInputStream
+////////////////////////////////////////////
+
+// Hands out the next chunk of input, refilling the buffer from 'rfile_' once
+// every previously read byte has been handed out.
+//
+// Returns false if an earlier operation already failed, if the refill read
+// fails (the error is kept in 'status_'), or if the refill returns no bytes.
+bool SequentialFileFileInputStream::Next(const void **data, int *size) {
+  if (PREDICT_FALSE(!status_.ok())) {
+    LOG(WARNING) << "Already failed on a previous read: " << status_.ToString();
+    return false;
+  }
+
+  // Bytes given back via BackUp() (or left over after a Skip() shorter than
+  // the buffered data) are served before anything new is read.
+  size_t available = (buffer_used_ - buffer_offset_);
+  if (available > 0) {
+    *data = buffer_.get() + buffer_offset_;
+    *size = static_cast<int>(available);
+    buffer_offset_ += available;
+    total_read_ += available;
+    return true;
+  }
+
+  Slice result(buffer_.get(), buffer_size_);
+  status_ = rfile_->Read(&result);
+  if (!status_.ok()) {
+    // With the buffer fully consumed, 'total_read_' is the stream position at
+    // which this read started. 'buffer_offset_' is only an index into the
+    // previous buffer fill and says nothing about where the read happened.
+    LOG(WARNING) << "Read at " << total_read_ << " failed: " << status_.ToString();
+    return false;
+  }
+
+  // The whole fill is handed out at once; BackUp() may return part of it.
+  buffer_used_ = result.size();
+  buffer_offset_ = buffer_used_;
+  total_read_ += buffer_used_;
+  *data = buffer_.get();
+  *size = static_cast<int>(buffer_used_);
+  return buffer_used_ > 0;
+}
+
+// Advances the stream by 'count' bytes. Buffered bytes are consumed first;
+// if they do not cover 'count', the buffer is discarded and the remainder is
+// skipped in 'rfile_'. Returns false if the underlying skip failed.
+bool SequentialFileFileInputStream::Skip(int count) {
+  CHECK_GT(count, 0);
+  int avail = (buffer_used_ - buffer_offset_);
+  if (avail > count) {
+    buffer_offset_ += count;
+  } else {
+    // Drop the 'avail' buffered bytes and skip the rest in the file itself.
+    buffer_used_ = 0;
+    buffer_offset_ = 0;
+    status_ = rfile_->Skip(count - avail);
+  }
+  // The stream advances by exactly 'count' bytes in both branches. Next() and
+  // BackUp() keep bytes still sitting in the buffer out of 'total_read_', so
+  // the 'avail' buffered bytes discarded above must be counted here as well;
+  // adding only 'count - avail' would make ByteCount() fall behind.
+  total_read_ += count;
+  return status_.ok();
+}
+
+////////////////////////////////////////////
+// WritableFileOutputStream
+////////////////////////////////////////////
+
+// Hands the caller writable buffer space. Any unused tail of the buffer is
+// offered first; once the buffer is full it is flushed to 'wfile_' and then
+// offered again in its entirety.
+//
+// Returns false if a previous write failed or if the flush fails.
+bool WritableFileOutputStream::Next(void **data, int *size) {
+  if (PREDICT_FALSE(!status_.ok())) {
+    LOG(WARNING) << "Already failed on a previous write: " << status_.ToString();
+    return false;
+  }
+
+  const size_t remaining = buffer_size_ - buffer_offset_;
+  if (remaining == 0) {
+    // Buffer full: write it out, then offer the whole buffer again.
+    if (!Flush()) {
+      return false;
+    }
+    *data = buffer_.get();
+    *size = buffer_size_;
+    buffer_offset_ = buffer_size_;
+    return true;
+  }
+
+  *data = buffer_.get() + buffer_offset_;
+  *size = remaining;
+  buffer_offset_ = buffer_size_;
+  return true;
+}
+
+} // namespace internal
+} // namespace pb_util
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util-internal.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-internal.h b/be/src/kudu/util/pb_util-internal.h
new file mode 100644
index 0000000..48a501d
--- /dev/null
+++ b/be/src/kudu/util/pb_util-internal.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Classes used internally by pb_util.h.
+// This header should not be included by anything but pb_util and its tests.
+#ifndef KUDU_UTIL_PB_UTIL_INTERNAL_H
+#define KUDU_UTIL_PB_UTIL_INTERNAL_H
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+
+#include <glog/logging.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace pb_util {
+namespace internal {
+
+// ZeroCopyInputStream that reads a SequentialFile through an internal buffer;
+// used by ParseFromSequentialFile().
+class SequentialFileFileInputStream : public google::protobuf::io::ZeroCopyInputStream {
+ public:
+  // 'rfile' is not owned (the destructor does not release it), so it
+  // presumably must outlive this stream. 'buffer_size' is the number of bytes
+  // requested from 'rfile' per read and must be positive.
+  explicit SequentialFileFileInputStream(SequentialFile *rfile,
+                                         size_t buffer_size = kDefaultBufferSize)
+      : buffer_size_(buffer_size),
+        buffer_(new uint8[buffer_size_]),
+        rfile_(rfile) {
+    CHECK_GT(buffer_size, 0);
+  }
+
+  ~SequentialFileFileInputStream() = default;
+
+  bool Next(const void **data, int *size) OVERRIDE;
+  bool Skip(int count) OVERRIDE;
+
+  // Returns the last 'count' bytes handed out by Next() to the stream, so
+  // that the following Next() call yields them again.
+  void BackUp(int count) OVERRIDE {
+    CHECK_GE(count, 0);
+    CHECK_LE(count, buffer_offset_);
+    buffer_offset_ -= count;
+    total_read_ -= count;
+  }
+
+  // Bytes handed out so far, net of any BackUp().
+  int64_t ByteCount() const OVERRIDE {
+    return total_read_;
+  }
+
+  // Status recorded by the most recent read or skip on 'rfile_'.
+  Status status() const {
+    return status_;
+  }
+
+ private:
+  static const size_t kDefaultBufferSize = 8192;
+
+  Status status_;
+
+  // Number of valid bytes in 'buffer_' from the latest fill.
+  size_t buffer_used_ = 0;
+  // Index in 'buffer_' of the next byte to hand out.
+  size_t buffer_offset_ = 0;
+  const size_t buffer_size_;
+  std::unique_ptr<uint8_t[]> buffer_;
+
+  // Bytes handed out to the caller so far (excludes bytes still buffered).
+  size_t total_read_ = 0;
+  SequentialFile *rfile_;
+};
+
+// ZeroCopyOutputStream that accumulates output in an internal buffer and
+// appends it to a WritableFile; used by SerializeToWritableFile().
+class WritableFileOutputStream : public google::protobuf::io::ZeroCopyOutputStream {
+ public:
+  // 'wfile' is not owned (the destructor does not release it). Buffered data
+  // is written only by Flush(), or by Next() once the buffer is full; the
+  // destructor does not flush.
+  explicit WritableFileOutputStream(WritableFile *wfile, size_t buffer_size = kDefaultBufferSize)
+      : buffer_size_(buffer_size),
+        buffer_(new uint8[buffer_size_]),
+        wfile_(wfile) {
+    CHECK_GT(buffer_size, 0);
+  }
+
+  ~WritableFileOutputStream() = default;
+
+  // Appends the buffered bytes to 'wfile_' and empties the buffer. The bytes
+  // are counted as flushed even if the append fails; the failure is kept in
+  // 'status_'. Returns whether 'status_' is OK afterwards.
+  bool Flush() {
+    if (buffer_offset_ == 0) {
+      return status_.ok();
+    }
+    Slice pending(buffer_.get(), buffer_offset_);
+    status_ = wfile_->Append(pending);
+    flushed_ += buffer_offset_;
+    buffer_offset_ = 0;
+    return status_.ok();
+  }
+
+  bool Next(void **data, int *size) OVERRIDE;
+
+  // Returns the last 'count' bytes obtained from Next() as unused.
+  void BackUp(int count) OVERRIDE {
+    CHECK_GE(count, 0);
+    CHECK_LE(count, buffer_offset_);
+    buffer_offset_ -= count;
+  }
+
+  // Bytes flushed so far plus bytes currently claimed in the buffer.
+  int64_t ByteCount() const OVERRIDE {
+    return flushed_ + buffer_offset_;
+  }
+
+ private:
+  static const size_t kDefaultBufferSize = 8192;
+
+  Status status_;
+
+  // Number of bytes of 'buffer_' currently handed out to the caller.
+  size_t buffer_offset_ = 0;
+  const size_t buffer_size_;
+  std::unique_ptr<uint8_t[]> buffer_;
+
+  // Total bytes passed to 'wfile_->Append()' so far.
+  size_t flushed_ = 0;
+  WritableFile *wfile_;
+};
+
+} // namespace internal
+} // namespace pb_util
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util-test.cc b/be/src/kudu/util/pb_util-test.cc
new file mode 100644
index 0000000..a942e9a
--- /dev/null
+++ b/be/src/kudu/util/pb_util-test.cc
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/pb_util-internal.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/pb_util_test.pb.h"
+#include "kudu/util/proto_container_test.pb.h"
+#include "kudu/util/proto_container_test2.pb.h"
+#include "kudu/util/proto_container_test3.pb.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+namespace pb_util {
+
+using google::protobuf::FileDescriptorSet;
+using internal::WritableFileOutputStream;
+using std::ostringstream;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+// Fixed inputs shared by the tests below. The pointers are themselves const
+// so these constants cannot be reassigned by accident.
+static const char* const kTestFileName = "pb_container.meta";
+static const char* const kTestKeyvalName = "my-key";
+static const int kTestKeyvalValue = 1;
+// Passing this as a container version means "do not set a version; use the
+// writer's default".
+static const int kUseDefaultVersion = 0;
+
+// Fixture shared by the pb_util tests. Each test gets its own container path
+// in 'path_', derived from kTestFileName.
+class TestPBUtil : public KuduTest {
+ public:
+  void SetUp() OVERRIDE {
+    KuduTest::SetUp();
+    path_ = GetTestPath(kTestFileName);
+  }
+
+ protected:
+  // Writes a container holding the known test record to 'path_'.
+  // Unit tests should stay fast, so by default nothing is fsynced.
+  Status CreateKnownGoodContainerFile(CreateMode create = OVERWRITE,
+                                      SyncMode sync = NO_SYNC);
+
+  // Opens 'path_' with 'opts' and wraps it in a new WritablePBContainerFile.
+  // A 'version' of kUseDefaultVersion keeps the writer's default version.
+  Status NewPBCWriter(int version, RWFileOptions opts,
+                      unique_ptr<WritablePBContainerFile>* pb_writer);
+
+  // Variant of CreateKnownGoodContainerFile() with a selectable container
+  // version; pass kUseDefaultVersion for the default version.
+  Status CreateKnownGoodContainerFileWithVersion(int version,
+                                                 CreateMode create = OVERWRITE,
+                                                 SyncMode sync = NO_SYNC);
+
+  // Inverts every bit (equivalently, XORs with 0xFF) of the 'length' bytes
+  // of the file at 'path' starting at 'offset'.
+  Status BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length);
+
+  // NOTE(review): presumably renders the container at 'path' in the given
+  // 'format' into '*ret' -- confirm against the definition.
+  void DumpPBCToString(const string& path, ReadablePBContainerFile::Format format, string* ret);
+
+  // Truncates (or extends) the file at 'path' to exactly 'size' bytes.
+  Status TruncateFile(const string& path, uint64_t size);
+
+  // Container path used by most tests; set in SetUp().
+  string path_;
+};
+
+// Value-parameterized fixture that runs each test once per PB container
+// version listed in the instantiation below.
+class TestPBContainerVersions : public TestPBUtil,
+                                public ::testing::WithParamInterface<int> {
+ public:
+  TestPBContainerVersions() : version_(GetParam()) {}
+
+ protected:
+  // Container version under test, or kUseDefaultVersion.
+  const int version_;
+};
+
+// Explicit versions 1 and 2, plus the writer's default version.
+INSTANTIATE_TEST_CASE_P(SupportedVersions, TestPBContainerVersions,
+                        ::testing::Values(1, 2, kUseDefaultVersion));
+
+Status TestPBUtil::CreateKnownGoodContainerFile(CreateMode create, SyncMode sync) {
+  // The record every "known good" container holds.
+  ProtoContainerTestPB expected;
+  expected.set_name(kTestKeyvalName);
+  expected.set_value(kTestKeyvalValue);
+  // The requested overwrite and sync modes are passed straight through.
+  return WritePBContainerToPath(env_, path_, expected, create, sync);
+}
+
+Status TestPBUtil::NewPBCWriter(int version, RWFileOptions opts,
+                                unique_ptr<WritablePBContainerFile>* pb_writer) {
+  unique_ptr<RWFile> file;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path_, &file));
+  pb_writer->reset(new WritablePBContainerFile(std::move(file)));
+  // kUseDefaultVersion leaves the writer's built-in version untouched.
+  if (version == kUseDefaultVersion) {
+    return Status::OK();
+  }
+  (*pb_writer)->SetVersionForTests(version);
+  return Status::OK();
+}
+
+// Writes a known-good container of the given 'version' to 'path_': the
+// container is initialized from the test message via CreateNew() and then
+// the test message is appended once.
+Status TestPBUtil::CreateKnownGoodContainerFileWithVersion(int version,
+                                                           CreateMode create,
+                                                           SyncMode sync) {
+  ProtoContainerTestPB test_pb;
+  test_pb.set_name(kTestKeyvalName);
+  test_pb.set_value(kTestKeyvalValue);
+
+  // Honor 'create' instead of silently ignoring it: NO_OVERWRITE must fail
+  // rather than clobber an existing file. OVERWRITE keeps the default mode.
+  RWFileOptions opts;
+  if (create == NO_OVERWRITE) {
+    opts.mode = Env::CREATE_NON_EXISTING;
+  }
+
+  unique_ptr<WritablePBContainerFile> pb_writer;
+  RETURN_NOT_OK(NewPBCWriter(version, opts, &pb_writer));
+  RETURN_NOT_OK(pb_writer->CreateNew(test_pb));
+  RETURN_NOT_OK(pb_writer->Append(test_pb));
+  // Likewise honor 'sync': make the data durable before closing.
+  if (sync == SYNC) {
+    RETURN_NOT_OK(pb_writer->Sync());
+  }
+  RETURN_NOT_OK(pb_writer->Close());
+  return Status::OK();
+}
+
+// Inverts every bit of the 'length' bytes starting at 'offset' in the file at
+// 'path'. The whole file is read into memory, modified, and rewritten.
+// Returns InvalidArgument if the range is not entirely inside the file.
+Status TestPBUtil::BitFlipFileByteRange(const string& path, uint64_t offset, uint64_t length) {
+  faststring buf;
+  // Read the data from disk.
+  {
+    unique_ptr<RandomAccessFile> file;
+    RETURN_NOT_OK(env_->NewRandomAccessFile(path, &file));
+    uint64_t size;
+    RETURN_NOT_OK(file->Size(&size));
+    faststring scratch;
+    scratch.resize(size);
+    Slice slice(scratch.data(), size);
+    RETURN_NOT_OK(file->Read(0, slice));
+    buf.append(slice.data(), slice.size());
+  }
+
+  // Reject ranges running past the end of the file; flipping them would write
+  // beyond the end of 'buf'. Phrased so that 'offset + length' cannot overflow.
+  if (offset > buf.size() || length > buf.size() - offset) {
+    return Status::InvalidArgument("byte range to flip exceeds the file size");
+  }
+
+  // Flip the bits.
+  for (uint64_t i = 0; i < length; i++) {
+    uint8_t* addr = buf.data() + offset + i;
+    *addr = ~*addr;
+  }
+
+  // Write the data back to disk.
+  unique_ptr<WritableFile> file;
+  RETURN_NOT_OK(env_->NewWritableFile(path, &file));
+  RETURN_NOT_OK(file->Append(buf));
+  RETURN_NOT_OK(file->Close());
+
+  return Status::OK();
+}
+
+Status TestPBUtil::TruncateFile(const string& path, uint64_t size) {
+  // The file is expected to exist already; it is never created here.
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  unique_ptr<RWFile> file;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path, &file));
+  return file->Truncate(size);
+}
+
+// Walks WritableFileOutputStream (4096-byte buffer) through Next(), BackUp()
+// and Flush(), checking ByteCount() after each step. Every assertion depends on
+// the stream state left by the previous steps, so the order matters.
+TEST_F(TestPBUtil, TestWritableFileOutputStream) {
+ shared_ptr<WritableFile> file;
+ string path = GetTestPath("test.out");
+ ASSERT_OK(env_util::OpenFileForWrite(env_, path, &file));
+
+ WritableFileOutputStream stream(file.get(), 4096);
+
+ void* buf;
+ int size;
+
+ // First call should yield the whole buffer.
+ ASSERT_TRUE(stream.Next(&buf, &size));
+ ASSERT_EQ(4096, size);
+ ASSERT_EQ(4096, stream.ByteCount());
+
+ // Back up 1000 bytes; the next call should yield exactly those 1000 bytes.
+ stream.BackUp(1000);
+ ASSERT_EQ(3096, stream.ByteCount());
+
+ ASSERT_TRUE(stream.Next(&buf, &size));
+ ASSERT_EQ(1000, size);
+
+ // Another call should flush and yield a new buffer of 4096
+ // (4096 bytes flushed + 4096 bytes handed out = 8192).
+ ASSERT_TRUE(stream.Next(&buf, &size));
+ ASSERT_EQ(4096, size);
+ ASSERT_EQ(8192, stream.ByteCount());
+
+ // Should be able to back up to 7192.
+ stream.BackUp(1000);
+ ASSERT_EQ(7192, stream.ByteCount());
+
+ // Flushing shouldn't change written count.
+ ASSERT_TRUE(stream.Flush());
+ ASSERT_EQ(7192, stream.ByteCount());
+
+ // Since we just flushed, we should get another full buffer.
+ ASSERT_TRUE(stream.Next(&buf, &size));
+ ASSERT_EQ(4096, size);
+ ASSERT_EQ(7192 + 4096, stream.ByteCount());
+
+ ASSERT_TRUE(stream.Flush());
+
+ // After the final flush every byte counted by ByteCount() should be in the file.
+ ASSERT_EQ(stream.ByteCount(), file->Size());
+}
+
+// Basic round trip: write a known-good container, read it back, and check
+// that it holds the expected record.
+TEST_F(TestPBUtil, TestPBContainerSimple) {
+  // Both SYNC and NO_SYNC are exercised even though the test cannot observe
+  // any difference between them.
+  for (SyncMode mode : { SYNC, NO_SYNC }) {
+    // NO_OVERWRITE: the previous iteration must have deleted its file.
+    ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE, mode));
+
+    ProtoContainerTestPB read_back;
+    ASSERT_OK(ReadPBContainerFromPath(env_, path_, &read_back));
+    ASSERT_EQ(kTestKeyvalName, read_back.name());
+    ASSERT_EQ(kTestKeyvalValue, read_back.value());
+
+    ASSERT_OK(env_->DeleteFile(path_));
+  }
+}
+
+// Corruption / various failure mode test.
+//
+// Writes a known-good container of version 'version_', damages one region at a
+// time (truncation, or inverting bytes with BitFlipFileByteRange()), and checks
+// that ReadPBContainerFromPath() reports the expected Status type and message.
+TEST_P(TestPBContainerVersions, TestCorruption) {
+ // Test that we indicate when the file does not exist.
+ ProtoContainerTestPB test_pb;
+ Status s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsNotFound()) << "Should not be found: " << path_ << ": " << s.ToString();
+
+ // Test that an empty file is rejected: it is reported as Incomplete because it
+ // is too small to be a valid container.
+ {
+ // Create the empty file.
+ unique_ptr<WritableFile> file;
+ ASSERT_OK(env_->NewWritableFile(path_, &file));
+ ASSERT_OK(file->Close());
+ }
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsIncomplete()) << "Should be zero length: " << path_ << ": " << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+
+ // Test truncated file.
+ // Dropping the last two bytes cuts the final record short. Version 1 reports
+ // this as Corruption, every other version (including the default) as Incomplete.
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ uint64_t known_good_size = 0;
+ ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+ ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ if (version_ == 1) {
+ ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+ } else {
+ ASSERT_TRUE(s.IsIncomplete()) << "Should be incorrect size: " << path_ << ": " << s.ToString();
+ }
+ ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+
+ // Test corrupted magic.
+ // Invert the first two bytes of the file, where the magic number lives.
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(BitFlipFileByteRange(path_, 0, 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsCorruption()) << "Should have invalid magic: " << path_ << ": " << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Invalid magic number");
+
+ // Test corrupted version.
+ // Bytes 8-9 fall inside the version field, so inverting them yields an
+ // unsupported version number.
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(BitFlipFileByteRange(path_, 8, 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsNotSupported()) << "Should have unsupported version number: " << path_ << ": "
+ << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), " Protobuf container has unsupported version");
+
+ // Test corrupted magic+version checksum (only exists in the V2+ format).
+ // The header checksum starts at byte 12.
+ // NOTE(review): kUseDefaultVersion (0) skips this check even though the
+ // offsets below treat the default version like V2 -- confirm whether intended.
+ if (version_ >= 2) {
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(BitFlipFileByteRange(path_, 12, 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsCorruption()) << "Should have corrupted file header checksum: " << path_ << ": "
+ << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "File header checksum does not match");
+ }
+
+ // Test record corruption below.
+ // The first record follows the 12-byte V1 header or the 16-byte header of
+ // later versions.
+ const int kFirstRecordOffset = (version_ == 1) ? 12 : 16;
+
+ // Test corrupted data length.
+ // Version 1 presumably has no checksum over the length, so a bogus length
+ // surfaces as a size error; later versions catch it through a checksum.
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset, 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ if (version_ == 1) {
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+ } else {
+ ASSERT_TRUE(s.IsCorruption()) << "Should be invalid data length checksum: "
+ << path_ << ": " << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+ }
+
+ // Test corrupted data (looks like bad checksum).
+ // The damage starts 4 bytes into the first record, presumably just past its
+ // length field.
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(BitFlipFileByteRange(path_, kFirstRecordOffset + 4, 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
+ << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+
+ // Test corrupted checksum.
+ // 'known_good_size' was measured on an identical container above; its last
+ // 4 bytes presumably hold the final record's checksum.
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(BitFlipFileByteRange(path_, known_good_size - 4, 2));
+ s = ReadPBContainerFromPath(env_, path_, &test_pb);
+ ASSERT_TRUE(s.IsCorruption()) << "Should be incorrect checksum: " << path_ << ": "
+ << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+}
+
+// Test partial record at end of file.
+TEST_P(TestPBContainerVersions, TestPartialRecord) {
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ uint64_t known_good_size;
+ ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+ ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+ unique_ptr<RandomAccessFile> file;
+ ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+ ReadablePBContainerFile pb_file(std::move(file));
+ ASSERT_OK(pb_file.Open());
+ ProtoContainerTestPB test_pb;
+ Status s = pb_file.ReadNextPB(&test_pb);
+ // Loop to verify that the same response is repeatably returned.
+ for (int i = 0; i < 2; i++) {
+ if (version_ == 1) {
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+ } else {
+ ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+ }
+ ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+ }
+ ASSERT_OK(pb_file.Close());
+}
+
+// KUDU-2260: Test handling extra null bytes at the end of file. This can
+// occur, for example, on ext4 in default data=ordered mode when a write
+// increases the filesize but the system crashes before the actual data is
+// persisted.
+TEST_P(TestPBContainerVersions, TestExtraNullBytes) {
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ uint64_t known_good_size;
+ ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+ for (const auto extra_bytes : {1, 8, 128}) {
+ ASSERT_OK(TruncateFile(path_, known_good_size + extra_bytes));
+
+ unique_ptr<RandomAccessFile> file;
+ ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+ ReadablePBContainerFile pb_file(std::move(file));
+ ASSERT_OK(pb_file.Open());
+ ProtoContainerTestPB test_pb;
+ // Read the first good PB. Trouble starts at the second.
+ ASSERT_OK(pb_file.ReadNextPB(&test_pb));
+ Status s = pb_file.ReadNextPB(&test_pb);
+ // Loop to verify that the same response is repeatably returned.
+ for (int i = 0; i < 2; i++) {
+ ASSERT_TRUE(version_ == 1 ? s.IsCorruption() : s.IsIncomplete()) << s.ToString();
+ if (extra_bytes < 8) {
+ ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+ } else if (version_ == 1) {
+ ASSERT_STR_CONTAINS(s.ToString(), "Length and data checksum does not match");
+ } else {
+ ASSERT_STR_CONTAINS(s.ToString(), "rest of file is NULL bytes");
+ }
+ }
+ ASSERT_OK(pb_file.Close());
+ }
+}
+
+// Test that it is possible to append after a partial write if we truncate the
+// partial record. This is only fully supported in V2+.
+TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
+ uint64_t known_good_size;
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ASSERT_OK(env_->GetFileSize(path_, &known_good_size));
+
+ unique_ptr<WritablePBContainerFile> writer;
+ RWFileOptions opts;
+ opts.mode = Env::OPEN_EXISTING;
+ ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+ ASSERT_OK(writer->OpenExisting());
+
+ ASSERT_OK(TruncateFile(path_, known_good_size - 2));
+
+ unique_ptr<RandomAccessFile> file;
+ ASSERT_OK(env_->NewRandomAccessFile(path_, &file));
+ ReadablePBContainerFile reader(std::move(file));
+ ASSERT_OK(reader.Open());
+ ProtoContainerTestPB test_pb;
+ Status s = reader.ReadNextPB(&test_pb);
+ ASSERT_STR_CONTAINS(s.ToString(), "File size not large enough to be valid");
+ if (version_ == 1) {
+ ASSERT_TRUE(s.IsCorruption()) << s.ToString();
+ return; // The rest of the test does not apply to version 1.
+ }
+ ASSERT_TRUE(s.IsIncomplete()) << s.ToString();
+
+ // Now truncate cleanly.
+ ASSERT_OK(TruncateFile(path_, reader.offset()));
+ s = reader.ReadNextPB(&test_pb);
+ ASSERT_TRUE(s.IsEndOfFile()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Reached end of file");
+
+ // Reopen the writer to allow appending more records.
+ // Append a record and read it back.
+ ASSERT_OK(NewPBCWriter(version_, opts, &writer));
+ ASSERT_OK(writer->OpenExisting());
+ test_pb.set_name("hello");
+ test_pb.set_value(1);
+ ASSERT_OK(writer->Append(test_pb));
+ test_pb.Clear();
+ ASSERT_OK(reader.ReadNextPB(&test_pb));
+ ASSERT_EQ("hello", test_pb.name());
+ ASSERT_EQ(1, test_pb.value());
+}
+
+// Simple test for all versions.
+TEST_P(TestPBContainerVersions, TestSingleMessage) {
+ ASSERT_OK(CreateKnownGoodContainerFileWithVersion(version_));
+ ProtoContainerTestPB test_pb;
+ ASSERT_OK(ReadPBContainerFromPath(env_, path_, &test_pb));
+ ASSERT_EQ(kTestKeyvalName, test_pb.name());
+ ASSERT_EQ(kTestKeyvalValue, test_pb.value());
+}
+
+TEST_P(TestPBContainerVersions, TestMultipleMessages) {
+ ProtoContainerTestPB pb;
+ pb.set_name("foo");
+ pb.set_note("bar");
+
+ unique_ptr<WritablePBContainerFile> pb_writer;
+ ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+ ASSERT_OK(pb_writer->CreateNew(pb));
+
+ for (int i = 0; i < 10; i++) {
+ pb.set_value(i);
+ ASSERT_OK(pb_writer->Append(pb));
+ }
+ ASSERT_OK(pb_writer->Close());
+
+ int pbs_read = 0;
+ unique_ptr<RandomAccessFile> reader;
+ ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
+ ReadablePBContainerFile pb_reader(std::move(reader));
+ ASSERT_OK(pb_reader.Open());
+ for (int i = 0;; i++) {
+ ProtoContainerTestPB read_pb;
+ Status s = pb_reader.ReadNextPB(&read_pb);
+ if (s.IsEndOfFile()) {
+ break;
+ }
+ ASSERT_OK(s);
+ ASSERT_EQ(pb.name(), read_pb.name());
+ ASSERT_EQ(read_pb.value(), i);
+ ASSERT_EQ(pb.note(), read_pb.note());
+ pbs_read++;
+ }
+ ASSERT_EQ(10, pbs_read);
+ ASSERT_OK(pb_reader.Close());
+}
+
+TEST_P(TestPBContainerVersions, TestInterleavedReadWrite) {
+ ProtoContainerTestPB pb;
+ pb.set_name("foo");
+ pb.set_note("bar");
+
+ // Open the file for writing and reading.
+ unique_ptr<WritablePBContainerFile> pb_writer;
+ ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+ unique_ptr<RandomAccessFile> reader;
+ ASSERT_OK(env_->NewRandomAccessFile(path_, &reader));
+ ReadablePBContainerFile pb_reader(std::move(reader));
+
+ // Write the header (writer) and validate it (reader).
+ ASSERT_OK(pb_writer->CreateNew(pb));
+ ASSERT_OK(pb_reader.Open());
+
+ for (int i = 0; i < 10; i++) {
+ SCOPED_TRACE(i);
+ // Write a message and read it back.
+ pb.set_value(i);
+ ASSERT_OK(pb_writer->Append(pb));
+ ProtoContainerTestPB read_pb;
+ ASSERT_OK(pb_reader.ReadNextPB(&read_pb));
+ ASSERT_EQ(pb.name(), read_pb.name());
+ ASSERT_EQ(read_pb.value(), i);
+ ASSERT_EQ(pb.note(), read_pb.note());
+ }
+
+ // After closing the writer, the reader should be out of data.
+ ASSERT_OK(pb_writer->Close());
+ ASSERT_TRUE(pb_reader.ReadNextPB(nullptr).IsEndOfFile());
+ ASSERT_OK(pb_reader.Close());
+}
+
+TEST_F(TestPBUtil, TestPopulateDescriptorSet) {
+ {
+ // No dependencies --> just one proto.
+ ProtoContainerTestPB pb;
+ FileDescriptorSet protos;
+ WritablePBContainerFile::PopulateDescriptorSet(
+ pb.GetDescriptor()->file(), &protos);
+ ASSERT_EQ(1, protos.file_size());
+ }
+ {
+ // One direct dependency --> two protos.
+ ProtoContainerTest2PB pb;
+ FileDescriptorSet protos;
+ WritablePBContainerFile::PopulateDescriptorSet(
+ pb.GetDescriptor()->file(), &protos);
+ ASSERT_EQ(2, protos.file_size());
+ }
+ {
+ // One direct and one indirect dependency --> three protos.
+ ProtoContainerTest3PB pb;
+ FileDescriptorSet protos;
+ WritablePBContainerFile::PopulateDescriptorSet(
+ pb.GetDescriptor()->file(), &protos);
+ ASSERT_EQ(3, protos.file_size());
+ }
+}
+
+void TestPBUtil::DumpPBCToString(const string& path,
+ ReadablePBContainerFile::Format format,
+ string* ret) {
+ unique_ptr<RandomAccessFile> reader;
+ ASSERT_OK(env_->NewRandomAccessFile(path, &reader));
+ ReadablePBContainerFile pb_reader(std::move(reader));
+ ASSERT_OK(pb_reader.Open());
+ ostringstream oss;
+ ASSERT_OK(pb_reader.Dump(&oss, format));
+ ASSERT_OK(pb_reader.Close());
+ *ret = oss.str();
+}
+
+TEST_P(TestPBContainerVersions, TestDumpPBContainer) {
+ const char* kExpectedOutput =
+ "Message 0\n"
+ "-------\n"
+ "record_one {\n"
+ " name: \"foo\"\n"
+ " value: 0\n"
+ "}\n"
+ "record_two {\n"
+ " record {\n"
+ " name: \"foo\"\n"
+ " value: 0\n"
+ " }\n"
+ "}\n"
+ "\n"
+ "Message 1\n"
+ "-------\n"
+ "record_one {\n"
+ " name: \"foo\"\n"
+ " value: 1\n"
+ "}\n"
+ "record_two {\n"
+ " record {\n"
+ " name: \"foo\"\n"
+ " value: 2\n"
+ " }\n"
+ "}\n\n";
+
+ const char* kExpectedOutputShort =
+ "0\trecord_one { name: \"foo\" value: 0 } record_two { record { name: \"foo\" value: 0 } }\n"
+ "1\trecord_one { name: \"foo\" value: 1 } record_two { record { name: \"foo\" value: 2 } }\n";
+
+ const char* kExpectedOutputJson =
+ "{\"recordOne\":{\"name\":\"foo\",\"value\":0},\"recordTwo\":{\"record\":{\"name\":\"foo\",\"value\":0}}}\n" // NOLINT
+ "{\"recordOne\":{\"name\":\"foo\",\"value\":1},\"recordTwo\":{\"record\":{\"name\":\"foo\",\"value\":2}}}\n"; // NOLINT
+
+ ProtoContainerTest3PB pb;
+ pb.mutable_record_one()->set_name("foo");
+ pb.mutable_record_two()->mutable_record()->set_name("foo");
+
+ unique_ptr<WritablePBContainerFile> pb_writer;
+ ASSERT_OK(NewPBCWriter(version_, RWFileOptions(), &pb_writer));
+ ASSERT_OK(pb_writer->CreateNew(pb));
+
+ for (int i = 0; i < 2; i++) {
+ pb.mutable_record_one()->set_value(i);
+ pb.mutable_record_two()->mutable_record()->set_value(i*2);
+ ASSERT_OK(pb_writer->Append(pb));
+ }
+ ASSERT_OK(pb_writer->Close());
+
+ string output;
+ NO_FATALS(DumpPBCToString(path_, ReadablePBContainerFile::Format::DEFAULT, &output));
+ ASSERT_STREQ(kExpectedOutput, output.c_str());
+
+ NO_FATALS(DumpPBCToString(path_, ReadablePBContainerFile::Format::ONELINE, &output));
+ ASSERT_STREQ(kExpectedOutputShort, output.c_str());
+
+ NO_FATALS(DumpPBCToString(path_, ReadablePBContainerFile::Format::JSON, &output));
+ ASSERT_STREQ(kExpectedOutputJson, output.c_str());
+}
+
+TEST_F(TestPBUtil, TestOverwriteExistingPB) {
+ ASSERT_OK(CreateKnownGoodContainerFile(NO_OVERWRITE));
+ ASSERT_TRUE(CreateKnownGoodContainerFile(NO_OVERWRITE).IsAlreadyPresent());
+ ASSERT_OK(CreateKnownGoodContainerFile(OVERWRITE));
+ ASSERT_OK(CreateKnownGoodContainerFile(OVERWRITE));
+}
+
+TEST_F(TestPBUtil, TestRedaction) {
+ ASSERT_NE("", gflags::SetCommandLineOption("redact", "log"));
+ TestSecurePrintingPB pb;
+
+ pb.set_insecure1("public 1");
+ pb.set_insecure2("public 2");
+ pb.set_secure1("private 1");
+ pb.set_secure2("private 2");
+ pb.add_repeated_secure("private 3");
+ pb.add_repeated_secure("private 4");
+ pb.set_insecure3("public 3");
+
+ for (auto s : {SecureDebugString(pb), SecureShortDebugString(pb)}) {
+ ASSERT_EQ(string::npos, s.find("private"));
+ ASSERT_STR_CONTAINS(s, "<redacted>");
+ ASSERT_STR_CONTAINS(s, "public 1");
+ ASSERT_STR_CONTAINS(s, "public 2");
+ ASSERT_STR_CONTAINS(s, "public 3");
+ }
+
+ // If we disable redaction, we should see the private fields.
+ ASSERT_NE("", gflags::SetCommandLineOption("redact", ""));
+ ASSERT_STR_CONTAINS(SecureDebugString(pb), "private");
+}
+
+} // namespace pb_util
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/pb_util.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/pb_util.cc b/be/src/kudu/util/pb_util.cc
new file mode 100644
index 0000000..edf1222
--- /dev/null
+++ b/be/src/kudu/util/pb_util.cc
@@ -0,0 +1,1088 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// Some portions copyright (C) 2008, Google, inc.
+//
+// Utilities for working with protobufs.
+// Some of this code is cribbed from the protobuf source,
+// but modified to work with kudu's 'faststring' instead of STL strings.
+
+#include "kudu/util/pb_util.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <deque>
+#include <initializer_list>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/descriptor_database.h>
+#include <google/protobuf/dynamic_message.h>
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/stubs/status.h>
+#include <google/protobuf/text_format.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "kudu/gutil/integral_types.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/escaping.h"
+#include "kudu/gutil/strings/fastmem.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/coding.h"
+#include "kudu/util/coding-inl.h"
+#include "kudu/util/crc.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/jsonwriter.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util-internal.h"
+#include "kudu/util/pb_util.pb.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+using google::protobuf::Descriptor;
+using google::protobuf::DescriptorPool;
+using google::protobuf::DynamicMessageFactory;
+using google::protobuf::FieldDescriptor;
+using google::protobuf::FileDescriptor;
+using google::protobuf::FileDescriptorProto;
+using google::protobuf::FileDescriptorSet;
+using google::protobuf::io::ArrayInputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::Message;
+using google::protobuf::MessageLite;
+using google::protobuf::Reflection;
+using google::protobuf::SimpleDescriptorDatabase;
+using google::protobuf::TextFormat;
+using kudu::crc::Crc;
+using kudu::pb_util::internal::SequentialFileFileInputStream;
+using kudu::pb_util::internal::WritableFileOutputStream;
+using std::deque;
+using std::endl;
+using std::initializer_list;
+using std::ostream;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+using strings::Utf8SafeCEscape;
+
+namespace std {
+
+// Allow the use of FileState with DCHECK_EQ.
+std::ostream& operator<< (std::ostream& os, const kudu::pb_util::FileState& state) {
+ os << static_cast<int>(state);
+ return os;
+}
+
+} // namespace std
+
+namespace kudu {
+namespace pb_util {
+
// Suffix appended to temporary-file templates (the X's are filled in with a
// unique string when the temp file is created).
static constexpr const char* kTmpTemplateSuffix = ".XXXXXX";

// Protobuf container constants.
static constexpr uint32_t kPBContainerInvalidVersion = 0;
static constexpr uint32_t kPBContainerDefaultVersion = 2;
// Checksums are stored as fixed 32-bit values.
static constexpr int kPBContainerChecksumLen = sizeof(uint32_t);
static constexpr char kPBContainerMagic[] = "kuducntr";
static constexpr int kPBContainerMagicLen = 8;
// V1 header: magic number followed by a 32-bit version.
static constexpr int kPBContainerV1HeaderLen = kPBContainerMagicLen + sizeof(uint32_t);
// V2 header: the V1 header followed by a checksum over it.
static constexpr int kPBContainerV2HeaderLen = kPBContainerV1HeaderLen + kPBContainerChecksumLen;

const int kPBContainerMinimumValidLength = kPBContainerV1HeaderLen;

// sizeof() counts the string literal's trailing NUL, hence the "- 1".
static_assert(sizeof(kPBContainerMagic) - 1 == kPBContainerMagicLen,
              "kPBContainerMagic does not match expected length");
+
+namespace {
+
+// When serializing, we first compute the byte size, then serialize the message.
+// If serialization produces a different number of bytes than expected, we
+// call this function, which crashes. The problem could be due to a bug in the
+// protobuf implementation but is more likely caused by concurrent modification
+// of the message. This function attempts to distinguish between the two and
+// provide a useful error message.
+void ByteSizeConsistencyError(int byte_size_before_serialization,
+ int byte_size_after_serialization,
+ int bytes_produced_by_serialization) {
+ CHECK_EQ(byte_size_before_serialization, byte_size_after_serialization)
+ << "Protocol message was modified concurrently during serialization.";
+ CHECK_EQ(bytes_produced_by_serialization, byte_size_before_serialization)
+ << "Byte size calculation and serialization were inconsistent. This "
+ "may indicate a bug in protocol buffers or it may be caused by "
+ "concurrent modification of the message.";
+ LOG(FATAL) << "This shouldn't be called if all the sizes are equal.";
+}
+
+string InitializationErrorMessage(const char* action,
+ const MessageLite& message) {
+ // Note: We want to avoid depending on strutil in the lite library, otherwise
+ // we'd use:
+ //
+ // return strings::Substitute(
+ // "Can't $0 message of type \"$1\" because it is missing required "
+ // "fields: $2",
+ // action, message.GetTypeName(),
+ // message.InitializationErrorString());
+
+ string result;
+ result += "Can't ";
+ result += action;
+ result += " message of type \"";
+ result += message.GetTypeName();
+ result += "\" because it is missing required fields: ";
+ result += message.InitializationErrorString();
+ return result;
+}
+
// Returns true iff this implementation can read protobuf container files of
// the given format 'version' (currently versions 1 and 2).
bool IsSupportedContainerVersion(uint32_t version) {
  switch (version) {
    case 1:
    case 2:
      return true;
    default:
      return false;
  }
}
+
+// Reads exactly 'length' bytes from the container file into 'result',
+// validating that there is sufficient data in the file to read this length
+// before attempting to do so, and validating that it has read that length
+// after performing the read.
+//
+// If the file size is less than the requested size of the read, returns
+// Status::Incomplete.
+// If there is an unexpected short read, returns Status::Corruption.
+//
+// NOTE: the data in 'result' may be modified even in the case of a failed read.
+template<typename ReadableFileType>
+Status ValidateAndReadData(ReadableFileType* reader, uint64_t file_size,
+ uint64_t* offset, uint64_t length,
+ faststring* result) {
+ // Validate the read length using the file size.
+ if (*offset + length > file_size) {
+ return Status::Incomplete("File size not large enough to be valid",
+ Substitute("Proto container file $0: "
+ "Tried to read $1 bytes at offset "
+ "$2 but file size is only $3 bytes",
+ reader->filename(), length,
+ *offset, file_size));
+ }
+
+ // Perform the read.
+ result->resize(length);
+ RETURN_NOT_OK(reader->Read(*offset, Slice(*result)));
+ *offset += length;
+ return Status::OK();
+}
+
// Builds the message prepended to a failed ParseAndCompareChecksum() status,
// naming the file and the offset of the checksum that failed. Example usage:
//   RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { data }),
//                         CHECKSUM_ERR_MSG("Data checksum does not match", filename, offset));
#define CHECKSUM_ERR_MSG(prefix, filename, cksum_offset) \
  Substitute("$0: Incorrect checksum in file $1 at offset $2", \
             (prefix), (filename), (cksum_offset))
+
+// Parses a checksum from the specified buffer and compares it to the bytes
+// given in 'slices' by calculating a rolling CRC32 checksum of the bytes in
+// the 'slices'.
+// If they match, returns OK. Otherwise, returns Status::Corruption.
+Status ParseAndCompareChecksum(const uint8_t* checksum_buf,
+ const initializer_list<Slice>& slices) {
+ uint32_t written_checksum = DecodeFixed32(checksum_buf);
+ uint64_t actual_checksum = 0;
+ Crc* crc32c = crc::GetCrc32cInstance();
+ for (Slice s : slices) {
+ crc32c->Compute(s.data(), s.size(), &actual_checksum);
+ }
+ if (PREDICT_FALSE(actual_checksum != written_checksum)) {
+ return Status::Corruption(Substitute("Checksum does not match. Expected: $0. Actual: $1",
+ written_checksum, actual_checksum));
+ }
+ return Status::OK();
+}
+
+// If necessary, get the size of the file opened by 'reader' in 'cached_file_size'.
+// If 'cached_file_size' already has a value, this is a no-op.
+template<typename ReadableFileType>
+Status CacheFileSize(ReadableFileType* reader,
+ boost::optional<uint64_t>* cached_file_size) {
+ if (*cached_file_size) {
+ return Status::OK();
+ }
+
+ uint64_t file_size;
+ RETURN_NOT_OK(reader->Size(&file_size));
+ *cached_file_size = file_size;
+ return Status::OK();
+}
+
+template<typename ReadableFileType>
+Status RestOfFileIsAllZeros(ReadableFileType* reader,
+ uint64_t filesize,
+ uint64_t offset,
+ bool* all_zeros) {
+ DCHECK(reader);
+ DCHECK_GE(filesize, offset);
+ DCHECK(all_zeros);
+ constexpr uint64_t max_to_read = 4 * 1024 * 1024; // 4 MiB.
+ faststring buf;
+ while (true) {
+ uint64_t to_read = std::min(max_to_read, filesize - offset);
+ if (to_read == 0) {
+ break;
+ }
+ buf.resize(to_read);
+ RETURN_NOT_OK(reader->Read(offset, Slice(buf)));
+ offset += to_read;
+ if (!IsAllZeros(buf)) {
+ *all_zeros = false;
+ return Status::OK();
+ }
+ }
+ *all_zeros = true;
+ return Status::OK();
+}
+
+// Read and parse a message of the specified format at the given offset in the
+// format documented in pb_util.h. 'offset' is an in-out parameter and will be
+// updated with the new offset on success. On failure, 'offset' is not modified.
+template<typename ReadableFileType>
+Status ReadPBStartingAt(ReadableFileType* reader, int version,
+ boost::optional<uint64_t>* cached_file_size,
+ uint64_t* offset, Message* msg) {
+ uint64_t tmp_offset = *offset;
+ VLOG(1) << "Reading PB with version " << version << " starting at offset " << *offset;
+
+ RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
+ uint64_t file_size = cached_file_size->get();
+
+ if (tmp_offset == *cached_file_size) {
+ return Status::EndOfFile("Reached end of file");
+ }
+
+ // Read the data length from the file.
+ // Version 2+ includes a checksum for the length field.
+ uint64_t length_buflen = (version == 1) ? sizeof(uint32_t)
+ : sizeof(uint32_t) + kPBContainerChecksumLen;
+ faststring length_and_cksum_buf;
+ RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, length_buflen,
+ &length_and_cksum_buf),
+ Substitute("Could not read data length from proto container file $0 "
+ "at offset $1", reader->filename(), *offset));
+ Slice length(length_and_cksum_buf.data(), sizeof(uint32_t));
+
+ // Versions >= 2 have an individual checksum for the data length.
+ if (version >= 2) {
+ // KUDU-2260: If the length and checksum data are all 0's, and the rest of
+ // the file is all 0's, then it's an incomplete record, not corruption.
+ // This can happen e.g. on ext4 in the default data=ordered mode, when the
+ // filesize metadata is updated but the new data is not persisted.
+ // See https://plus.google.com/+KentonVarda/posts/JDwHfAiLGNQ.
+ if (IsAllZeros(length_and_cksum_buf)) {
+ bool all_zeros;
+ RETURN_NOT_OK(RestOfFileIsAllZeros(reader, file_size, tmp_offset, &all_zeros));
+ if (all_zeros) {
+ return Status::Incomplete("incomplete write of PB: rest of file is NULL bytes");
+ }
+ }
+ Slice length_checksum(length_and_cksum_buf.data() + sizeof(uint32_t), kPBContainerChecksumLen);
+ RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(length_checksum.data(), { length }),
+ CHECKSUM_ERR_MSG("Data length checksum does not match",
+ reader->filename(), tmp_offset - kPBContainerChecksumLen));
+ }
+ uint32_t data_length = DecodeFixed32(length.data());
+
+ // Read body and checksum into buffer for checksum & parsing.
+ uint64_t data_and_cksum_buflen = data_length + kPBContainerChecksumLen;
+ faststring body_and_cksum_buf;
+ RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, data_and_cksum_buflen,
+ &body_and_cksum_buf),
+ Substitute("Could not read PB message data from proto container file $0 "
+ "at offset $1",
+ reader->filename(), tmp_offset));
+ Slice body(body_and_cksum_buf.data(), data_length);
+ Slice record_checksum(body_and_cksum_buf.data() + data_length, kPBContainerChecksumLen);
+
+ // Version 1 has a single checksum for length, body.
+ // Version 2+ has individual checksums for length and body, respectively.
+ if (version == 1) {
+ RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { length, body }),
+ CHECKSUM_ERR_MSG("Length and data checksum does not match",
+ reader->filename(), tmp_offset - kPBContainerChecksumLen));
+ } else {
+ RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(record_checksum.data(), { body }),
+ CHECKSUM_ERR_MSG("Data checksum does not match",
+ reader->filename(), tmp_offset - kPBContainerChecksumLen));
+ }
+
+ // The checksum is correct. Time to decode the body.
+ //
+ // We could compare pb_type_ against msg.GetTypeName(), but:
+ // 1. pb_type_ is not available when reading the supplemental header,
+ // 2. ParseFromArray() should fail if the data cannot be parsed into the
+ // provided message type.
+
+ // To permit parsing of very large PB messages, we must use parse through a
+ // CodedInputStream and bump the byte limit. The SetTotalBytesLimit() docs
+ // say that 512MB is the shortest theoretical message length that may produce
+ // integer overflow warnings, so that's what we'll use.
+ ArrayInputStream ais(body.data(), body.size());
+ CodedInputStream cis(&ais);
+ cis.SetTotalBytesLimit(512 * 1024 * 1024, -1);
+ if (PREDICT_FALSE(!msg->ParseFromCodedStream(&cis))) {
+ return Status::IOError("Unable to parse PB from path", reader->filename());
+ }
+
+ *offset = tmp_offset;
+ return Status::OK();
+}
+
+// Wrapper around ReadPBStartingAt() to enforce that we don't return
+// Status::Incomplete() for V1 format files.
+template<typename ReadableFileType>
+Status ReadFullPB(ReadableFileType* reader, int version,
+ boost::optional<uint64_t>* cached_file_size,
+ uint64_t* offset, Message* msg) {
+ bool had_cached_size = *cached_file_size != boost::none;
+ Status s = ReadPBStartingAt(reader, version, cached_file_size, offset, msg);
+ if (PREDICT_FALSE(s.IsIncomplete() && version == 1)) {
+ return Status::Corruption("Unrecoverable incomplete record", s.ToString());
+ }
+ // If we hit EOF, but we were using a cached view of the file size, then it might be
+ // that the file has been extended. Invalidate the cache and try again.
+ if (had_cached_size && (s.IsIncomplete() || s.IsEndOfFile())) {
+ *cached_file_size = boost::none;
+ return ReadFullPB(reader, version, cached_file_size, offset, msg);
+ }
+ return s;
+}
+
+// Read and parse the protobuf container file-level header documented in pb_util.h.
+template<typename ReadableFileType>
+Status ParsePBFileHeader(ReadableFileType* reader, boost::optional<uint64_t>* cached_file_size,
+ uint64_t* offset, int* version) {
+ RETURN_NOT_OK(CacheFileSize(reader, cached_file_size));
+ uint64_t file_size = cached_file_size->get();
+
+ // We initially read enough data for a V2+ file header. This optimizes for
+ // V2+ and is valid on a V1 file because we don't consider these files valid
+ // unless they contain a record in addition to the file header. The
+ // additional 4 bytes required by a V2+ header (vs V1) is still less than the
+ // minimum number of bytes required for a V1 format data record.
+ uint64_t tmp_offset = *offset;
+ faststring header;
+ RETURN_NOT_OK_PREPEND(ValidateAndReadData(reader, file_size, &tmp_offset, kPBContainerV2HeaderLen,
+ &header),
+ Substitute("Could not read header for proto container file $0",
+ reader->filename()));
+ Slice magic_and_version(header.data(), kPBContainerMagicLen + sizeof(uint32_t));
+ Slice checksum(header.data() + kPBContainerMagicLen + sizeof(uint32_t), kPBContainerChecksumLen);
+
+ // Validate magic number.
+ if (PREDICT_FALSE(!strings::memeq(kPBContainerMagic, header.data(), kPBContainerMagicLen))) {
+ string file_magic(reinterpret_cast<const char*>(header.data()), kPBContainerMagicLen);
+ return Status::Corruption("Invalid magic number",
+ Substitute("Expected: $0, found: $1",
+ Utf8SafeCEscape(kPBContainerMagic),
+ Utf8SafeCEscape(file_magic)));
+ }
+
+ // Validate container file version.
+ uint32_t tmp_version = DecodeFixed32(header.data() + kPBContainerMagicLen);
+ if (PREDICT_FALSE(!IsSupportedContainerVersion(tmp_version))) {
+ return Status::NotSupported(
+ Substitute("Protobuf container has unsupported version: $0. Default version: $1",
+ tmp_version, kPBContainerDefaultVersion));
+ }
+
+ // Versions >= 2 have a checksum after the magic number and encoded version
+ // to ensure the integrity of these fields.
+ if (tmp_version >= 2) {
+ RETURN_NOT_OK_PREPEND(ParseAndCompareChecksum(checksum.data(), { magic_and_version }),
+ CHECKSUM_ERR_MSG("File header checksum does not match",
+ reader->filename(), tmp_offset - kPBContainerChecksumLen));
+ } else {
+ // Version 1 doesn't have a header checksum. Rewind our read offset so this
+ // data will be read again when we next attempt to read a data record.
+ tmp_offset -= kPBContainerChecksumLen;
+ }
+
+ *offset = tmp_offset;
+ *version = tmp_version;
+ return Status::OK();
+}
+
+// Read and parse the supplemental header from the container file.
+template<typename ReadableFileType>
+Status ReadSupplementalHeader(ReadableFileType* reader, int version,
+ boost::optional<uint64_t>* cached_file_size,
+ uint64_t* offset,
+ ContainerSupHeaderPB* sup_header) {
+ RETURN_NOT_OK_PREPEND(ReadFullPB(reader, version, cached_file_size, offset, sup_header),
+ Substitute("Could not read supplemental header from proto container file $0 "
+ "with version $1 at offset $2",
+ reader->filename(), version, *offset));
+ return Status::OK();
+}
+
+} // anonymous namespace
+
+void AppendToString(const MessageLite &msg, faststring *output) {
+ DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
+ AppendPartialToString(msg, output);
+}
+
+void AppendPartialToString(const MessageLite &msg, faststring* output) {
+ size_t old_size = output->size();
+ int byte_size = msg.ByteSize();
+ // Messages >2G cannot be serialized due to overflow computing ByteSize.
+ DCHECK_GE(byte_size, 0) << "Error computing ByteSize";
+
+ output->resize(old_size + static_cast<size_t>(byte_size));
+
+ uint8* start = &((*output)[old_size]);
+ uint8* end = msg.SerializeWithCachedSizesToArray(start);
+ if (end - start != byte_size) {
+ ByteSizeConsistencyError(byte_size, msg.ByteSize(), end - start);
+ }
+}
+
+void SerializeToString(const MessageLite &msg, faststring *output) {
+ output->clear();
+ AppendToString(msg, output);
+}
+
+Status ParseFromSequentialFile(MessageLite *msg, SequentialFile *rfile) {
+ SequentialFileFileInputStream input(rfile);
+ if (!msg->ParseFromZeroCopyStream(&input)) {
+ RETURN_NOT_OK(input.status());
+
+ // If it's not a file IO error then it's a parsing error.
+ // Probably, we read wrong or damaged data here.
+ return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
+ }
+ return Status::OK();
+}
+
+Status ParseFromArray(MessageLite* msg, const uint8_t* data, uint32_t length) {
+ if (!msg->ParseFromArray(data, length)) {
+ return Status::Corruption("Error parsing msg", InitializationErrorMessage("parse", *msg));
+ }
+ return Status::OK();
+}
+
+Status WritePBToPath(Env* env, const std::string& path,
+ const MessageLite& msg,
+ SyncMode sync) {
+ const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
+ string tmp_path;
+
+ unique_ptr<WritableFile> file;
+ RETURN_NOT_OK(env->NewTempWritableFile(WritableFileOptions(), tmp_template, &tmp_path, &file));
+ auto tmp_deleter = MakeScopedCleanup([&]() {
+ WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
+ });
+
+ WritableFileOutputStream output(file.get());
+ bool res = msg.SerializeToZeroCopyStream(&output);
+ if (!res || !output.Flush()) {
+ return Status::IOError("Unable to serialize PB to file");
+ }
+
+ if (sync == pb_util::SYNC) {
+ RETURN_NOT_OK_PREPEND(file->Sync(), "Failed to Sync() " + tmp_path);
+ }
+ RETURN_NOT_OK_PREPEND(file->Close(), "Failed to Close() " + tmp_path);
+ RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path), "Failed to rename tmp file to " + path);
+ tmp_deleter.cancel();
+ if (sync == pb_util::SYNC) {
+ RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)), "Failed to SyncDir() parent of " + path);
+ }
+ return Status::OK();
+}
+
+Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg) {
+ shared_ptr<SequentialFile> rfile;
+ RETURN_NOT_OK(env_util::OpenFileForSequential(env, path, &rfile));
+ RETURN_NOT_OK(ParseFromSequentialFile(msg, rfile.get()));
+ return Status::OK();
+}
+
// Shortens '*s' to its first 'max_len' bytes followed by the marker
// "<truncated>" when it is longer than 'max_len'. Shorter strings are left
// untouched. A negative 'max_len' converts to a huge unsigned bound, so
// nothing is truncated in that case.
static void TruncateString(string* s, int max_len) {
  if (s->size() <= static_cast<size_t>(max_len)) {
    return;
  }
  s->resize(max_len);
  s->append("<truncated>");
}
+
+void TruncateFields(Message* message, int max_len) {
+ const Reflection* reflection = message->GetReflection();
+ vector<const FieldDescriptor*> fields;
+ reflection->ListFields(*message, &fields);
+ for (const FieldDescriptor* field : fields) {
+ if (field->is_repeated()) {
+ for (int i = 0; i < reflection->FieldSize(*message, field); i++) {
+ switch (field->cpp_type()) {
+ case FieldDescriptor::CPPTYPE_STRING: {
+ const string& s_const = reflection->GetRepeatedStringReference(*message, field, i,
+ nullptr);
+ TruncateString(const_cast<string*>(&s_const), max_len);
+ break;
+ }
+ case FieldDescriptor::CPPTYPE_MESSAGE: {
+ TruncateFields(reflection->MutableRepeatedMessage(message, field, i), max_len);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ } else {
+ switch (field->cpp_type()) {
+ case FieldDescriptor::CPPTYPE_STRING: {
+ const string& s_const = reflection->GetStringReference(*message, field, nullptr);
+ TruncateString(const_cast<string*>(&s_const), max_len);
+ break;
+ }
+ case FieldDescriptor::CPPTYPE_MESSAGE: {
+ TruncateFields(reflection->MutableMessage(message, field), max_len);
+ break;
+ }
+ default:
+ break;
+ }
+ }
+ }
+}
+
+namespace {
+class SecureFieldPrinter : public TextFormat::FieldValuePrinter {
+ public:
+ using super = TextFormat::FieldValuePrinter;
+
+ string PrintFieldName(const Message& message,
+ const Reflection* reflection,
+ const FieldDescriptor* field) const override {
+ hide_next_string_ = field->cpp_type() == FieldDescriptor::CPPTYPE_STRING &&
+ field->options().GetExtension(REDACT);
+ return super::PrintFieldName(message, reflection, field);
+ }
+
+ string PrintString(const string& val) const override {
+ if (hide_next_string_) {
+ hide_next_string_ = false;
+ return KUDU_REDACT(super::PrintString(val));
+ }
+ return super::PrintString(val);
+ }
+ string PrintBytes(const string& val) const override {
+ if (hide_next_string_) {
+ hide_next_string_ = false;
+ return KUDU_REDACT(super::PrintBytes(val));
+ }
+ return super::PrintBytes(val);
+ }
+
+ mutable bool hide_next_string_ = false;
+};
+} // anonymous namespace
+
+string SecureDebugString(const Message& msg) {
+ string debug_string;
+ TextFormat::Printer printer;
+ printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
+ printer.PrintToString(msg, &debug_string);
+ return debug_string;
+}
+
+string SecureShortDebugString(const Message& msg) {
+ string debug_string;
+
+ TextFormat::Printer printer;
+ printer.SetSingleLineMode(true);
+ printer.SetDefaultFieldValuePrinter(new SecureFieldPrinter());
+
+ printer.PrintToString(msg, &debug_string);
+ // Single line mode currently might have an extra space at the end.
+ if (!debug_string.empty() &&
+ debug_string[debug_string.size() - 1] == ' ') {
+ debug_string.resize(debug_string.size() - 1);
+ }
+
+ return debug_string;
+}
+
+
+WritablePBContainerFile::WritablePBContainerFile(shared_ptr<RWFile> writer)
+ : state_(FileState::NOT_INITIALIZED),
+ offset_(0),
+ version_(kPBContainerDefaultVersion),
+ writer_(std::move(writer)) {
+}
+
+WritablePBContainerFile::~WritablePBContainerFile() {
+ WARN_NOT_OK(Close(), "Could not Close() when destroying file");
+}
+
+Status WritablePBContainerFile::SetVersionForTests(int version) {
+ DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+ if (!IsSupportedContainerVersion(version)) {
+ return Status::NotSupported(Substitute("Version $0 is not supported", version));
+ }
+ version_ = version;
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::CreateNew(const Message& msg) {
+ DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+
+ const uint64_t kHeaderLen = (version_ == 1) ? kPBContainerV1HeaderLen
+ : kPBContainerV1HeaderLen + kPBContainerChecksumLen;
+
+ faststring buf;
+ buf.resize(kHeaderLen);
+
+ // Serialize the magic.
+ strings::memcpy_inlined(buf.data(), kPBContainerMagic, kPBContainerMagicLen);
+ uint64_t offset = kPBContainerMagicLen;
+
+ // Serialize the version.
+ InlineEncodeFixed32(buf.data() + offset, version_);
+ offset += sizeof(uint32_t);
+ DCHECK_EQ(kPBContainerV1HeaderLen, offset)
+ << "Serialized unexpected number of total bytes";
+
+ // Versions >= 2: Checksum the magic and version.
+ if (version_ >= 2) {
+ uint32_t header_checksum = crc::Crc32c(buf.data(), offset);
+ InlineEncodeFixed32(buf.data() + offset, header_checksum);
+ offset += sizeof(uint32_t);
+ }
+ DCHECK_EQ(offset, kHeaderLen);
+
+ // Serialize the supplemental header.
+ ContainerSupHeaderPB sup_header;
+ PopulateDescriptorSet(msg.GetDescriptor()->file(),
+ sup_header.mutable_protos());
+ sup_header.set_pb_type(msg.GetTypeName());
+ RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(sup_header, &buf),
+ "Failed to prepare supplemental header for writing");
+
+ // Write the serialized buffer to the file.
+ RETURN_NOT_OK_PREPEND(AppendBytes(buf),
+ "Failed to append header to file");
+ state_ = FileState::OPEN;
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::OpenExisting() {
+ DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+ boost::optional<uint64_t> size;
+ RETURN_NOT_OK(ParsePBFileHeader(writer_.get(), &size, &offset_, &version_));
+ ContainerSupHeaderPB sup_header;
+ RETURN_NOT_OK(ReadSupplementalHeader(writer_.get(), version_, &size,
+ &offset_, &sup_header));
+ offset_ = size.get(); // Reset the write offset to the end of the file.
+ state_ = FileState::OPEN;
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::AppendBytes(const Slice& data) {
+ std::lock_guard<Mutex> l(offset_lock_);
+ RETURN_NOT_OK(writer_->Write(offset_, data));
+ offset_ += data.size();
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::Append(const Message& msg) {
+ DCHECK_EQ(FileState::OPEN, state_);
+
+ faststring buf;
+ RETURN_NOT_OK_PREPEND(AppendMsgToBuffer(msg, &buf),
+ "Failed to prepare buffer for writing");
+ RETURN_NOT_OK_PREPEND(AppendBytes(buf), "Failed to append data to file");
+
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::Flush() {
+ DCHECK_EQ(FileState::OPEN, state_);
+
+ // TODO: Flush just the dirty bytes.
+ RETURN_NOT_OK_PREPEND(writer_->Flush(RWFile::FLUSH_ASYNC, 0, 0), "Failed to Flush() file");
+
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::Sync() {
+ DCHECK_EQ(FileState::OPEN, state_);
+
+ RETURN_NOT_OK_PREPEND(writer_->Sync(), "Failed to Sync() file");
+
+ return Status::OK();
+}
+
+Status WritablePBContainerFile::Close() {
+ if (state_ != FileState::CLOSED) {
+ state_ = FileState::CLOSED;
+ Status s = writer_->Close();
+ writer_.reset();
+ RETURN_NOT_OK_PREPEND(s, "Failed to Close() file");
+ }
+ return Status::OK();
+}
+
+const string& WritablePBContainerFile::filename() const {
+ return writer_->filename();
+}
+
+Status WritablePBContainerFile::AppendMsgToBuffer(const Message& msg, faststring* buf) {
+ DCHECK(msg.IsInitialized()) << InitializationErrorMessage("serialize", msg);
+ int data_len = msg.ByteSize();
+ // Messages >2G cannot be serialized due to overflow computing ByteSize.
+ DCHECK_GE(data_len, 0) << "Error computing ByteSize";
+ uint64_t record_buflen = sizeof(uint32_t) + data_len + sizeof(uint32_t);
+ if (version_ >= 2) {
+ record_buflen += sizeof(uint32_t); // Additional checksum just for the length.
+ }
+
+ // Grow the buffer to hold the new data.
+ uint64_t record_offset = buf->size();
+ buf->resize(record_offset + record_buflen);
+ uint8_t* dst = buf->data() + record_offset;
+
+ // Serialize the data length.
+ size_t cur_offset = 0;
+ InlineEncodeFixed32(dst + cur_offset, static_cast<uint32_t>(data_len));
+ cur_offset += sizeof(uint32_t);
+
+ // For version >= 2: Serialize the checksum of the data length.
+ if (version_ >= 2) {
+ uint32_t length_checksum = crc::Crc32c(&data_len, sizeof(data_len));
+ InlineEncodeFixed32(dst + cur_offset, length_checksum);
+ cur_offset += sizeof(uint32_t);
+ }
+
+ // Serialize the data.
+ uint64_t data_offset = cur_offset;
+ if (PREDICT_FALSE(!msg.SerializeWithCachedSizesToArray(dst + cur_offset))) {
+ return Status::IOError("Failed to serialize PB to array");
+ }
+ cur_offset += data_len;
+
+ // Calculate and serialize the data checksum.
+ // For version 1, this is the checksum of the len + data.
+ // For version >= 2, this is only the checksum of the data.
+ uint32_t data_checksum;
+ if (version_ == 1) {
+ data_checksum = crc::Crc32c(dst, cur_offset);
+ } else {
+ data_checksum = crc::Crc32c(dst + data_offset, data_len);
+ }
+ InlineEncodeFixed32(dst + cur_offset, data_checksum);
+ cur_offset += sizeof(uint32_t);
+
+ DCHECK_EQ(record_buflen, cur_offset) << "Serialized unexpected number of total bytes";
+ return Status::OK();
+}
+
+void WritablePBContainerFile::PopulateDescriptorSet(
+ const FileDescriptor* desc, FileDescriptorSet* output) {
+ // Because we don't compile protobuf with TSAN enabled, copying the
+ // static PB descriptors in this function ends up triggering a lot of
+ // race reports. We suppress the reports, but TSAN still has to walk
+ // the stack, etc, and this function becomes very slow. So, we ignore
+ // TSAN here.
+ debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+
+ FileDescriptorSet all_descs;
+
+ // Tracks all schemas that have been added to 'unemitted' at one point
+ // or another. Is a superset of 'unemitted' and only ever grows.
+ unordered_set<const FileDescriptor*> processed;
+
+ // Tracks all remaining unemitted schemas.
+ deque<const FileDescriptor*> unemitted;
+
+ InsertOrDie(&processed, desc);
+ unemitted.push_front(desc);
+ while (!unemitted.empty()) {
+ const FileDescriptor* proto = unemitted.front();
+
+ // The current schema is emitted iff we've processed (i.e. emitted) all
+ // of its dependencies.
+ bool emit = true;
+ for (int i = 0; i < proto->dependency_count(); i++) {
+ const FileDescriptor* dep = proto->dependency(i);
+ if (InsertIfNotPresent(&processed, dep)) {
+ unemitted.push_front(dep);
+ emit = false;
+ }
+ }
+ if (emit) {
+ unemitted.pop_front();
+ proto->CopyTo(all_descs.mutable_file()->Add());
+ }
+ }
+ all_descs.Swap(output);
+}
+
+ReadablePBContainerFile::ReadablePBContainerFile(shared_ptr<RandomAccessFile> reader)
+ : state_(FileState::NOT_INITIALIZED),
+ version_(kPBContainerInvalidVersion),
+ offset_(0),
+ reader_(std::move(reader)) {
+}
+
+ReadablePBContainerFile::~ReadablePBContainerFile() {
+ Close();
+}
+
+Status ReadablePBContainerFile::Open() {
+ DCHECK_EQ(FileState::NOT_INITIALIZED, state_);
+ RETURN_NOT_OK(ParsePBFileHeader(reader_.get(), &cached_file_size_, &offset_, &version_));
+ ContainerSupHeaderPB sup_header;
+ RETURN_NOT_OK(ReadSupplementalHeader(reader_.get(), version_, &cached_file_size_,
+ &offset_, &sup_header));
+ protos_.reset(sup_header.release_protos());
+ pb_type_ = sup_header.pb_type();
+ state_ = FileState::OPEN;
+ return Status::OK();
+}
+
+Status ReadablePBContainerFile::ReadNextPB(Message* msg) {
+ DCHECK_EQ(FileState::OPEN, state_);
+ return ReadFullPB(reader_.get(), version_, &cached_file_size_, &offset_, msg);
+}
+
+Status ReadablePBContainerFile::GetPrototype(const Message** prototype) {
+ if (!prototype_) {
+ // Loading the schemas into a DescriptorDatabase (and not directly into
+ // a DescriptorPool) defers resolution until FindMessageTypeByName()
+ // below, allowing for schemas to be loaded in any order.
+ unique_ptr<SimpleDescriptorDatabase> db(new SimpleDescriptorDatabase());
+ for (int i = 0; i < protos()->file_size(); i++) {
+ if (!db->Add(protos()->file(i))) {
+ return Status::Corruption("Descriptor not loaded", Substitute(
+ "Could not load descriptor for PB type $0 referenced in container file",
+ pb_type()));
+ }
+ }
+ unique_ptr<DescriptorPool> pool(new DescriptorPool(db.get()));
+ const Descriptor* desc = pool->FindMessageTypeByName(pb_type());
+ if (!desc) {
+ return Status::NotFound("Descriptor not found", Substitute(
+ "Could not find descriptor for PB type $0 referenced in container file",
+ pb_type()));
+ }
+
+ unique_ptr<DynamicMessageFactory> factory(new DynamicMessageFactory());
+ const Message* p = factory->GetPrototype(desc);
+ if (!p) {
+ return Status::NotSupported("Descriptor not supported", Substitute(
+ "Descriptor $0 referenced in container file not supported",
+ pb_type()));
+ }
+
+ db_ = std::move(db);
+ descriptor_pool_ = std::move(pool);
+ message_factory_ = std::move(factory);
+ prototype_ = p;
+ }
+ *prototype = prototype_;
+ return Status::OK();
+}
+
+Status ReadablePBContainerFile::Dump(ostream* os, ReadablePBContainerFile::Format format) {
+ DCHECK_EQ(FileState::OPEN, state_);
+
+ // Since we use the protobuf library support for dumping JSON, there isn't any easy
+ // way to hook in our redaction support. Since this is only used by CLI tools,
+ // just refuse to dump JSON if redaction is enabled.
+ if (format == Format::JSON && KUDU_SHOULD_REDACT()) {
+ return Status::NotSupported("cannot dump PBC file in JSON format if redaction is enabled");
+ }
+
+ const char* const kDashes = "-------";
+
+ if (format == Format::DEBUG) {
+ *os << "File header" << endl;
+ *os << kDashes << endl;
+ *os << "Protobuf container version: " << version_ << endl;
+ *os << "Total container file size: " << *cached_file_size_ << endl;
+ *os << "Entry PB type: " << pb_type_ << endl;
+ *os << endl;
+ }
+
+ // Use the embedded protobuf information from the container file to
+ // create the appropriate kind of protobuf Message.
+ const Message* prototype;
+ RETURN_NOT_OK(GetPrototype(&prototype));
+ unique_ptr<Message> msg(prototype_->New());
+
+ // Dump each message in the container file.
+ int count = 0;
+ uint64_t prev_offset = offset_;
+ Status s;
+ string buf;
+ for (s = ReadNextPB(msg.get());
+ s.ok();
+ s = ReadNextPB(msg.get())) {
+ switch (format) {
+ case Format::ONELINE:
+ *os << count << "\t" << SecureShortDebugString(*msg) << endl;
+ break;
+ case Format::DEFAULT:
+ case Format::DEBUG:
+ *os << "Message " << count << endl;
+ if (format == Format::DEBUG) {
+ *os << "offset: " << prev_offset << endl;
+ *os << "length: " << (offset_ - prev_offset) << endl;
+ }
+ *os << kDashes << endl;
+ *os << SecureDebugString(*msg) << endl;
+ break;
+ case Format::JSON:
+ buf.clear();
+ const auto& google_status = google::protobuf::util::MessageToJsonString(
+ *msg, &buf, google::protobuf::util::JsonPrintOptions());
+ if (!google_status.ok()) {
+ return Status::RuntimeError("could not convert PB to JSON", google_status.ToString());
+ }
+ *os << buf << endl;
+ break;
+ }
+
+ prev_offset = offset_;
+ count++;
+ }
+ if (format == Format::DEBUG && !s.IsEndOfFile()) {
+ *os << "Message " << count << endl;
+ *os << "error: failed to parse protobuf message" << endl;
+ *os << "offset: " << prev_offset << endl;
+ *os << "remaining file length: " << (*cached_file_size_ - prev_offset) << endl;
+ *os << kDashes << endl;
+ }
+ return s.IsEndOfFile() ? Status::OK() : s;
+}
+
+Status ReadablePBContainerFile::Close() {
+ state_ = FileState::CLOSED;
+ reader_.reset();
+ return Status::OK();
+}
+
+int ReadablePBContainerFile::version() const {
+ DCHECK_EQ(FileState::OPEN, state_);
+ return version_;
+}
+
+uint64_t ReadablePBContainerFile::offset() const {
+ DCHECK_EQ(FileState::OPEN, state_);
+ return offset_;
+}
+
+Status ReadPBContainerFromPath(Env* env, const std::string& path, Message* msg) {
+ unique_ptr<RandomAccessFile> file;
+ RETURN_NOT_OK(env->NewRandomAccessFile(path, &file));
+
+ ReadablePBContainerFile pb_file(std::move(file));
+ RETURN_NOT_OK(pb_file.Open());
+ RETURN_NOT_OK(pb_file.ReadNextPB(msg));
+ return pb_file.Close();
+}
+
+Status WritePBContainerToPath(Env* env, const std::string& path,
+ const Message& msg,
+ CreateMode create,
+ SyncMode sync) {
+ TRACE_EVENT2("io", "WritePBContainerToPath",
+ "path", path,
+ "msg_type", msg.GetTypeName());
+
+ if (create == NO_OVERWRITE && env->FileExists(path)) {
+ return Status::AlreadyPresent(Substitute("File $0 already exists", path));
+ }
+
+ const string tmp_template = path + kTmpInfix + kTmpTemplateSuffix;
+ string tmp_path;
+
+ unique_ptr<RWFile> file;
+ RETURN_NOT_OK(env->NewTempRWFile(RWFileOptions(), tmp_template, &tmp_path, &file));
+ auto tmp_deleter = MakeScopedCleanup([&]() {
+ WARN_NOT_OK(env->DeleteFile(tmp_path), "Could not delete file " + tmp_path);
+ });
+
+ WritablePBContainerFile pb_file(std::move(file));
+ RETURN_NOT_OK(pb_file.CreateNew(msg));
+ RETURN_NOT_OK(pb_file.Append(msg));
+ if (sync == pb_util::SYNC) {
+ RETURN_NOT_OK(pb_file.Sync());
+ }
+ RETURN_NOT_OK(pb_file.Close());
+ RETURN_NOT_OK_PREPEND(env->RenameFile(tmp_path, path),
+ "Failed to rename tmp file to " + path);
+ tmp_deleter.cancel();
+ if (sync == pb_util::SYNC) {
+ RETURN_NOT_OK_PREPEND(env->SyncDir(DirName(path)),
+ "Failed to SyncDir() parent of " + path);
+ }
+ return Status::OK();
+}
+
+
+scoped_refptr<debug::ConvertableToTraceFormat> PbTracer::TracePb(const Message& msg) {
+ return make_scoped_refptr(new PbTracer(msg));
+}
+
+PbTracer::PbTracer(const Message& msg) : msg_(msg.New()) {
+ msg_->CopyFrom(msg);
+}
+
+void PbTracer::AppendAsTraceFormat(std::string* out) const {
+ pb_util::TruncateFields(msg_.get(), kMaxFieldLengthToTrace);
+ std::ostringstream ss;
+ JsonWriter jw(&ss, JsonWriter::COMPACT);
+ jw.Protobuf(*msg_);
+ out->append(ss.str());
+}
+
+} // namespace pb_util
+} // namespace kudu