You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/08/19 02:15:44 UTC

[GitHub] [arrow] shanhuuang commented on a change in pull request #10627: PARQUET-490: [C++][Parquet] Basic support for reading DELTA_BINARY_PACKED data

shanhuuang commented on a change in pull request #10627:
URL: https://github.com/apache/arrow/pull/10627#discussion_r690890712



##########
File path: cpp/src/arrow/util/bit_util_test.cc
##########
@@ -1939,24 +1939,77 @@ TEST(BitUtil, RoundUpToPowerOf2) {
 #undef U64
 #undef S64
 
-static void TestZigZag(int32_t v) {
+static void TestZigZag(int32_t v, uint8_t* buffer_expect) {

Review comment:
       OK, thanks for your advice.

##########
File path: cpp/src/arrow/util/bpacking64_codegen.py
##########
@@ -0,0 +1,131 @@
+#!/bin/python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script is modified from its original version in GitHub. Original source:
+# https://github.com/lemire/FrameOfReference/blob/146948b6058a976bc7767262ad3a2ce201486b93/scripts/turbopacking64.py
+
+# Usage:
+#   python bpacking64_codegen.py > bpacking64_default.h
+
+def howmany(bit):
+    """ how many values are we going to pack? """
+    return 32
+
+
+def howmanywords(bit):
+    return (howmany(bit) * bit + 63)//64
+
+
+def howmanybytes(bit):
+    return (howmany(bit) * bit + 7)//8
+
+
+print('''// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This file was generated by script which is modified from its original version in GitHub.
+// Original source:
+// https://github.com/lemire/FrameOfReference/blob/master/scripts/turbopacking64.py
+// The original copyright notice follows.
+
+// This code is released under the
+// Apache License Version 2.0 http://www.apache.org/licenses/.
+// (c) Daniel Lemire 2013
+
+#pragma once
+
+#include "arrow/util/bit_util.h"
+#include "arrow/util/ubsan.h"
+
+namespace arrow {
+namespace internal {
+''')
+
+
+print("inline const uint8_t* unpack0_64(const uint8_t* in, uint64_t* out) {")
+print("  for(int k = 0; k < {0} ; k += 1) {{".format(howmany(0)))
+print("    out[k] = 0;")
+print("  }")
+print("  return in;")
+print("}")
+
+for bit in range(1, 65):
+    print("")
+    print(
+        "inline const uint8_t* unpack{0}_64(const uint8_t* in, uint64_t* out) {{".format(bit))
+
+    if(bit < 64):
+        print("  const uint64_t mask = {0}ULL;".format((1 << bit)-1))
+    maskstr = " & mask"
+    if (bit == 64):
+        maskstr = ""  # no need
+
+    for k in range(howmanywords(bit)-1):
+        print("  uint64_t w{0} = util::SafeLoadAs<uint64_t>(in);".format(k))
+        print("  w{0} = arrow::BitUtil::FromLittleEndian(w{0});".format(k))
+        print("  in += 8;".format(k))
+    k = howmanywords(bit) - 1
+    if (bit % 2 == 0):
+        print("  uint64_t w{0} = util::SafeLoadAs<uint64_t>(in);".format(k))
+        print("  w{0} = arrow::BitUtil::FromLittleEndian(w{0});".format(k))
+        print("  in += 8;".format(k))
+    else:
+        print("  uint64_t w{0} = util::SafeLoadAs<uint32_t>(in);".format(k))

Review comment:
       Yes. The number of values (including padding) in a miniblock is always a multiple of 32.

##########
File path: cpp/src/arrow/util/bit_stream_utils.h
##########
@@ -418,14 +450,59 @@ inline bool BitReader::GetVlqInt(uint32_t* v) {
 }
 
 inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
-  auto u_v = ::arrow::util::SafeCopy<uint32_t>(v);
-  return PutVlqInt((u_v << 1) ^ (u_v >> 31));
+  uint32_t u_v = ::arrow::util::SafeCopy<uint32_t>(v);
+  u_v = (u_v << 1) ^ static_cast<uint32_t>(v >> 31);
+  return PutVlqInt(u_v);

Review comment:
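       The original implementation of PutZigZagVlqInt was incorrect. In ZigZag encoding, the second shift – the (v >> 31) part – must be an arithmetic shift of the signed value, so that it yields all ones for negative inputs (see https://developers.google.com/protocol-buffers/docs/encoding#signed_integers). The old code shifted the unsigned copy u_v instead. That is a logical shift, which yields only 0 or 1, so negative values were encoded incorrectly (e.g. -1 became 0xFFFFFFFF instead of 1).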

##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2107,73 +2105,99 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
   }
 
  private:
-  void InitBlock() {
-    // The number of values per block.
-    uint32_t block_size;
-    if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
-    if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
-    if (!decoder_.GetVlqInt(&values_current_block_)) {
+  void InitHeader() {
+    if (!decoder_.GetVlqInt(&values_per_block_)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&mini_blocks_per_block_)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&total_value_count_)) {
       ParquetException::EofException();
     }
     if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
 
-    delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
-    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
+    delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_);
+    values_per_mini_block_ = values_per_block_ / mini_blocks_per_block_;
+    if (values_per_mini_block_ % 8 != 0) {
+      throw ParquetException("miniBlockSize must be multiple of 8, but it's " +

Review comment:
       I've read the code of [DeltaBinaryPackingValuesWriter](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java#L82) in parquet-mr. It always uses DEFAULT_NUM_BLOCK_VALUES (which is 128) and DEFAULT_NUM_MINIBLOCKS (which is 4). So if the file is written by parquet-mr, values_per_mini_block_ here is always 32.
  However, I'm not quite sure whether this check should require a multiple of 32. The corresponding check [here](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingConfig.java#L41) in parquet-mr's DeltaBinaryPackingConfig only requires a multiple of 8, and that config is what [DeltaBinaryPackingValuesReader](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java#L63) uses when reading.

##########
File path: cpp/src/parquet/encoding.cc
##########
@@ -2107,73 +2105,99 @@ class DeltaBitPackDecoder : public DecoderImpl, virtual public TypedDecoder<DTyp
   }
 
  private:
-  void InitBlock() {
-    // The number of values per block.
-    uint32_t block_size;
-    if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
-    if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
-    if (!decoder_.GetVlqInt(&values_current_block_)) {
+  void InitHeader() {
+    if (!decoder_.GetVlqInt(&values_per_block_)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&mini_blocks_per_block_)) ParquetException::EofException();
+    if (!decoder_.GetVlqInt(&total_value_count_)) {
       ParquetException::EofException();
     }
     if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
 
-    delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
-    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
+    delta_bit_widths_ = AllocateBuffer(pool_, mini_blocks_per_block_);
+    values_per_mini_block_ = values_per_block_ / mini_blocks_per_block_;
+    if (values_per_mini_block_ % 8 != 0) {
+      throw ParquetException("miniBlockSize must be multiple of 8, but it's " +
+                             std::to_string(values_per_mini_block_));
+    }
+
+    mini_block_idx_ = mini_blocks_per_block_;
+    values_current_mini_block_ = 0;
+  }
 
+  void InitBlock() {
     if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
-    for (uint32_t i = 0; i < num_mini_blocks_; ++i) {
+
+    // readBitWidthsForMiniBlocks
+    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
+    for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) {
       if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
         ParquetException::EofException();
       }
     }
-    values_per_mini_block_ = block_size / num_mini_blocks_;
     mini_block_idx_ = 0;
     delta_bit_width_ = bit_width_data[0];
     values_current_mini_block_ = values_per_mini_block_;
   }
 
-  template <typename T>
   int GetInternal(T* buffer, int max_values) {
     max_values = std::min(max_values, this->num_values_);
-    const uint8_t* bit_width_data = delta_bit_widths_->data();
-    for (int i = 0; i < max_values; ++i) {
+    DCHECK_LE(static_cast<uint32_t>(max_values), total_value_count_);
+    int i = 0;
+    while (i < max_values) {
       if (ARROW_PREDICT_FALSE(values_current_mini_block_ == 0)) {
         ++mini_block_idx_;
-        if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
-          delta_bit_width_ = bit_width_data[mini_block_idx_];
+        if (mini_block_idx_ < mini_blocks_per_block_) {
+          delta_bit_width_ = delta_bit_widths_->data()[mini_block_idx_];
           values_current_mini_block_ = values_per_mini_block_;
         } else {
+          // mini_block_idx_ > mini_blocks_per_block_ only if last_value_ is stored in
+          // header
+          if (ARROW_PREDICT_FALSE(mini_block_idx_ > mini_blocks_per_block_)) {

Review comment:
       Good advice! I added a boolean member, "block_initialized_", that indicates whether "InitBlock" has been called.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org