You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/01/29 02:38:31 UTC

parquet-cpp git commit: PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 5af54bec2 -> 89f5a5439


PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types

This PR adds support for INT96 and FIXED_LEN_BYTE_ARRAY types.
It modifies the examples and DebugPrint to handle these types.

Author: Deepak Majeti <de...@hp.com>

Closes #27 from majetideepak/master and squashes the following commits:

5ba0a03 [Deepak Majeti] PARQUET-428: Support INT96 and FIXED_LEN_BYTE_ARRAY types


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/89f5a543
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/89f5a543
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/89f5a543

Branch: refs/heads/master
Commit: 89f5a543929b7d2bb3249626fb03cf9130540439
Parents: 5af54be
Author: Deepak Majeti <de...@hp.com>
Authored: Thu Jan 28 17:38:27 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Thu Jan 28 17:38:27 2016 -0800

----------------------------------------------------------------------
 src/parquet/column_reader.cc                |  2 ++
 src/parquet/column_reader.h                 |  1 +
 src/parquet/encodings/dictionary-encoding.h | 19 ++++++++++++
 src/parquet/encodings/plain-encoding.h      | 16 ++++++++++
 src/parquet/reader.cc                       | 38 ++++++++++++++++--------
 src/parquet/types.h                         | 38 ++++++++++++++++++++++--
 src/parquet/util/compiler-util.h            | 21 +++++++++++++
 7 files changed, 120 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index 168bc94..4f565cf 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -187,6 +187,8 @@ std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData*
       return std::make_shared<DoubleReader>(metadata, element, stream);
     case Type::BYTE_ARRAY:
       return std::make_shared<ByteArrayReader>(metadata, element, stream);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayReader>(metadata, element, stream);
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index cd6cc02..00722f5 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -159,6 +159,7 @@ typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
 typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
 typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
 typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
+typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;
 
 
 template <int TYPE>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index a4ef342..9641e7e 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -111,6 +111,25 @@ inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init(
   }
 }
 
+template <>  // FIXED_LEN_BYTE_ARRAY specialization: decodes all dictionary values, then copies their bytes into byte_array_data_
+inline void DictionaryDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Init(
+    Decoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
+  int num_dictionary_values = dictionary->values_left();  // decode everything the dictionary decoder still holds
+  dictionary_.resize(num_dictionary_values);
+  dictionary->Decode(&dictionary_[0], num_dictionary_values);  // NOTE(review): indexes [0] even when the count is 0 -- confirm an empty dictionary cannot reach here
+
+  int fixed_len = schema_->type_length;  // per-value byte length, read from the schema element
+  int total_size = num_dictionary_values*fixed_len;  // NOTE(review): plain int multiply; assumes the product fits in int
+
+  byte_array_data_.resize(total_size);
+  int offset = 0;
+  for (int i = 0; i < num_dictionary_values; ++i) {
+    memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);  // copy value i into byte_array_data_
+    dictionary_[i].ptr = &byte_array_data_[offset];  // re-point the entry at that copy instead of the decoder's input
+    offset += fixed_len;
+  }
+}
+
 } // namespace parquet_cpp
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 4ddf878..dc71e39 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -74,6 +74,22 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
   return max_values;
 }
 
+// Template specialization for FIXED_LEN_BYTE_ARRAY: sets buffer[i].ptr for up to max_values values and returns how many were set
+template <>
+inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(FixedLenByteArray* buffer,
+    int max_values) {
+  max_values = std::min(max_values, num_values_);  // never produce more than num_values_
+  int len = schema_->type_length;  // bytes consumed per value, read from the schema element
+  for (int i = 0; i < max_values; ++i) {
+    if (len_ < len) ParquetException::EofException();  // len_ (decremented as data_ advances) cannot cover one more value
+    buffer[i].ptr = data_;  // stores the current data_ pointer itself; no bytes are copied
+    data_ += len;
+    len_ -= len;
+  }
+  num_values_ -= max_values;
+  return max_values;  // count of entries written to buffer
+}
+
 template <>
 class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
  public:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index 7c727ba..4654f14 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -217,6 +217,9 @@ static string parquet_type_to_string(Type::type t) {
     case Type::INT64:
       return "INT64";
       break;
+    case Type::INT96:
+      return "INT96";
+      break;
     case Type::FLOAT:
       return "FLOAT";
       break;
@@ -226,9 +229,6 @@ static string parquet_type_to_string(Type::type t) {
     case Type::BYTE_ARRAY:
       return "BYTE_ARRAY";
       break;
-    case Type::INT96:
-      return "INT96";
-      break;
     case Type::FIXED_LEN_BYTE_ARRAY:
       return "FIXED_LEN_BYTE_ARRAY";
       break;
@@ -239,7 +239,7 @@ static string parquet_type_to_string(Type::type t) {
 }
 
 // the fixed initial size is just for an example
-#define COL_WIDTH "17"
+#define COL_WIDTH "20"
 
 void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
   if (!parsed_metadata_) {
@@ -251,10 +251,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
   for (int c = 1; c < metadata_.schema.size(); ++c) {
     stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " ("
            << parquet_type_to_string(metadata_.schema[c].type);
-    if (metadata_.schema[c].type == Type::INT96 ||
-        metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
-      stream << " - not supported";
-    }
     stream << ")\n";
   }
 
@@ -291,10 +287,6 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
 
       printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str());
 
-      if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) {
-        continue;
-      }
-
       // This is OK in this method as long as the RowGroupReader does not get deleted
       readers[c] = col_reader;
     }
@@ -345,6 +337,16 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
               }
               break;
             }
+            case Type::INT96: {
+              Int96 val = reinterpret_cast<Int96Reader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                string result = Int96ToString(val);
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
+                stream << buffer;
+              }
+              break;
+            }
             case Type::FLOAT: {
               float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue(
                   &def_level[c], &rep_level[c]);
@@ -373,7 +375,17 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
               }
               break;
             }
-            default:
+             case Type::FIXED_LEN_BYTE_ARRAY: {
+              FixedLenByteArray val = reinterpret_cast<FixedLenByteArrayReader*>(
+                  readers[c])->NextValue(&def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                string result = FixedLenByteArrayToString(val, metadata_.schema[c+1].type_length);
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
+                stream << buffer;
+              }
+              break;
+            }
+           default:
               continue;
           }
         }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 37f538a..409e335 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -22,8 +22,10 @@
 #include <cstdint>
 #include <cstring>
 #include <string>
+#include <sstream>
 
 #include "parquet/thrift/parquet_types.h"
+#include "parquet/util/compiler-util.h"
 
 namespace parquet_cpp {
 
@@ -32,11 +34,36 @@ struct ByteArray {
   const uint8_t* ptr;
 };
 
+struct FixedLenByteArray {  // holds only a pointer; unlike ByteArray there is no len member
+  const uint8_t* ptr;  // first byte of the value; read-only through this pointer
+};
+
+MANUALLY_ALIGNED_STRUCT(1) Int96 {  // macro comes from parquet/util/compiler-util.h; argument is the requested alignment
+  uint32_t value[3];  // three 32-bit words = 96 bits
+};
+STRUCT_END(Int96, 12);  // expands to a static_assert that sizeof(Int96) == 12
 
 static inline std::string ByteArrayToString(const ByteArray& a) {
   return std::string(reinterpret_cast<const char*>(a.ptr), a.len);
 }
 
+static inline std::string Int96ToString(const Int96& a) {  // formats the three words of a as decimal numbers
+  std::stringstream result;
+  for (int i = 0; i < 3; i++) {
+     result << a.value[i]  << " ";  // a space follows every word, so the returned string ends with a space
+  }
+  return result.str();
+}
+
+static inline std::string FixedLenByteArrayToString(const FixedLenByteArray& a, int len) {  // len is passed in because a stores no length
+  const uint8_t *bytes = reinterpret_cast<const uint8_t*>(a.ptr);  // ptr is already const uint8_t*, so this cast changes nothing
+  std::stringstream result;
+  for (int i = 0; i < len; i++) {
+     result << (uint32_t)bytes[i]  << " ";  // widened so the byte prints as a number, not a character; a space follows each one
+  }
+  return result.str();
+}
+
 static inline int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
   int len = std::min(x1.len, x2.len);
   int cmp = memcmp(x1.ptr, x2.ptr, len);
@@ -76,8 +103,7 @@ struct type_traits<parquet::Type::INT64> {
 
 template <>
 struct type_traits<parquet::Type::INT96> {
-  // TODO
-  typedef void* value_type;
+  typedef Int96 value_type;
   static constexpr parquet::Type::type parquet_type = parquet::Type::INT96;
 
   static constexpr size_t value_byte_size = 12;
@@ -107,6 +133,14 @@ struct type_traits<parquet::Type::BYTE_ARRAY> {
   static constexpr size_t value_byte_size = sizeof(ByteArray);
 };
 
+template <>  // type_traits specialization for FIXED_LEN_BYTE_ARRAY
+struct type_traits<parquet::Type::FIXED_LEN_BYTE_ARRAY> {
+  typedef FixedLenByteArray value_type;  // C++ type associated with this Parquet type
+  static constexpr parquet::Type::type parquet_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+
+  static constexpr size_t value_byte_size = sizeof(FixedLenByteArray);  // size of the pointer-holding struct, not the schema's type_length
+};
+
 } // namespace parquet_cpp
 
 #endif // PARQUET_TYPES_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89f5a543/src/parquet/util/compiler-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/compiler-util.h b/src/parquet/util/compiler-util.h
index bd0dbed..9048ba1 100644
--- a/src/parquet/util/compiler-util.h
+++ b/src/parquet/util/compiler-util.h
@@ -36,4 +36,25 @@
 
 #define PREFETCH(addr) __builtin_prefetch(addr)
 
+// Macros to declare a struct with packing set to 1 (no padding) and an explicit alignment.
+// Defined for MSVC, GCC and Clang only; any other compiler hits the #error below.
+// See: https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flatbuffers.h#L1355
+#if defined(_MSC_VER)  // MSVC spelling: __pragma(pack) plus __declspec(align)
+  #define MANUALLY_ALIGNED_STRUCT(alignment) \
+    __pragma(pack(1)); \
+    struct __declspec(align(alignment))
+  #define STRUCT_END(name, size) \
+    __pragma(pack()); \
+    static_assert(sizeof(name) == size, "compiler breaks packing rules")
+#elif defined(__GNUC__) || defined(__clang__)  // GCC and Clang spelling: _Pragma pack plus __attribute__ aligned
+  #define MANUALLY_ALIGNED_STRUCT(alignment) \
+    _Pragma("pack(1)") \
+    struct __attribute__((aligned(alignment)))
+  #define STRUCT_END(name, size) \
+    _Pragma("pack()") \
+    static_assert(sizeof(name) == size, "compiler breaks packing rules")
+#else  // neither branch matched: stop the build instead of guessing a layout
+  #error Unknown compiler, please define structure alignment macros
+#endif
+
 #endif // PARQUET_UTIL_COMPILER_UTIL_H