You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mj...@apache.org on 2017/08/04 16:22:53 UTC

incubator-impala git commit: IMPALA-5742: De-allocate buffer in parquet-reader on exit

Repository: incubator-impala
Updated Branches:
  refs/heads/master 79205bb33 -> b55ec3f64


IMPALA-5742: De-allocate buffer in parquet-reader on exit

Testing: ran with ASAN with detect_leaks=1. Leak does not reproduce with
fix.

Change-Id: Iedf163f858b42d2ca63a7a65d6e457539de59ab9
Reviewed-on: http://gerrit.cloudera.org:8080/7572
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b55ec3f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b55ec3f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b55ec3f6

Branch: refs/heads/master
Commit: b55ec3f64f2a16259d4c5cd2e881701fee4c603f
Parents: 79205bb
Author: Henry Robinson <he...@cloudera.com>
Authored: Mon Jul 31 20:30:40 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 4 07:54:00 2017 +0000

----------------------------------------------------------------------
 be/src/util/parquet-reader.cc | 38 +++++++++++++++++++-------------------
 1 file changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b55ec3f6/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index afbb33d..e21cff6 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -15,11 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <snappy.h>
 #include <iostream>
 #include <sstream>
 #include <vector>
 #include <gflags/gflags.h>
-#include <snappy.h>
 #include "gen-cpp/parquet_types.h"
 
 // TCompactProtocol requires some #defines to work right.
@@ -28,10 +28,10 @@
 #define ARITHMETIC_RIGHT_SHIFT 1
 #pragma clang diagnostic push
 #pragma clang diagnostic ignored "-Wstring-plus-int"
-#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/TApplicationException.h>
 #include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/protocol/TCompactProtocol.h>
 #include <thrift/protocol/TDebugProtocol.h>
-#include <thrift/TApplicationException.h>
 #include <thrift/transport/TBufferTransports.h>
 #pragma clang diagnostic pop
 
@@ -68,8 +68,8 @@ boost::shared_ptr<TProtocol> CreateDeserializeProtocol(
 // all the bytes needed to store the thrift message.  On return, len will be
 // set to the actual length of the header.
 template <class T>
-bool DeserializeThriftMsg(uint8_t* buf, uint32_t* len, bool compact,
-    T* deserialized_msg) {
+bool DeserializeThriftMsg(
+    uint8_t* buf, uint32_t* len, bool compact, T* deserialized_msg) {
   // Deserialize msg bytes into c++ thrift msg using memory transport.
   boost::shared_ptr<TMemoryBuffer> tmem_transport(new TMemoryBuffer(buf, *len));
   boost::shared_ptr<TProtocol> tproto =
@@ -91,8 +91,8 @@ string TypeMapping(Type::type t) {
   return "UNKNOWN";
 }
 
-void AppendSchema(const vector<SchemaElement>& schema, int level,
-    int* idx, stringstream* ss) {
+void AppendSchema(
+    const vector<SchemaElement>& schema, int level, int* idx, stringstream* ss) {
   for (int i = 0; i < level; ++i) {
     (*ss) << "  ";
   }
@@ -122,8 +122,8 @@ string GetSchema(const FileMetaData& md) {
 // Inherit from RleDecoder to get access to repeat_count_, which is protected.
 class ParquetLevelReader : public impala::RleDecoder {
  public:
-  ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width) :
-    RleDecoder(buffer, buffer_len, bit_width) {}
+  ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width)
+    : RleDecoder(buffer, buffer_len, bit_width) {}
 
   uint32_t repeat_count() const { return repeat_count_; }
 };
@@ -143,13 +143,13 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
     decompressed_buffer.resize(header.uncompressed_page_size);
 
     boost::scoped_ptr<impala::Codec> decompressor;
-    impala::Codec::CreateDecompressor(NULL, false,
-        impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
+    impala::Codec::CreateDecompressor(
+        NULL, false, impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
 
     uint8_t* buffer_ptr = decompressed_buffer.data();
     int uncompressed_page_size = header.uncompressed_page_size;
-    impala::Status s = decompressor->ProcessBlock32(true, header.compressed_page_size,
-        data, &uncompressed_page_size, &buffer_ptr);
+    impala::Status s = decompressor->ProcessBlock32(
+        true, header.compressed_page_size, data, &uncompressed_page_size, &buffer_ptr);
     if (!s.ok()) {
       cerr << "Error: Decompression failed: " << s.GetDetail() << " \n";
       exit(1);
@@ -206,16 +206,16 @@ int main(int argc, char** argv) {
 
   cerr << "File Length: " << file_len << endl;
 
-  uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(file_len));
+  vector<uint8_t> buffer_vector(file_len);
+  uint8_t* buffer = buffer_vector.data();
   size_t bytes_read = fread(buffer, 1, file_len, file);
   assert(bytes_read == file_len);
   (void)bytes_read;
 
   // Check file starts and ends with magic bytes
-  assert(
-      memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
+  assert(memcmp(buffer, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
   assert(memcmp(buffer + file_len - sizeof(PARQUET_VERSION_NUMBER),
-      PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
+             PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) == 0);
 
   // Get metadata
   uint8_t* metadata_len_ptr =
@@ -253,7 +253,7 @@ int main(int argc, char** argv) {
       int first_page_offset = col.meta_data.data_page_offset;
       if (col.meta_data.__isset.dictionary_page_offset) {
         first_page_offset = ::min(first_page_offset,
-            (int)col.meta_data.dictionary_page_offset);
+            static_cast<int>(col.meta_data.dictionary_page_offset));
       }
       uint8_t* data = buffer + first_page_offset;
       uint8_t* col_end = data + col.meta_data.total_compressed_size;
@@ -288,7 +288,7 @@ int main(int argc, char** argv) {
     num_rows += column_num_rows[0];
   }
   double compression_ratio =
-      (double)total_uncompressed_data_size / total_compressed_data_size;
+      static_cast<double>(total_uncompressed_data_size) / total_compressed_data_size;
   stringstream ss;
   ss << "\nSummary:\n"
      << "  Rows: " << num_rows << endl