You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/08/04 16:22:53 UTC
incubator-impala git commit: IMPALA-5742: De-allocate buffer in parquet-reader on exit
Repository: incubator-impala
Updated Branches:
refs/heads/master 79205bb33 -> b55ec3f64
IMPALA-5742: De-allocate buffer in parquet-reader on exit
Testing: Ran under ASAN with detect_leaks=1; the leak no longer reproduces
with this fix.
Change-Id: Iedf163f858b42d2ca63a7a65d6e457539de59ab9
Reviewed-on: http://gerrit.cloudera.org:8080/7572
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b55ec3f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b55ec3f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b55ec3f6
Branch: refs/heads/master
Commit: b55ec3f64f2a16259d4c5cd2e881701fee4c603f
Parents: 79205bb
Author: Henry Robinson <he...@cloudera.com>
Authored: Mon Jul 31 20:30:40 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 4 07:54:00 2017 +0000
----------------------------------------------------------------------
be/src/util/parquet-reader.cc | 38 +++++++++++++++++++-------------------
1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b55ec3f6/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index afbb33d..e21cff6 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include <snappy.h>
#include <iostream>
#include <sstream>
#include <vector>
#include <gflags/gflags.h>
-#include <snappy.h>
#include "gen-cpp/parquet_types.h"
// TCompactProtocol requires some #defines to work right.
@@ -28,10 +28,10 @@
#define ARITHMETIC_RIGHT_SHIFT 1
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wstring-plus-int"
-#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/TApplicationException.h>
#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/protocol/TCompactProtocol.h>
#include <thrift/protocol/TDebugProtocol.h>
-#include <thrift/TApplicationException.h>
#include <thrift/transport/TBufferTransports.h>
#pragma clang diagnostic pop
@@ -68,8 +68,8 @@ boost::shared_ptr<TProtocol> CreateDeserializeProtocol(
// all the bytes needed to store the thrift message. On return, len will be
// set to the actual length of the header.
template <class T>
-bool DeserializeThriftMsg(uint8_t* buf, uint32_t* len, bool compact,
- T* deserialized_msg) {
+bool DeserializeThriftMsg(
+ uint8_t* buf, uint32_t* len, bool compact, T* deserialized_msg) {
// Deserialize msg bytes into c++ thrift msg using memory transport.
boost::shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len));
boost::shared_ptr<TProtocol> tproto =
@@ -91,8 +91,8 @@ string TypeMapping(Type::type t) {
return "UNKNOWN";
}
-void AppendSchema(const vector<SchemaElement>& schema, int level,
- int* idx, stringstream* ss) {
+void AppendSchema(
+ const vector<SchemaElement>& schema, int level, int* idx, stringstream* ss) {
for (int i = 0; i < level; ++i) {
(*ss) << " ";
}
@@ -122,8 +122,8 @@ string GetSchema(const FileMetaData& md) {
// Inherit from RleDecoder to get access to repeat_count_, which is protected.
class ParquetLevelReader : public impala::RleDecoder {
public:
- ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width) :
- RleDecoder(buffer, buffer_len, bit_width) {}
+ ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width)
+ : RleDecoder(buffer, buffer_len, bit_width) {}
uint32_t repeat_count() const { return repeat_count_; }
};
@@ -143,13 +143,13 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
decompressed_buffer.resize(header.uncompressed_page_size);
boost::scoped_ptr<impala::Codec> decompressor;
- impala::Codec::CreateDecompressor(NULL, false,
- impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
+ impala::Codec::CreateDecompressor(
+ NULL, false, impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
uint8_t* buffer_ptr = decompressed_buffer.data();
int uncompressed_page_size = header.uncompressed_page_size;
- impala::Status s = decompressor->ProcessBlock32(true, header.compressed_page_size,
- data, &uncompressed_page_size, &buffer_ptr);
+ impala::Status s = decompressor->ProcessBlock32(
+ true, header.compressed_page_size, data, &uncompressed_page_size, &buffer_ptr);
if (!s.ok()) {
cerr << "Error: Decompression failed: " << s.GetDetail() << " \n";
exit(1);
@@ -206,16 +206,16 @@ int main(int argc, char** argv) {
cerr << "File Length: " << file_len << endl;
- uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(file_len));
+ vector<uint8_t> buffer_vector(file_len);
+ uint8_t* buffer = buffer_vector.data();
size_t bytes_read = fread(buffer, 1, file_len, file);
assert(bytes_read == file_len);
(void)bytes_read;
// Check file starts and ends with magic bytes
- assert(
- memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
+ assert(memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
assert(memcmp(buffer + file_len - sizeof(PARQUET_VERSION_NUMBER),
- PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
+ PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
// Get metadata
uint8_t* metadata_len_ptr =
@@ -253,7 +253,7 @@ int main(int argc, char** argv) {
int first_page_offset = col.meta_data.data_page_offset;
if (col.meta_data.__isset.dictionary_page_offset) {
first_page_offset = ::min(first_page_offset,
- (int)col.meta_data.dictionary_page_offset);
+ static_cast<int>(col.meta_data.dictionary_page_offset));
}
uint8_t* data = buffer + first_page_offset;
uint8_t* col_end = data + col.meta_data.total_compressed_size;
@@ -288,7 +288,7 @@ int main(int argc, char** argv) {
num_rows += column_num_rows[0];
}
double compression_ratio =
- (double)total_uncompressed_data_size / total_compressed_data_size;
+ static_cast<double>(total_uncompressed_data_size) / total_compressed_data_size;
stringstream ss;
ss << "\nSummary:\n"
<< " Rows: " << num_rows << endl