You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/16 21:26:00 UTC
[1/3] incubator-impala git commit: Remove unused/defunct Maven
repositories.
Repository: incubator-impala
Updated Branches:
refs/heads/master 6769220e2 -> ae116b5bf
Remove unused/defunct Maven repositories.
Removes three Maven repositories. davidtrott and codehaus both don't
exist any more, so they're not doing anyone any good. (We had previously
cleaned up Codehaus in IMPALA-5224, but a reference was resurrected.)
The libphonenumber repo was simply misconfigured: the library exists in
Maven central in the "normal" place, and a subdirectory repo is
unnecessary.
To test this, I ran "buildall" after removing ~/.m2/ on my machine.
Change-Id: I79eb6c483561726c7cbaf86874001f1979128720
Reviewed-on: http://gerrit.cloudera.org:8080/8497
Tested-by: Impala Public Jenkins
Reviewed-by: Alex Behm <al...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/155bb776
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/155bb776
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/155bb776
Branch: refs/heads/master
Commit: 155bb7764989c45f8274aac3196852a830402375
Parents: 6769220
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Nov 6 16:10:58 2017 -0800
Committer: Alex Behm <al...@cloudera.com>
Committed: Thu Nov 16 15:51:48 2017 +0000
----------------------------------------------------------------------
common/yarn-extras/pom.xml | 8 --------
fe/pom.xml | 11 -----------
tests/test-hive-udfs/pom.xml | 5 -----
3 files changed, 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/155bb776/common/yarn-extras/pom.xml
----------------------------------------------------------------------
diff --git a/common/yarn-extras/pom.xml b/common/yarn-extras/pom.xml
index 8f3ba4d..8b34174 100644
--- a/common/yarn-extras/pom.xml
+++ b/common/yarn-extras/pom.xml
@@ -71,14 +71,6 @@
<enabled>false</enabled>
</snapshots>
</repository>
-
- <repository>
- <id>Codehaus repository</id>
- <url>http://repository.codehaus.org/</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
</repositories>
<dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/155bb776/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index a0330d7..85b16aa 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -743,17 +743,6 @@ under the License.
</snapshots>
</pluginRepository>
- <pluginRepository>
- <id>dtrott</id>
- <url>http://maven.davidtrott.com/repository</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </pluginRepository>
-
<!-- This is needed for the cup maven plugin. TODO add the plugin to our maven repo -->
<pluginRepository>
<id>sonatype-nexus-snapshots</id>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/155bb776/tests/test-hive-udfs/pom.xml
----------------------------------------------------------------------
diff --git a/tests/test-hive-udfs/pom.xml b/tests/test-hive-udfs/pom.xml
index 7f18c7b..17979ed 100644
--- a/tests/test-hive-udfs/pom.xml
+++ b/tests/test-hive-udfs/pom.xml
@@ -99,10 +99,5 @@ under the License.
<enabled>false</enabled>
</snapshots>
</repository>
- <repository>
- <id>com.google.libphonenumber</id>
- <url>http://repo1.maven.org/maven2/com/googlecode/libphonenumber</url>
- <name>Google phone number library</name>
- </repository>
</repositories>
</project>
[2/3] incubator-impala git commit: IMPALA-4177,
IMPALA-6039: batched bit reading and rle decoding
Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index d5b0d01..3c83c23 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -49,6 +49,7 @@ using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift;
using namespace parquet;
+using impala::RleBatchDecoder;
using std::min;
using impala::PARQUET_VERSION_NUMBER;
@@ -119,15 +120,6 @@ string GetSchema(const FileMetaData& md) {
return ss.str();
}
-// Inherit from RleDecoder to get access to repeat_count_, which is protected.
-class ParquetLevelReader : public impala::RleDecoder {
- public:
- ParquetLevelReader(uint8_t* buffer, int buffer_len, int bit_width)
- : RleDecoder(buffer, buffer_len, bit_width) {}
-
- uint32_t repeat_count() const { return repeat_count_; }
-};
-
// Performs sanity checking on the contents of data pages, to ensure that:
// - Compressed pages can be uncompressed successfully.
// - The number of def levels matches num_values in the page header when using RLE.
@@ -163,18 +155,19 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
// Parquet data pages always start with the encoded definition level data, and
// RLE sections in Parquet always start with a 4 byte length followed by the data.
int num_def_level_bytes = *reinterpret_cast<const int32_t*>(data);
- ParquetLevelReader def_levels(const_cast<uint8_t*>(data) + sizeof(int32_t),
+ RleBatchDecoder<uint8_t> def_levels(const_cast<uint8_t*>(data) + sizeof(int32_t),
num_def_level_bytes, sizeof(uint8_t));
uint8_t level;
for (int i = 0; i < header.data_page_header.num_values; ++i) {
- if (!def_levels.Get(&level)) {
+ if (!def_levels.GetSingleValue(&level)) {
cerr << "Error: Decoding of def levels failed.\n";
exit(1);
}
- if (i + def_levels.repeat_count() + 1 > header.data_page_header.num_values) {
- cerr << "Error: More def levels encoded (" << (i + def_levels.repeat_count() + 1)
- << ") than num_values (" << header.data_page_header.num_values << ").\n";
+ if (i + def_levels.NextNumRepeats() + 1 > header.data_page_header.num_values) {
+ cerr << "Error: More def levels encoded ("
+ << (i + def_levels.NextNumRepeats() + 1) << ") than num_values ("
+ << header.data_page_header.num_values << ").\n";
exit(1);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h
index 2e2dd7f..861bf8e 100644
--- a/be/src/util/rle-encoding.h
+++ b/be/src/util/rle-encoding.h
@@ -76,49 +76,116 @@ namespace impala {
/// (total 26 bytes, 1 byte overhead)
//
-/// Decoder class for RLE encoded data.
-class RleDecoder {
+/// RLE decoder with a batch-oriented interface that enables fast decoding.
+/// Users of this class must first initialize the class to point to a buffer of
+/// RLE-encoded data, passed into the constructor or Reset(). Then they can
+/// decode data by checking NextNumRepeats()/NextNumLiterals() to see if the
+/// next run is a repeated or literal run, then calling GetRepeatedValue()
+/// or GetLiteralValues() respectively to read the values.
+///
+/// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
+/// Other decoding errors are signalled by functions returning false. If an
+/// error is encountered then it is not valid to read any more data until
+/// Reset() is called.
+template <typename T>
+class RleBatchDecoder {
public:
- /// Create a decoder object. buffer/buffer_len is the decoded data.
- /// bit_width is the width of each value (before encoding).
- RleDecoder(uint8_t* buffer, int buffer_len, int bit_width)
- : bit_reader_(buffer, buffer_len),
- bit_width_(bit_width),
- current_value_(0),
- repeat_count_(0),
- literal_count_(0) {
- DCHECK_GE(bit_width_, 0);
- DCHECK_LE(bit_width_, BitReader::MAX_BITWIDTH);
+ RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
+ Reset(buffer, buffer_len, bit_width);
}
+ RleBatchDecoder() : bit_width_(-1) {}
+
+ /// Reset the decoder to read from a new buffer.
+ void Reset(uint8_t* buffer, int buffer_len, int bit_width);
+
+ /// Return the size of the current repeated run. Returns zero if the current run is
+ /// a literal run or if no more runs can be read from the input.
+ int32_t NextNumRepeats();
+
+ /// Get the value of the current repeated run and consume the given number of repeats.
+ /// Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
+ /// be greater than the remaining number of repeats in the run.
+ T GetRepeatedValue(int32_t num_repeats_to_consume);
+
+ /// Return the size of the current literal run. Returns zero if the current run is
+ /// a repeated run or if no more runs can be read from the input.
+ int32_t NextNumLiterals();
+
+ /// Consume 'num_literals_to_consume' literals from the current literal run,
+ /// copying the values to 'values'. 'num_literals_to_consume' must be <=
+ /// NextNumLiterals(). Returns true if the requested number of literals were
+ /// successfully read or false if an error was encountered, e.g. the input was
+ /// truncated.
+ bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;
+
+ /// Consume 'num_literals_to_consume' literals from the current literal run,
+ /// decoding them using 'dict' and outputting them to 'values'.
+ /// 'num_literals_to_consume' must be <= NextNumLiterals(). Returns true if
+ /// the requested number of literals were successfully read or false if an error
+ /// was encountered, e.g. the input was truncated or the value was not present
+ /// in the dictionary. Errors can only be recovered from by calling Reset()
+ /// to read from a new buffer.
+ template <typename OutType>
+ bool DecodeLiteralValues(int32_t num_literals_to_consume, OutType* dict,
+ int64_t dict_len, OutType* values) WARN_UNUSED_RESULT;
+
+ /// Convenience method to get the next value. Not efficient. Returns true on success
+ /// or false if no more values can be read from the input or an error was encountered
+ /// decoding the values.
+ bool GetSingleValue(T* val) WARN_UNUSED_RESULT;
- RleDecoder() : bit_width_(-1) {}
+ private:
+ BatchedBitReader bit_reader_;
- void Reset(uint8_t* buffer, int buffer_len, int bit_width) {
- DCHECK_GE(bit_width, 0);
- DCHECK_LE(bit_width, BitReader::MAX_BITWIDTH);
- bit_reader_.Reset(buffer, buffer_len);
- bit_width_ = bit_width;
- current_value_ = 0;
- repeat_count_ = 0;
- literal_count_ = 0;
- }
+ /// Number of bits needed to encode the value. Must be between 0 and 64.
+ int bit_width_;
- /// Gets the next value. Returns false if there are no more.
- template<typename T>
- bool Get(T* val);
+ /// If a repeated run, the number of repeats remaining in the current run to be read.
+ /// If the current run is a literal run, this is 0.
+ int32_t repeat_count_;
- protected:
- /// Fills literal_count_ and repeat_count_ with next values. Returns false if there
- /// are no more.
- template<typename T>
- bool NextCounts();
+ /// If a literal run, the number of literals remaining in the current run to be read.
+ /// If the current run is a repeated run, this is 0.
+ int32_t literal_count_;
- BitReader bit_reader_;
- /// Number of bits needed to encode the value. Must be between 0 and 64.
- int bit_width_;
- uint64_t current_value_;
- uint32_t repeat_count_;
- uint32_t literal_count_;
+ /// If a repeated run, the current repeated value.
+ T repeated_value_;
+
+ /// Size of buffer for literal values. Large enough to decode a full batch of 32
+ /// literals. The buffer is needed to allow clients to read in batches that are not
+ /// multiples of 32.
+ static constexpr int LITERAL_BUFFER_LEN = 32;
+
+ /// Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
+ /// position of the next literal to be read from the buffer.
+ T literal_buffer_[LITERAL_BUFFER_LEN];
+ int num_buffered_literals_;
+ int literal_buffer_pos_;
+
+ /// Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
+ /// Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
+ /// or repeated run, or leaves both at 0 if no more values can be read (either because
+ /// the end of the input was reached or an error was encountered decoding).
+ void NextCounts();
+
+ /// Fill the literal buffer. Invalid to call if there are already buffered literals.
+ /// Return false if the input was truncated. This does not advance 'literal_count_'.
+ bool FillLiteralBuffer() WARN_UNUSED_RESULT;
+
+ bool HaveBufferedLiterals() const {
+ return literal_buffer_pos_ < num_buffered_literals_;
+ }
+
+ /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
+ /// 'literal_count_'. Returns the number of literals outputted.
+ int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);
+
+ /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
+ /// 'literal_count_'. Returns the number of literals outputted or 0 if a
+ /// decoding error is encountered.
+ template <typename OutType>
+ int32_t DecodeBufferedLiterals(
+ int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values);
};
/// Class to incrementally build the rle data. This class does not allocate any memory.
@@ -153,7 +220,8 @@ class RleEncoder {
int max_literal_run_size = 1 +
BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8);
/// Up to MAX_VLQ_BYTE_LEN indicator and a single 'bit_width' value.
- int max_repeated_run_size = BitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
+ int max_repeated_run_size =
+ BatchedBitReader::MAX_VLQ_BYTE_LEN + BitUtil::Ceil(bit_width, 8);
return std::max(max_literal_run_size, max_repeated_run_size);
}
@@ -167,7 +235,7 @@ class RleEncoder {
/// Encode value. Returns true if the value fits in buffer, false otherwise.
/// This value must be representable with bit_width_ bits.
- bool Put(uint64_t value);
+ bool Put(uint64_t value) WARN_UNUSED_RESULT;
/// Flushes any pending values to the underlying buffer.
/// Returns the total number of bytes written
@@ -245,53 +313,6 @@ class RleEncoder {
uint8_t* literal_indicator_byte_;
};
-// Force inlining - this is used in perf-critical loops in Parquet and GCC often
-// doesn't inline it in cases where it's beneficial.
-template <typename T>
-ALWAYS_INLINE inline bool RleDecoder::Get(T* val) {
- DCHECK_GE(bit_width_, 0);
- // Profiling has shown that the quality and performance of the generated code is very
- // sensitive to the exact shape of this check. For example, the version below performs
- // significantly better than UNLIKELY(literal_count_ == 0 && repeat_count_ == 0)
- if (repeat_count_ == 0) {
- if (literal_count_ == 0) {
- if (!NextCounts<T>()) return false;
- }
- }
-
- if (LIKELY(repeat_count_ > 0)) {
- *val = current_value_;
- --repeat_count_;
- } else {
- DCHECK_GT(literal_count_, 0);
- if (UNLIKELY(!bit_reader_.GetValue(bit_width_, val))) return false;
- --literal_count_;
- }
-
- return true;
-}
-
-template<typename T>
-bool RleDecoder::NextCounts() {
- // Read the next run's indicator int, it could be a literal or repeated run.
- // The int is encoded as a vlq-encoded value.
- int32_t indicator_value = 0;
- if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return false;
-
- // lsb indicates if it is a literal run or repeated run
- bool is_literal = indicator_value & 1;
- if (is_literal) {
- literal_count_ = (indicator_value >> 1) * 8;
- if (UNLIKELY(literal_count_ == 0)) return false;
- } else {
- repeat_count_ = indicator_value >> 1;
- bool result = bit_reader_.GetAligned<T>(
- BitUtil::Ceil(bit_width_, 8), reinterpret_cast<T*>(¤t_value_));
- if (UNLIKELY(!result || repeat_count_ == 0)) return false;
- }
- return true;
-}
-
/// This function buffers input values 8 at a time. After seeing all 8 values,
/// it decides whether they should be encoded as a literal or repeated run.
inline bool RleEncoder::Put(uint64_t value) {
@@ -444,5 +465,197 @@ inline void RleEncoder::Clear() {
bit_writer_.Clear();
}
+template <typename T>
+inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
+ DCHECK_GE(bit_width, 0);
+ DCHECK_LE(bit_width, BatchedBitReader::MAX_BITWIDTH);
+ bit_reader_.Reset(buffer, buffer_len);
+ bit_width_ = bit_width;
+ repeat_count_ = 0;
+ literal_count_ = 0;
+ num_buffered_literals_ = 0;
+ literal_buffer_pos_ = 0;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
+ if (repeat_count_ > 0) return repeat_count_;
+ if (literal_count_ == 0) NextCounts();
+ return repeat_count_;
+}
+
+template <typename T>
+inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
+ DCHECK_GT(num_repeats_to_consume, 0);
+ DCHECK_GE(repeat_count_, num_repeats_to_consume);
+ repeat_count_ -= num_repeats_to_consume;
+ return repeated_value_;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
+ if (literal_count_ > 0) return literal_count_;
+ if (repeat_count_ == 0) NextCounts();
+ return literal_count_;
}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetLiteralValues(
+ int32_t num_literals_to_consume, T* values) {
+ DCHECK_GE(num_literals_to_consume, 0);
+ DCHECK_GE(literal_count_, num_literals_to_consume);
+ int32_t num_consumed = 0;
+ // Copy any buffered literals left over from previous calls.
+ if (HaveBufferedLiterals()) {
+ num_consumed = OutputBufferedLiterals(num_literals_to_consume, values);
+ }
+
+ int32_t num_remaining = num_literals_to_consume - num_consumed;
+ // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
+ // Need to round to a batch of 32 if the caller is consuming only part of the current
+ // run to avoid ending on a non-byte boundary.
+ int32_t num_to_bypass = std::min<int32_t>(literal_count_,
+ BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+ if (num_to_bypass > 0) {
+ int num_read =
+ bit_reader_.UnpackBatch(bit_width_, num_to_bypass, values + num_consumed);
+ // If we couldn't read the expected number, that means the input was truncated.
+ if (num_read < num_to_bypass) return false;
+ literal_count_ -= num_to_bypass;
+ num_consumed += num_to_bypass;
+ num_remaining = num_literals_to_consume - num_consumed;
+ }
+
+ if (num_remaining > 0) {
+ // We weren't able to copy all the literals requested directly from the input.
+ // Buffer literals and copy over the requested number.
+ if (UNLIKELY(!FillLiteralBuffer())) return false;
+ int32_t num_copied = OutputBufferedLiterals(num_remaining, values + num_consumed);
+ DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
+ }
+ return true;
+}
+
+template <typename T>
+template <typename OutType>
+inline bool RleBatchDecoder<T>::DecodeLiteralValues(
+ int32_t num_literals_to_consume, OutType* dict, int64_t dict_len, OutType* values) {
+ DCHECK_GE(num_literals_to_consume, 0);
+ DCHECK_GE(literal_count_, num_literals_to_consume);
+ int32_t num_consumed = 0;
+ // Decode any buffered literals left over from previous calls.
+ if (HaveBufferedLiterals()) {
+ num_consumed =
+ DecodeBufferedLiterals(num_literals_to_consume, dict, dict_len, values);
+ if (UNLIKELY(num_consumed == 0)) return false;
+ }
+
+ int32_t num_remaining = num_literals_to_consume - num_consumed;
+ // Copy literals directly to the output, bypassing 'literal_buffer_' when possible.
+ // Need to round to a batch of 32 if the caller is consuming only part of the current
+ // run to avoid ending on a non-byte boundary.
+ int32_t num_to_bypass =
+ std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
+ if (num_to_bypass > 0) {
+ int num_read = bit_reader_.UnpackAndDecodeBatch(
+ bit_width_, dict, dict_len, num_to_bypass, values + num_consumed);
+ // If we couldn't read the expected number, that means the input was truncated.
+ if (num_read < num_to_bypass) return false;
+ literal_count_ -= num_to_bypass;
+ num_consumed += num_to_bypass;
+ num_remaining = num_literals_to_consume - num_consumed;
+ }
+
+ if (num_remaining > 0) {
+ // We weren't able to copy all the literals requested directly from the input.
+ // Buffer literals and copy over the requested number.
+ if (UNLIKELY(!FillLiteralBuffer())) return false;
+ int32_t num_copied =
+ DecodeBufferedLiterals(num_remaining, dict, dict_len, values + num_consumed);
+ if (UNLIKELY(num_copied == 0)) return false;
+ DCHECK_EQ(num_copied, num_remaining) << "Should have buffered enough literals";
+ }
+ return true;
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::GetSingleValue(T* val) {
+ if (NextNumRepeats() > 0) {
+ DCHECK_EQ(0, NextNumLiterals());
+ *val = GetRepeatedValue(1);
+ return true;
+ }
+ if (NextNumLiterals() > 0) {
+ DCHECK_EQ(0, NextNumRepeats());
+ return GetLiteralValues(1, val);
+ }
+ return false;
+}
+
+template <typename T>
+inline void RleBatchDecoder<T>::NextCounts() {
+ DCHECK_EQ(0, literal_count_);
+ DCHECK_EQ(0, repeat_count_);
+ // Read the next run's indicator int, it could be a literal or repeated run.
+ // The int is encoded as a vlq-encoded value.
+ int32_t indicator_value = 0;
+ if (UNLIKELY(!bit_reader_.GetVlqInt(&indicator_value))) return;
+
+ // lsb indicates if it is a literal run or repeated run
+ bool is_literal = indicator_value & 1;
+ if (is_literal) {
+ literal_count_ = (indicator_value >> 1) * 8;
+ } else {
+ int32_t repeat_count = indicator_value >> 1;
+ if (UNLIKELY(repeat_count == 0)) return;
+ bool result =
+ bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_);
+ if (UNLIKELY(!result)) return;
+ repeat_count_ = repeat_count;
+ }
+}
+
+template <typename T>
+inline bool RleBatchDecoder<T>::FillLiteralBuffer() {
+ DCHECK(!HaveBufferedLiterals());
+ int32_t num_to_buffer = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
+ num_buffered_literals_ =
+ bit_reader_.UnpackBatch(bit_width_, num_to_buffer, literal_buffer_);
+ // If we couldn't read the expected number, that means the input was truncated.
+ if (UNLIKELY(num_buffered_literals_ < num_to_buffer)) return false;
+ literal_buffer_pos_ = 0;
+ return true;
+}
+
+template <typename T>
+inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(
+ int32_t max_to_output, T* values) {
+ int32_t num_to_output =
+ std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
+ memcpy(values, &literal_buffer_[literal_buffer_pos_], sizeof(T) * num_to_output);
+ literal_buffer_pos_ += num_to_output;
+ literal_count_ -= num_to_output;
+ return num_to_output;
+}
+
+template <typename T>
+template <typename OutType>
+inline int32_t RleBatchDecoder<T>::DecodeBufferedLiterals(
+ int32_t max_to_output, OutType* dict, int64_t dict_len, OutType* values) {
+ int32_t num_to_output =
+ std::min<int32_t>(max_to_output, num_buffered_literals_ - literal_buffer_pos_);
+ for (int32_t i = 0; i < num_to_output; ++i) {
+ T idx = literal_buffer_[literal_buffer_pos_ + i];
+ if (UNLIKELY(idx < 0 || idx >= dict_len)) return 0;
+ memcpy(&values[i], &dict[idx], sizeof(OutType));
+ }
+ literal_buffer_pos_ += num_to_output;
+ literal_count_ -= num_to_output;
+ return num_to_output;
+}
+
+template <typename T>
+constexpr int RleBatchDecoder<T>::LITERAL_BUFFER_LEN;
+}
+
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/rle-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc
index 0501d71..b8a1d94 100644
--- a/be/src/util/rle-test.cc
+++ b/be/src/util/rle-test.cc
@@ -30,7 +30,7 @@
namespace impala {
-const int MAX_WIDTH = BitReader::MAX_BITWIDTH;
+const int MAX_WIDTH = BatchedBitReader::MAX_BITWIDTH;
TEST(BitArray, TestBool) {
const int len = 8;
@@ -69,29 +69,24 @@ TEST(BitArray, TestBool) {
EXPECT_EQ((int)buffer[1], BOOST_BINARY(1 1 0 0 1 1 0 0));
// Use the reader and validate
- BitReader reader(buffer, len);
+ BatchedBitReader reader(buffer, len);
+
// Ensure it returns the same results after Reset().
for (int trial = 0; trial < 2; ++trial) {
- for (int i = 0; i < 8; ++i) {
- bool val = false;
- bool result = reader.GetValue(1, &val);
- EXPECT_TRUE(result);
- EXPECT_EQ(val, i % 2);
- }
+ bool batch_vals[16];
+ EXPECT_EQ(16, reader.UnpackBatch(1, 16, batch_vals));
+ for (int i = 0; i < 8; ++i) EXPECT_EQ(batch_vals[i], i % 2);
for (int i = 0; i < 8; ++i) {
- bool val = false;
- bool result = reader.GetValue(1, &val);
- EXPECT_TRUE(result);
switch (i) {
case 0:
case 1:
case 4:
case 5:
- EXPECT_EQ(val, false);
+ EXPECT_EQ(batch_vals[8 + i], false);
break;
default:
- EXPECT_EQ(val, true);
+ EXPECT_EQ(batch_vals[8 + i], true);
break;
}
}
@@ -113,17 +108,30 @@ void TestBitArrayValues(int bit_width, int num_vals) {
writer.Flush();
EXPECT_EQ(writer.bytes_written(), len);
- BitReader reader(buffer, len);
+ BatchedBitReader reader(buffer, len);
+ BatchedBitReader reader2(reader); // Test copy constructor.
// Ensure it returns the same results after Reset().
for (int trial = 0; trial < 2; ++trial) {
+ // Unpack all values at once with one batched reader and in small batches with the
+ // other batched reader.
+ vector<int64_t> batch_vals(num_vals);
+ const int BATCH_SIZE = 32;
+ vector<int64_t> batch_vals2(BATCH_SIZE);
+ EXPECT_EQ(num_vals,
+ reader.UnpackBatch(bit_width, num_vals, batch_vals.data()));
for (int i = 0; i < num_vals; ++i) {
- int64_t val;
- bool result = reader.GetValue(bit_width, &val);
- EXPECT_TRUE(result);
- EXPECT_EQ(val, i % mod);
+ if (i % BATCH_SIZE == 0) {
+ int num_to_unpack = min(BATCH_SIZE, num_vals - i);
+ EXPECT_EQ(num_to_unpack,
+ reader2.UnpackBatch(bit_width, num_to_unpack, batch_vals2.data()));
+ }
+ EXPECT_EQ(i % mod, batch_vals[i]);
+ EXPECT_EQ(i % mod, batch_vals2[i % BATCH_SIZE]);
}
EXPECT_EQ(reader.bytes_left(), 0);
+ EXPECT_EQ(reader2.bytes_left(), 0);
reader.Reset(buffer, len);
+ reader2.Reset(buffer, len);
}
}
@@ -137,45 +145,32 @@ TEST(BitArray, TestValues) {
}
}
-// Test some mixed values
-TEST(BitArray, TestMixed) {
- const int len = 1024;
- uint8_t buffer[len];
- bool parity = true;
-
- BitWriter writer(buffer, len);
- for (int i = 0; i < len; ++i) {
- bool result;
- if (i % 2 == 0) {
- result = writer.PutValue(parity, 1);
- parity = !parity;
- } else {
- result = writer.PutValue(i, 10);
- }
- EXPECT_TRUE(result);
- }
- writer.Flush();
-
- parity = true;
- BitReader reader(buffer, len);
- // Ensure it returns the same results after Reset().
- for (int trial = 0; trial < 2; ++trial) {
- for (int i = 0; i < len; ++i) {
- bool result;
- if (i % 2 == 0) {
- bool val;
- result = reader.GetValue(1, &val);
- EXPECT_EQ(val, parity);
- parity = !parity;
- } else {
- int val;
- result = reader.GetValue(10, &val);
- EXPECT_EQ(val, i);
+/// Get many values from a batch RLE decoder.
+template <typename T>
+static bool GetRleValues(RleBatchDecoder<T>* decoder, int num_vals, T* vals) {
+ int decoded = 0;
+ // Decode repeated and literal runs until we've filled the output.
+ while (decoded < num_vals) {
+ if (decoder->NextNumRepeats() > 0) {
+ EXPECT_EQ(0, decoder->NextNumLiterals());
+ int num_repeats_to_output =
+ min<int>(decoder->NextNumRepeats(), num_vals - decoded);
+ T repeated_val = decoder->GetRepeatedValue(num_repeats_to_output);
+ for (int i = 0; i < num_repeats_to_output; ++i) {
+ *vals = repeated_val;
+ ++vals;
}
- EXPECT_TRUE(result);
+ decoded += num_repeats_to_output;
+ continue;
}
- reader.Reset(buffer, len);
+ int num_literals_to_output =
+ min<int>(decoder->NextNumLiterals(), num_vals - decoded);
+ if (num_literals_to_output == 0) return false;
+ if (!decoder->GetLiteralValues(num_literals_to_output, vals)) return false;
+ decoded += num_literals_to_output;
+ vals += num_literals_to_output;
}
+ return true;
}
// Validates encoding of values by encoding and decoding them. If
@@ -203,16 +198,32 @@ void ValidateRle(const vector<int>& values, int bit_width,
}
// Verify read
- RleDecoder decoder(buffer, len, bit_width);
+ RleBatchDecoder<uint64_t> decoder(buffer, len, bit_width);
+ RleBatchDecoder<uint64_t> decoder2(buffer, len, bit_width);
// Ensure it returns the same results after Reset().
for (int trial = 0; trial < 2; ++trial) {
for (int i = 0; i < values.size(); ++i) {
uint64_t val;
- bool result = decoder.Get(&val);
- EXPECT_TRUE(result);
- EXPECT_EQ(values[i], val);
+ EXPECT_TRUE(decoder.GetSingleValue(&val));
+ EXPECT_EQ(values[i], val) << i;
+ }
+ // Unpack everything at once from the second batch decoder.
+ vector<uint64_t> decoded_values(values.size());
+ EXPECT_TRUE(GetRleValues(&decoder2, values.size(), decoded_values.data()));
+ for (int i = 0; i < values.size(); ++i) {
+ EXPECT_EQ(values[i], decoded_values[i]) << i;
}
decoder.Reset(buffer, len, bit_width);
+ decoder2.Reset(buffer, len, bit_width);
+ }
+}
+
+/// Basic test case for literal unpacking - two literals in a run.
+TEST(Rle, TwoLiteralRun) {
+ vector<int> values{1, 0};
+ ValidateRle(values, 1, nullptr, -1);
+ for (int width = 1; width <= MAX_WIDTH; ++width) {
+ ValidateRle(values, width, nullptr, -1);
}
}
@@ -287,16 +298,22 @@ TEST(Rle, BitWidthZeroRepeated) {
uint8_t buffer[1];
const int num_values = 15;
buffer[0] = num_values << 1; // repeated indicator byte
- RleDecoder decoder(buffer, sizeof(buffer), 0);
+ RleBatchDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
// Ensure it returns the same results after Reset().
for (int trial = 0; trial < 2; ++trial) {
uint8_t val;
for (int i = 0; i < num_values; ++i) {
- bool result = decoder.Get(&val);
- EXPECT_TRUE(result);
- EXPECT_EQ(val, 0); // can only encode 0s with bit width 0
+ EXPECT_TRUE(decoder.GetSingleValue(&val));
+ EXPECT_EQ(val, 0);
}
- EXPECT_FALSE(decoder.Get(&val));
+ EXPECT_FALSE(decoder.GetSingleValue(&val));
+
+ // Test decoding all values in a batch.
+ decoder.Reset(buffer, sizeof(buffer), 0);
+ uint8_t decoded_values[num_values];
+ EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values));
+ for (int i = 0; i < num_values; i++) EXPECT_EQ(0, decoded_values[i]) << i;
+ EXPECT_FALSE(decoder.GetSingleValue(&val));
decoder.Reset(buffer, sizeof(buffer), 0);
}
}
@@ -305,17 +322,23 @@ TEST(Rle, BitWidthZeroLiteral) {
uint8_t buffer[1];
const int num_groups = 4;
buffer[0] = num_groups << 1 | 1; // literal indicator byte
- RleDecoder decoder = RleDecoder(buffer, sizeof(buffer), 0);
+ RleBatchDecoder<uint8_t> decoder(buffer, sizeof(buffer), 0);
// Ensure it returns the same results after Reset().
for (int trial = 0; trial < 2; ++trial) {
const int num_values = num_groups * 8;
uint8_t val;
for (int i = 0; i < num_values; ++i) {
- bool result = decoder.Get(&val);
- EXPECT_TRUE(result);
+ EXPECT_TRUE(decoder.GetSingleValue(&val));
EXPECT_EQ(val, 0); // can only encode 0s with bit width 0
}
- EXPECT_FALSE(decoder.Get(&val));
+
+ // Test decoding the whole batch at once.
+ decoder.Reset(buffer, sizeof(buffer), 0);
+ uint8_t decoded_values[num_values];
+ EXPECT_TRUE(GetRleValues(&decoder, num_values, decoded_values));
+ for (int i = 0; i < num_values; ++i) EXPECT_EQ(0, decoded_values[i]);
+
+ EXPECT_FALSE(GetRleValues(&decoder, 1, decoded_values));
decoder.Reset(buffer, sizeof(buffer), 0);
}
}
@@ -402,20 +425,25 @@ TEST(BitRle, Overflow) {
EXPECT_LE(bytes_written, len);
EXPECT_GT(num_added, 0);
- RleDecoder decoder(buffer, bytes_written, bit_width);
+ RleBatchDecoder<uint32_t> decoder(buffer, bytes_written, bit_width);
// Ensure it returns the same results after Reset().
for (int trial = 0; trial < 2; ++trial) {
parity = true;
uint32_t v;
for (int i = 0; i < num_added; ++i) {
- bool result = decoder.Get(&v);
- EXPECT_TRUE(result);
+ EXPECT_TRUE(decoder.GetSingleValue(&v));
EXPECT_EQ(v, parity);
parity = !parity;
}
// Make sure we get false when reading past end a couple times.
- EXPECT_FALSE(decoder.Get(&v));
- EXPECT_FALSE(decoder.Get(&v));
+ EXPECT_FALSE(decoder.GetSingleValue(&v));
+ EXPECT_FALSE(decoder.GetSingleValue(&v));
+
+ decoder.Reset(buffer, bytes_written, bit_width);
+ uint32_t decoded_values[num_added];
+ EXPECT_TRUE(GetRleValues(&decoder, num_added, decoded_values));
+ for (int i = 0; i < num_added; ++i) EXPECT_EQ(i % 2 == 0, decoded_values[i]) << i;
+
decoder.Reset(buffer, bytes_written, bit_width);
}
}
@@ -426,21 +454,20 @@ TEST(BitRle, Overflow) {
TEST(Rle, ZeroLiteralOrRepeatCount) {
const int len = 1024;
uint8_t buffer[len];
- RleDecoder decoder(buffer, len, 0);
- uint64_t val;
-
+ RleBatchDecoder<uint64_t> decoder(buffer, len, 0);
// Test the RLE repeated values path.
memset(buffer, 0, len);
for (int i = 0; i < 10; ++i) {
- bool result = decoder.Get(&val);
- EXPECT_FALSE(result);
+ EXPECT_EQ(0, decoder.NextNumLiterals());
+ EXPECT_EQ(0, decoder.NextNumRepeats());
}
// Test the RLE literal values path
memset(buffer, 1, len);
+ decoder.Reset(buffer, len, 0);
for (int i = 0; i < 10; ++i) {
- bool result = decoder.Get(&val);
- EXPECT_FALSE(result);
+ EXPECT_EQ(0, decoder.NextNumLiterals());
+ EXPECT_EQ(0, decoder.NextNumRepeats());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index e1dc496..8b9db3a 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -123,3 +123,12 @@ Generated using parquet-mr and contents verified using parquet-tools-1.9.1.
Contains decimals stored as variable sized BYTE_ARRAY with both dictionary
and non-dictionary encoding respectively.
+alltypes_agg_bitpacked_def_levels.parquet:
+Generated by hacking Impala's Parquet writer to write out bitpacked def levels instead
+of the standard RLE-encoded levels. See
+https://github.com/timarmstrong/incubator-impala/tree/hack-bit-packed-levels. This
+is a single file containing all of the alltypesagg data, which includes a mix of
+null and non-null values. This is not actually a valid Parquet file because the
+bit-packed levels are written in the reverse of the order specified in the Parquet spec
+for BIT_PACKED. However, this is the order that Impala attempts to read the levels
+in - see IMPALA-3006.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/alltypes_agg_bitpacked_def_levels.parquet b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet
new file mode 100644
index 0000000..bd2d9d8
Binary files /dev/null and b/testdata/data/alltypes_agg_bitpacked_def_levels.parquet differ
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
new file mode 100644
index 0000000..d48c333
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-def-levels.test
@@ -0,0 +1,52 @@
+====
+---- QUERY
+# Verify that total counts of non-null values are correct.
+select count(id), count(tinyint_col), count(smallint_col), count(int_col),
+ count(bigint_col), count(float_col), count(double_col), count(date_string_col),
+ count(string_col), count(timestamp_col), count(year), count(month), count(day)
+from alltypesagg
+---- TYPES
+BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT,BIGINT
+---- RESULTS
+11000,9000,10800,10980,10980,10980,10980,11000,11000,11000,11000,11000,10000
+====
+---- QUERY
+# Spot-check a subset of values.
+select *
+from alltypesagg
+where year = 2010 and month = 1 and int_col is null or int_col % 1000 = 77
+order by id, year, month, day
+---- TYPES
+INT,BOOLEAN,TINYINT,SMALLINT,INT,BIGINT,FLOAT,DOUBLE,STRING,STRING,TIMESTAMP,INT,INT,INT
+---- RESULTS
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,1
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,NULL
+77,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/01/10','77',2010-01-01 01:17:29.260000000,2010,1,1
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,2
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,NULL
+1077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/02/10','77',2010-01-02 01:17:29.260000000,2010,1,2
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,3
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,NULL
+2077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/03/10','77',2010-01-03 01:17:29.260000000,2010,1,3
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,4
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,NULL
+3077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/04/10','77',2010-01-04 01:17:29.260000000,2010,1,4
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,5
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,NULL
+4077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/05/10','77',2010-01-05 01:17:29.260000000,2010,1,5
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,6
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,NULL
+5077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/06/10','77',2010-01-06 01:17:29.260000000,2010,1,6
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,7
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,NULL
+6077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/07/10','77',2010-01-07 01:17:29.260000000,2010,1,7
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,8
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,NULL
+7077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/08/10','77',2010-01-08 01:17:29.260000000,2010,1,8
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,9
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,NULL
+8077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/09/10','77',2010-01-09 01:17:29.260000000,2010,1,9
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,10
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,NULL
+9077,false,7,77,77,770,84.69999694824219,777.6999999999999,'01/10/10','77',2010-01-10 01:17:29.260000000,2010,1,10
+====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 17b9503..fe0577a 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -388,6 +388,22 @@ class TestParquet(ImpalaTestSuite):
self.run_test_case('QueryTest/parquet-corrupt-rle-counts-abort',
vector, unique_database)
+ def test_bitpacked_def_levels(self, vector, unique_database):
+ """Test that Impala can read a Parquet file with the deprecated bit-packed def
+ level encoding."""
+ self.client.execute(("""CREATE TABLE {0}.alltypesagg (
+ id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
+ int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
+ date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
+ year INT, month INT, day INT) STORED AS PARQUET""").format(unique_database))
+ alltypesagg_loc = get_fs_path(
+ "/test-warehouse/{0}.db/{1}".format(unique_database, "alltypesagg"))
+ check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+ "/testdata/data/alltypes_agg_bitpacked_def_levels.parquet", alltypesagg_loc])
+ self.client.execute("refresh {0}.alltypesagg".format(unique_database));
+
+ self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database)
+
@SkipIfS3.hdfs_block_size
@SkipIfADLS.hdfs_block_size
@SkipIfIsilon.hdfs_block_size
[3/3] incubator-impala git commit: IMPALA-4177,
IMPALA-6039: batched bit reading and rle decoding
Posted by ta...@apache.org.
IMPALA-4177,IMPALA-6039: batched bit reading and rle decoding
Switch the decoders to using more batch-oriented interfaces. As an
intermediate step this doesn't make the interfaces of LevelDecoder
or DictDecoder batch-oriented, only the lower-level utility classes.
The next step would be to change those interfaces to be batch-oriented
and make the corresponding optimisations in parquet. This could deliver much
larger perf improvements than the current patch.
The high-level changes are:
* BitReader -> BatchedBitReader, which is built to unpack runs of 32
bit-packed values efficiently.
* RleDecoder -> RleBatchDecoder, which exposes the repeated and literal
runs to the caller and uses BatchedBitReader to unpack literal runs
efficiently.
* Dict decoding uses RleBatchDecoder to decode repeated runs efficiently
and uses the BitPacking utilities to unpack and encode in a single
step.
Also removes an older benchmark that isn't too interesting (since
the batch-oriented approach to encoding and decoding is so much
faster than the value-by-value approach).
Testing:
* Ran core tests.
* Updated unit tests to exercise new code.
* Added test coverage for the deprecated bit-packed level encoding to
verify that it still works (there was no coverage previously).
Perf:
Single-node benchmarks showed a few % performance gain. 16 node cluster
benchmarks only showed a gain for TPC-H nested.
Change-Id: I35de0cf80c86f501c4a39270afc8fb8111552ac6
Reviewed-on: http://gerrit.cloudera.org:8080/8267
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ae116b5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ae116b5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ae116b5b
Branch: refs/heads/master
Commit: ae116b5bf7b8b2514d7a8655d9a6666ad3fd36dd
Parents: 155bb77
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Oct 9 17:09:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Nov 16 21:23:09 2017 +0000
----------------------------------------------------------------------
be/src/benchmarks/CMakeLists.txt | 1 -
be/src/benchmarks/bit-packing-benchmark.cc | 120 ++++++
be/src/benchmarks/rle-benchmark.cc | 244 ------------
be/src/exec/parquet-column-readers.cc | 174 +++++----
be/src/exec/parquet-column-readers.h | 90 +++--
be/src/experiments/bit-stream-utils.8byte.h | 137 -------
.../experiments/bit-stream-utils.8byte.inline.h | 145 -------
be/src/util/bit-packing.h | 61 ++-
be/src/util/bit-packing.inline.h | 210 ++++++++--
be/src/util/bit-stream-utils.h | 77 ++--
be/src/util/bit-stream-utils.inline.h | 106 +++--
be/src/util/dict-encoding.h | 94 +++--
be/src/util/dict-test.cc | 96 ++++-
be/src/util/parquet-reader.cc | 21 +-
be/src/util/rle-encoding.h | 383 +++++++++++++++----
be/src/util/rle-test.cc | 185 +++++----
testdata/data/README | 9 +
.../alltypes_agg_bitpacked_def_levels.parquet | Bin 0 -> 208380 bytes
.../queries/QueryTest/parquet-def-levels.test | 52 +++
tests/query_test/test_scanners.py | 16 +
20 files changed, 1253 insertions(+), 968 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index 1d67d45..a569a66 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -48,7 +48,6 @@ ADD_BE_BENCHMARK(multiint-benchmark)
ADD_BE_BENCHMARK(network-perf-benchmark)
ADD_BE_BENCHMARK(overflow-benchmark)
ADD_BE_BENCHMARK(parse-timestamp-benchmark)
-ADD_BE_BENCHMARK(rle-benchmark)
ADD_BE_BENCHMARK(row-batch-serialize-benchmark)
ADD_BE_BENCHMARK(scheduler-benchmark)
ADD_BE_BENCHMARK(status-benchmark)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/bit-packing-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bit-packing-benchmark.cc b/be/src/benchmarks/bit-packing-benchmark.cc
index 955c0fb..7769182 100644
--- a/be/src/benchmarks/bit-packing-benchmark.cc
+++ b/be/src/benchmarks/bit-packing-benchmark.cc
@@ -285,6 +285,126 @@ struct BenchmarkParams {
int64_t data_len;
};
+/// Legacy value-at-a-time implementation of bit unpacking. Retained here for
+/// purposes of comparison in the benchmark.
+class BitReader {
+ public:
+ /// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
+ /// Does not take ownership of the buffer.
+ BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); }
+
+ BitReader() : buffer_(NULL), max_bytes_(0) {}
+
+ // The implicit copy constructor is left defined. If a BitReader is copied, the
+ // two copies do not share any state. Invoking functions on either copy continues
+ // reading from the current read position without modifying the state of the other
+ // copy.
+
+ /// Resets the read to start reading from the start of 'buffer'. The buffer's
+ /// length is 'buffer_len'. Does not take ownership of the buffer.
+ void Reset(const uint8_t* buffer, int buffer_len) {
+ buffer_ = buffer;
+ max_bytes_ = buffer_len;
+ byte_offset_ = 0;
+ bit_offset_ = 0;
+ int num_bytes = std::min(8, max_bytes_);
+ memcpy(&buffered_values_, buffer_, num_bytes);
+ }
+
+ /// Gets the next value from the buffer. Returns true if 'v' could be read or false if
+ /// there are not enough bytes left. num_bits must be <= 32.
+ template<typename T>
+ bool GetValue(int num_bits, T* v);
+
+ /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
+ /// little-endian native type and big enough to store 'num_bytes'. The value is assumed
+ /// to be byte-aligned so the stream will be advanced to the start of the next byte
+ /// before 'v' is read. Returns false if there are not enough bytes left.
+ template<typename T>
+ bool GetBytes(int num_bytes, T* v);
+
+ /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
+ /// there may be an additional fraction of a byte).
+ int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+
+ /// Maximum supported bitwidth for reader.
+ static const int MAX_BITWIDTH = 32;
+
+ private:
+ const uint8_t* buffer_;
+ int max_bytes_;
+
+ /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
+ /// faster than reading values byte by byte directly from buffer_.
+ uint64_t buffered_values_;
+
+ int byte_offset_; // Offset in buffer_
+ int bit_offset_; // Offset in buffered_values_
+};
+
+template <typename T>
+bool BitReader::GetValue(int num_bits, T* v) {
+ DCHECK(num_bits == 0 || buffer_ != NULL);
+ // TODO: revisit this limit if necessary
+ DCHECK_LE(num_bits, MAX_BITWIDTH);
+ DCHECK_LE(num_bits, sizeof(T) * 8);
+
+ // First do a cheap check to see if we may read past the end of the stream, using
+ // constant upper bounds for 'bit_offset_' and 'num_bits'.
+ if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) {
+ // Now do the precise check.
+ if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) {
+ return false;
+ }
+ }
+
+ DCHECK_GE(bit_offset_, 0);
+ DCHECK_LE(bit_offset_, 64);
+ *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
+
+ bit_offset_ += num_bits;
+ if (bit_offset_ >= 64) {
+ byte_offset_ += 8;
+ bit_offset_ -= 64;
+
+ int bytes_remaining = max_bytes_ - byte_offset_;
+ if (LIKELY(bytes_remaining >= 8)) {
+ memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
+ } else {
+ memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+ }
+
+ // Read bits of v that crossed into new buffered_values_
+ *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
+ << (num_bits - bit_offset_);
+ }
+ DCHECK_LE(bit_offset_, 64);
+ return true;
+}
+
+template<typename T>
+bool BitReader::GetBytes(int num_bytes, T* v) {
+ DCHECK_LE(num_bytes, sizeof(T));
+ int bytes_read = BitUtil::Ceil(bit_offset_, 8);
+ if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false;
+
+ // Advance byte_offset to next unread byte and read num_bytes
+ byte_offset_ += bytes_read;
+ *v = 0; // Ensure unset bytes are initialized to zero.
+ memcpy(v, buffer_ + byte_offset_, num_bytes);
+ byte_offset_ += num_bytes;
+
+ // Reset buffered_values_
+ bit_offset_ = 0;
+ int bytes_remaining = max_bytes_ - byte_offset_;
+ if (LIKELY(bytes_remaining >= 8)) {
+ memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
+ } else {
+ memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
+ }
+ return true;
+}
+
/// Benchmark calling BitReader::GetValue() in a loop to unpack 32 * 'batch_size' values.
void BitReaderBenchmark(int batch_size, void* data) {
const BenchmarkParams* p = reinterpret_cast<BenchmarkParams*>(data);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/benchmarks/rle-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/rle-benchmark.cc b/be/src/benchmarks/rle-benchmark.cc
deleted file mode 100644
index 439c630..0000000
--- a/be/src/benchmarks/rle-benchmark.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <iostream>
-#include <sstream>
-
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "experiments/bit-stream-utils.8byte.inline.h"
-#include "util/benchmark.h"
-#include "util/bit-stream-utils.inline.h"
-#include "util/cpu-info.h"
-
-#include "common/names.h"
-
-// Benchmark to measure how quickly we can do bit encoding and decoding.
-
-// encode: Function Rate (iters/ms) Comparison
-// ----------------------------------------------------------------------
-// "BitWriter (8 byte) 1-Bit" 66.9 1X
-// "BitWriter 1-Bit" 107.3 1.604X
-// "BitWriter (8 byte) 2-Bit" 74.42 1X
-// "BitWriter 2-Bit" 105.6 1.419X
-// "BitWriter (8 byte) 3-Bit" 76.91 1X
-// "BitWriter 3-Bit" 104 1.353X
-// "BitWriter (8 byte) 4-Bit" 80.37 1X
-// "BitWriter 4-Bit" 102.7 1.278X
-// "BitWriter (8 byte) 5-Bit" 79.29 1X
-// "BitWriter 5-Bit" 101 1.274X
-// "BitWriter (8 byte) 6-Bit" 80.37 1X
-// "BitWriter 6-Bit" 99.28 1.235X
-// "BitWriter (8 byte) 7-Bit" 80.19 1X
-// "BitWriter 7-Bit" 98.09 1.223X
-// "BitWriter (8 byte) 8-Bit" 84.93 1X
-// "BitWriter 8-Bit" 97 1.142X
-// "BitWriter (8 byte) 9-Bit" 79.85 1X
-// "BitWriter 9-Bit" 95.09 1.191X
-// "BitWriter (8 byte) 10-Bit" 80.51 1X
-// "BitWriter 10-Bit" 94.17 1.17X
-// "BitWriter (8 byte) 11-Bit" 79.36 1X
-// "BitWriter 11-Bit" 93.2 1.174X
-// "BitWriter (8 byte) 12-Bit" 80.79 1X
-// "BitWriter 12-Bit" 92.09 1.14X
-// "BitWriter (8 byte) 13-Bit" 78.28 1X
-// "BitWriter 13-Bit" 90.83 1.16X
-// "BitWriter (8 byte) 14-Bit" 78.57 1X
-// "BitWriter 14-Bit" 89.71 1.142X
-// "BitWriter (8 byte) 15-Bit" 77.28 1X
-// "BitWriter 15-Bit" 88 1.139X
-// "BitWriter (8 byte) 16-Bit" 86.98 1X
-// "BitWriter 16-Bit" 88.08 1.013X
-
-// decode: Function Rate (iters/ms) Comparison
-// ----------------------------------------------------------------------
-// "BitWriter (8 byte) 1-Bit" 132.9 1X
-// "BitWriter 1-Bit" 126.9 0.9546X
-// "BitWriter (8 byte) 2-Bit" 132.9 1X
-// "BitWriter 2-Bit" 125.6 0.9448X
-// "BitWriter (8 byte) 3-Bit" 132.8 1X
-// "BitWriter 3-Bit" 122.7 0.9237X
-// "BitWriter (8 byte) 4-Bit" 133.1 1X
-// "BitWriter 4-Bit" 123.6 0.9284X
-// "BitWriter (8 byte) 5-Bit" 132.2 1X
-// "BitWriter 5-Bit" 118.2 0.8942X
-// "BitWriter (8 byte) 6-Bit" 132.9 1X
-// "BitWriter 6-Bit" 117.6 0.885X
-// "BitWriter (8 byte) 7-Bit" 132.3 1X
-// "BitWriter 7-Bit" 112.8 0.8525X
-// "BitWriter (8 byte) 8-Bit" 132.9 1X
-// "BitWriter 8-Bit" 119.2 0.8971X
-// "BitWriter (8 byte) 9-Bit" 131.8 1X
-// "BitWriter 9-Bit" 111.3 0.8447X
-// "BitWriter (8 byte) 10-Bit" 131.4 1X
-// "BitWriter 10-Bit" 108.5 0.8255X
-// "BitWriter (8 byte) 11-Bit" 131.7 1X
-// "BitWriter 11-Bit" 106.9 0.8118X
-// "BitWriter (8 byte) 12-Bit" 132.9 1X
-// "BitWriter 12-Bit" 108.8 0.8189X
-// "BitWriter (8 byte) 13-Bit" 131 1X
-// "BitWriter 13-Bit" 103.1 0.7873X
-// "BitWriter (8 byte) 14-Bit" 131.6 1X
-// "BitWriter 14-Bit" 101.6 0.7724X
-// "BitWriter (8 byte) 15-Bit" 131.1 1X
-// "BitWriter 15-Bit" 99.91 0.7622X
-// "BitWriter (8 byte) 16-Bit" 133 1X
-// "BitWriter 16-Bit" 105.2 0.7907X
-
-using namespace impala;
-
-const int BUFFER_LEN = 64 * 4096;
-
-struct TestData {
- uint8_t* array;
- uint8_t* buffer;
- int num_values;
- int num_bits;
- int max_value;
- MemPool* pool;
- bool result;
-};
-
-void TestBitWriterEncode(int batch_size, void* d) {
- TestData* data = reinterpret_cast<TestData*>(d);
- int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8);
- for (int i = 0; i < batch_size; ++i) {
- BitWriter writer(data->buffer, buffer_size);
- // Unroll this to focus more on Put performance.
- for (int j = 0; j < data->num_values; j += 8) {
- writer.PutValue(j + 0, data->num_bits);
- writer.PutValue(j + 1, data->num_bits);
- writer.PutValue(j + 2, data->num_bits);
- writer.PutValue(j + 3, data->num_bits);
- writer.PutValue(j + 4, data->num_bits);
- writer.PutValue(j + 5, data->num_bits);
- writer.PutValue(j + 6, data->num_bits);
- writer.PutValue(j + 7, data->num_bits);
- }
- writer.Flush();
- }
-}
-
-void TestBitWriter8ByteEncode(int batch_size, void* d) {
- TestData* data = reinterpret_cast<TestData*>(d);
- int buffer_size = BitUtil::Ceil(data->num_bits * data->num_values, 8);
- for (int i = 0; i < batch_size; ++i) {
- BitWriter_8byte writer(data->buffer, buffer_size);
- // Unroll this to focus more on Put performance.
- for (int j = 0; j < data->num_values; j += 8) {
- writer.PutValue(j + 0, data->num_bits);
- writer.PutValue(j + 1, data->num_bits);
- writer.PutValue(j + 2, data->num_bits);
- writer.PutValue(j + 3, data->num_bits);
- writer.PutValue(j + 4, data->num_bits);
- writer.PutValue(j + 5, data->num_bits);
- writer.PutValue(j + 6, data->num_bits);
- writer.PutValue(j + 7, data->num_bits);
- }
- }
-}
-
-void TestBitWriterDecode(int batch_size, void* d) {
- TestData* data = reinterpret_cast<TestData*>(d);
- int64_t v;
- for (int i = 0; i < batch_size; ++i) {
- BitReader reader(data->buffer, BUFFER_LEN);
- // Unroll this to focus more on Put performance.
- for (int j = 0; j < data->num_values; j += 8) {
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- reader.GetValue(data->num_bits, &v);
- }
- }
-}
-
-void TestBitWriter8ByteDecode(int batch_size, void* d) {
- TestData* data = reinterpret_cast<TestData*>(d);
- data->result = true;
- int64_t v;
- for (int i = 0; i < batch_size; ++i) {
- BitReader_8byte reader(data->buffer, BUFFER_LEN);
- // Unroll this to focus more on Put performance.
- for (int j = 0; j < data->num_values; j += 8) {
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- data->result &= reader.GetValue(data->num_bits, &v);
- }
- }
- CHECK(data->result);
-}
-
-int main(int argc, char** argv) {
- CpuInfo::Init();
-
- MemTracker tracker;
- MemPool pool(&tracker);
-
- int num_values = 4096;
- int max_bits = 16;
-
- Benchmark encode_suite("encode");
- TestData data[max_bits];
- for (int i = 0; i < max_bits; ++i) {
- data[i].buffer = new uint8_t[BUFFER_LEN];
- data[i].num_values = num_values;
- data[i].num_bits = i + 1;
- data[i].max_value = 1 << i;
- data[i].pool = &pool;
-
- stringstream suffix;
- suffix << " " << (i+1) << "-Bit";
-
- stringstream name;
- name << "\"BitWriter (8 byte)" << suffix.str() << "\"";
- int baseline =
- encode_suite.AddBenchmark(name.str(), TestBitWriter8ByteEncode, &data[i], -1);
-
- name.str("");
- name << "\"BitWriter" << suffix.str() << "\"";
- encode_suite.AddBenchmark(name.str(), TestBitWriterEncode, &data[i], baseline);
- }
- cout << encode_suite.Measure() << endl;
-
- Benchmark decode_suite("decode");
- for (int i = 0; i < max_bits; ++i) {
- stringstream suffix;
- suffix << " " << (i+1) << "-Bit";
-
- stringstream name;
- name << "\"BitWriter (8 byte)" << suffix.str() << "\"";
- int baseline =
- decode_suite.AddBenchmark(name.str(), TestBitWriter8ByteDecode, &data[i], -1);
-
- name.str("");
- name << "\"BitWriter" << suffix.str() << "\"";
- decode_suite.AddBenchmark(name.str(), TestBitWriterDecode, &data[i], baseline);
- }
- cout << decode_suite.Measure() << endl;
-
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 04127e3..5a2e90e 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -77,6 +77,8 @@ Status ParquetLevelDecoder::Init(const string& filename,
parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
int max_level, int num_buffered_values, uint8_t** data, int* data_size) {
DCHECK_GE(num_buffered_values, 0);
+ DCHECK_GT(cache_size, 0);
+ cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
encoding_ = encoding;
max_level_ = max_level;
num_buffered_values_ = num_buffered_values;
@@ -97,7 +99,7 @@ Status ParquetLevelDecoder::Init(const string& filename,
return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, num_bytes);
}
int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
- Reset(*data, num_bytes, bit_width);
+ rle_decoder_.Reset(*data, num_bytes, bit_width);
break;
}
case parquet::Encoding::BIT_PACKED:
@@ -143,66 +145,80 @@ Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
}
inline int16_t ParquetLevelDecoder::ReadLevel() {
- bool valid;
- uint8_t level;
- if (encoding_ == parquet::Encoding::RLE) {
- valid = Get(&level);
- } else {
- DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
- valid = bit_reader_.GetValue(1, &level);
+ if (UNLIKELY(!CacheHasNext())) {
+ if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) {
+ return HdfsParquetScanner::INVALID_LEVEL;
+ }
+ DCHECK_GE(num_cached_levels_, 0);
+ if (UNLIKELY(num_cached_levels_ == 0)) {
+ return HdfsParquetScanner::INVALID_LEVEL;
+ }
}
- return LIKELY(valid) ? level : HdfsParquetScanner::INVALID_LEVEL;
+ return CacheGetNext();
}
-Status ParquetLevelDecoder::CacheNextBatch(int batch_size) {
- DCHECK_LE(batch_size, cache_size_);
- cached_level_idx_ = 0;
+Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
+ /// Fill the cache completely if there are enough values remaining.
+ /// Otherwise don't try to read more values than are left.
+ int batch_size = min(vals_remaining, cache_size_);
if (max_level_ > 0) {
- if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_))) {
+ if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_) ||
+ num_cached_levels_ < batch_size)) {
return Status(decoding_error_code_, num_buffered_values_, filename_);
}
} else {
// No levels to read, e.g., because the field is required. The cache was
// already initialized with all zeros, so we can hand out those values.
DCHECK_EQ(max_level_, 0);
+ cached_level_idx_ = 0;
num_cached_levels_ = batch_size;
}
return Status::OK();
}
-bool ParquetLevelDecoder::FillCache(int batch_size,
- int* num_cached_levels) {
- DCHECK(num_cached_levels != NULL);
- int num_values = 0;
+bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
+ DCHECK(!CacheHasNext());
+ DCHECK(num_cached_levels != nullptr);
+ DCHECK_GE(max_level_, 0);
+ DCHECK_EQ(num_cached_levels_ % 32, 0) << "Last batch was not a multiple of 32";
+ cached_level_idx_ = 0;
+ if (max_level_ == 0) {
+ // No levels to read, e.g., because the field is required. The cache was
+ // already initialized with all zeros, so we can hand out those values.
+ *num_cached_levels = batch_size;
+ return true;
+ }
if (encoding_ == parquet::Encoding::RLE) {
- while (true) {
- // Add RLE encoded values by repeating the current value this number of times.
- uint32_t num_repeats_to_set =
- min<uint32_t>(repeat_count_, batch_size - num_values);
- memset(cached_levels_ + num_values, current_value_, num_repeats_to_set);
- num_values += num_repeats_to_set;
- repeat_count_ -= num_repeats_to_set;
-
- // Add remaining literal values, if any.
- uint32_t num_literals_to_set =
- min<uint32_t>(literal_count_, batch_size - num_values);
- int num_values_end = min<uint32_t>(num_values + literal_count_, batch_size);
- for (; num_values < num_values_end; ++num_values) {
- bool valid = bit_reader_.GetValue(bit_width_, &cached_levels_[num_values]);
- if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
- }
- literal_count_ -= num_literals_to_set;
-
- if (num_values == batch_size) break;
- if (UNLIKELY(!NextCounts<int16_t>())) return false;
- if (repeat_count_ > 0 && current_value_ > max_level_) return false;
- }
+ return FillCacheRle(batch_size, num_cached_levels);
} else {
DCHECK_EQ(encoding_, parquet::Encoding::BIT_PACKED);
- for (; num_values < batch_size; ++num_values) {
- bool valid = bit_reader_.GetValue(1, &cached_levels_[num_values]);
- if (UNLIKELY(!valid || cached_levels_[num_values] > max_level_)) return false;
+ *num_cached_levels = bit_reader_.UnpackBatch(1, batch_size, cached_levels_);
+ return true;
+ }
+}
+
+bool ParquetLevelDecoder::FillCacheRle(int batch_size, int* num_cached_levels) {
+ int num_values = 0;
+ while (num_values < batch_size) {
+ // Add RLE encoded values by repeating the current value this number of times.
+ uint32_t num_repeats = rle_decoder_.NextNumRepeats();
+ if (num_repeats > 0) {
+ uint32_t num_repeats_to_set = min<uint32_t>(num_repeats, batch_size - num_values);
+ uint8_t repeated_value = rle_decoder_.GetRepeatedValue(num_repeats_to_set);
+ memset(cached_levels_ + num_values, repeated_value, num_repeats_to_set);
+ num_values += num_repeats_to_set;
+ continue;
+ }
+
+ // Add remaining literal values, if any.
+ uint32_t num_literals = rle_decoder_.NextNumLiterals();
+ if (num_literals == 0) break;
+ uint32_t num_literals_to_set = min<uint32_t>(num_literals, batch_size - num_values);
+ if (!rle_decoder_.GetLiteralValues(
+ num_literals_to_set, &cached_levels_[num_values])) {
+ return false;
}
+ num_values += num_literals_to_set;
}
*num_cached_levels = num_values;
return true;
@@ -282,9 +298,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
// NextLevels() should have already been called and def and rep levels should be in
// valid range.
DCHECK_GE(rep_level_, 0);
- DCHECK_LE(rep_level_, max_rep_level());
DCHECK_GE(def_level_, 0);
- DCHECK_LE(def_level_, max_def_level());
DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
"Caller should have called NextLevels() until we are ready to read a value";
@@ -330,6 +344,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
int val_count = 0;
bool continue_execution = true;
while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+ DCHECK_GE(num_buffered_values_, 0);
// Read next page if necessary.
if (num_buffered_values_ == 0) {
if (!NextPage()) {
@@ -338,26 +353,29 @@ class ScalarColumnReader : public BaseScalarColumnReader {
}
}
- // Fill def/rep level caches if they are empty.
- int level_batch_size = min(parent_->state_->batch_size(), num_buffered_values_);
- if (!def_levels_.CacheHasNext()) {
- parent_->parse_status_.MergeStatus(def_levels_.CacheNextBatch(level_batch_size));
- }
- // We only need the repetition levels for populating the position slot since we
- // are only populating top-level tuples.
- if (IN_COLLECTION && pos_slot_desc_ != NULL && !rep_levels_.CacheHasNext()) {
- parent_->parse_status_.MergeStatus(rep_levels_.CacheNextBatch(level_batch_size));
- }
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
-
- // This special case is most efficiently handled here directly.
+ // Not materializing anything - skip decoding any levels and rely on the value
+ // count from page metadata to return the correct number of rows.
if (!MATERIALIZED && !IN_COLLECTION) {
- int vals_to_add = min(def_levels_.CacheRemaining(), max_values - val_count);
+ int vals_to_add = min(num_buffered_values_, max_values - val_count);
val_count += vals_to_add;
- def_levels_.CacheSkipLevels(vals_to_add);
num_buffered_values_ -= vals_to_add;
continue;
}
+ // Fill the rep level cache if needed. We are flattening out the fields of the
+ // nested collection into the top-level tuple returned by the scan, so we don't
+ // care about the nesting structure unless the position slot is being populated.
+ if (IN_COLLECTION && pos_slot_desc_ != nullptr && !rep_levels_.CacheHasNext()) {
+ parent_->parse_status_.MergeStatus(
+ rep_levels_.CacheNextBatch(num_buffered_values_));
+ if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+ }
+
+ // Fill def level cache if needed.
+ if (!def_levels_.CacheHasNext()) {
+ // TODO: add a fast path here if there's a run of repeated values.
+ parent_->parse_status_.MergeStatus(
+ def_levels_.CacheNextBatch(num_buffered_values_));
+ }
// Read data page and cached levels to materialize values.
int cache_start_idx = def_levels_.CacheCurrIdx();
@@ -685,7 +703,9 @@ class BoolColumnReader : public BaseScalarColumnReader {
virtual Status InitDataPage(uint8_t* data, int size) {
// Initialize bool decoder
- bool_values_ = BitReader(data, size);
+ bool_values_.Reset(data, size);
+ num_unpacked_values_ = 0;
+ unpacked_value_idx_ = 0;
return Status::OK();
}
@@ -695,9 +715,7 @@ class BoolColumnReader : public BaseScalarColumnReader {
DCHECK(slot_desc_ != NULL);
// Def and rep levels should be in valid range.
DCHECK_GE(rep_level_, 0);
- DCHECK_LE(rep_level_, max_rep_level());
DCHECK_GE(def_level_, 0);
- DCHECK_LE(def_level_, max_def_level());
DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
"Caller should have called NextLevels() until we are ready to read a value";
@@ -719,14 +737,38 @@ class BoolColumnReader : public BaseScalarColumnReader {
template<bool IN_COLLECTION>
inline bool ReadSlot(Tuple* tuple, MemPool* pool) {
void* slot = tuple->GetSlot(tuple_offset_);
- if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) {
- parent_->parse_status_ = Status("Invalid bool column.");
- return false;
+ bool val;
+ if (unpacked_value_idx_ < num_unpacked_values_) {
+ val = unpacked_values_[unpacked_value_idx_++];
+ } else {
+ // Unpack as many values as we can into the buffer. We expect to read at least one
+ // value.
+ int num_unpacked =
+ bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+ if (UNLIKELY(num_unpacked == 0)) {
+ parent_->parse_status_ = Status("Invalid bool column.");
+ return false;
+ }
+ val = unpacked_values_[0];
+ num_unpacked_values_ = num_unpacked;
+ unpacked_value_idx_ = 1;
}
+ *reinterpret_cast<bool*>(slot) = val;
return NextLevels<IN_COLLECTION>();
}
- BitReader bool_values_;
+ /// A buffer to store unpacked values. Its length must be a multiple of 32 to use the
+ /// batch-oriented interface of BatchedBitReader.
+ static const int UNPACKED_BUFFER_LEN = 128;
+ bool unpacked_values_[UNPACKED_BUFFER_LEN];
+
+ /// The number of valid values in 'unpacked_values_'.
+ int num_unpacked_values_ = 0;
+
+ /// The next value to return from 'unpacked_values_'.
+ int unpacked_value_idx_ = 0;
+
+ BatchedBitReader bool_values_;
};
// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index dea84a8..17b8c0f 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -22,6 +22,7 @@
#include "exec/hdfs-parquet-scanner.h"
#include "util/codec.h"
+#include "util/bit-stream-utils.h"
#include "util/dict-encoding.h"
#include "util/rle-encoding.h"
@@ -36,39 +37,34 @@ class MemPool;
/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
/// populating the level cache (e.g., with RLE we can memset() repeated values).
///
-/// Inherits from RleDecoder instead of containing one for performance reasons.
-/// The containment design would require two BitReaders per column reader. The extra
-/// BitReader causes enough bloat for a column reader to require another cache line.
-/// TODO: It is not clear whether the inheritance vs. containment choice still makes
-/// sense with column-wise materialization. The containment design seems cleaner and
-/// we should revisit.
-class ParquetLevelDecoder : public RleDecoder {
+/// TODO: expose whether we're in a run of repeated values so that callers can
+/// optimise for that case.
+class ParquetLevelDecoder {
public:
ParquetLevelDecoder(bool is_def_level_decoder)
- : cached_levels_(NULL),
- num_cached_levels_(0),
- cached_level_idx_(0),
- encoding_(parquet::Encoding::PLAIN),
- max_level_(0),
- cache_size_(0),
- num_buffered_values_(0),
- decoding_error_code_(is_def_level_decoder ?
+ : decoding_error_code_(is_def_level_decoder ?
TErrorCode::PARQUET_DEF_LEVEL_ERROR : TErrorCode::PARQUET_REP_LEVEL_ERROR) {
}
/// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
- /// encoding requires reading metadata from the page header.
+ /// encoding requires reading metadata from the page header. 'cache_size' will be
+ /// rounded up to a multiple of 32 internally.
Status Init(const string& filename, parquet::Encoding::type encoding,
MemPool* cache_pool, int cache_size, int max_level, int num_buffered_values,
uint8_t** data, int* data_size);
- /// Returns the next level or INVALID_LEVEL if there was an error.
+ /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
+ /// as batched methods.
inline int16_t ReadLevel();
- /// Decodes and caches the next batch of levels. Resets members associated with the
- /// cache. Returns a non-ok status if there was a problem decoding a level, or if a
- /// level was encountered with a value greater than max_level_.
- Status CacheNextBatch(int batch_size);
+ /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
+ /// values left to decode in the page. Resets members associated with the cache.
+ /// Returns a non-ok status if there was a problem decoding a level, if a level was
+ /// encountered with a value greater than max_level_, or if fewer than
+ /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the
+ /// input did not have the expected number of values. Only valid to call when
+ /// the cache has been exhausted, i.e. CacheHasNext() is false.
+ Status CacheNextBatch(int vals_remaining);
/// Functions for working with the level cache.
inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
@@ -83,33 +79,57 @@ class ParquetLevelDecoder : public RleDecoder {
inline int CacheSize() const { return num_cached_levels_; }
inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
inline int CacheCurrIdx() const { return cached_level_idx_; }
-
private:
/// Initializes members associated with the level cache. Allocates memory for
/// the cache from pool, if necessary.
Status InitCache(MemPool* pool, int cache_size);
- /// Decodes and writes a batch of levels into the cache. Sets the number of
- /// values written to the cache in *num_cached_levels. Returns false if there was
- /// an error decoding a level or if there was a level value greater than max_level_.
+ /// Decodes and writes a batch of levels into the cache. Returns true and sets
+ /// the number of values written to the cache via *num_cached_levels if no errors
+ /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
+ /// end of input was hit without any other errors. Returns false if there was an
+ /// error decoding a level or if there was an invalid level value greater than
+ /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
+ /// CacheHasNext() is false.
bool FillCache(int batch_size, int* num_cached_levels);
- /// Buffer for a batch of levels. The memory is allocated and owned by a pool in
- /// passed in Init().
- uint8_t* cached_levels_;
+ /// Implementation of FillCache() for RLE encoding.
+ bool FillCacheRle(int batch_size, int* num_cached_levels);
+
+ /// RLE decoder, used if 'encoding_' is RLE.
+ RleBatchDecoder<uint8_t> rle_decoder_;
+
+ /// Bit unpacker, used if 'encoding_' is BIT_PACKED.
+ BatchedBitReader bit_reader_;
+
+ /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
+ /// in Init().
+ uint8_t* cached_levels_ = nullptr;
+
/// Number of valid level values in the cache.
- int num_cached_levels_;
+ int num_cached_levels_ = 0;
+
/// Current index into cached_levels_.
- int cached_level_idx_;
- parquet::Encoding::type encoding_;
+ int cached_level_idx_ = 0;
+
+ /// The parquet encoding used for the levels. Usually RLE but the deprecated BIT_PACKED
+ /// encoding is also allowed.
+ parquet::Encoding::type encoding_ = parquet::Encoding::PLAIN;
/// For error checking and reporting.
- int max_level_;
- /// Number of level values cached_levels_ has memory allocated for.
- int cache_size_;
+ int max_level_ = 0;
+
+ /// Number of level values cached_levels_ has memory allocated for. Always
+ /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
+ int cache_size_ = 0;
+
/// Number of remaining data values in the current data page.
- int num_buffered_values_;
+ int num_buffered_values_ = 0;
+
+ /// Name of the parquet file. Used for reporting level decoding errors.
string filename_;
+
+ /// Error code to use when reporting level decoding errors.
TErrorCode::type decoding_error_code_;
};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.h
----------------------------------------------------------------------
diff --git a/be/src/experiments/bit-stream-utils.8byte.h b/be/src/experiments/bit-stream-utils.8byte.h
deleted file mode 100644
index f9418a5..0000000
--- a/be/src/experiments/bit-stream-utils.8byte.h
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H
-#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_H
-
-#include <boost/cstdint.hpp>
-#include <string.h>
-#include "common/compiler-util.h"
-#include "common/logging.h"
-#include "util/bit-util.h"
-
-namespace impala {
-
-/// Utility class to write bit/byte streams. This class can write data to either be
-/// bit packed or byte aligned (and a single stream that has a mix of both).
-/// This class does not allocate memory.
-class BitWriter_8byte {
- public:
- /// buffer: buffer to write bits to. Buffer should be preallocated with
- /// 'buffer_len' bytes. 'buffer_len' must be a multiple of 8.
- BitWriter_8byte(uint8_t* buffer, int buffer_len) :
- buffer_(reinterpret_cast<uint64_t*>(buffer)),
- max_bytes_(buffer_len),
- offset_(0),
- bit_offset_(0) {
- DCHECK_EQ(buffer_len % 8, 0);
- }
-
- void Clear() {
- offset_ = 0;
- bit_offset_ = 0;
- memset(buffer_, 0, max_bytes_);
- }
-
- uint8_t* buffer() const { return reinterpret_cast<uint8_t*>(buffer_); }
- int buffer_len() const { return max_bytes_; }
-
- inline int bytes_written() const {
- return offset_ * 8 + BitUtil::Ceil(bit_offset_, 8);
- }
-
- /// Writes a value to the buffer. This is bit packed. Returns false if
- /// there was not enough space.
- bool PutValue(uint64_t v, int num_bits);
-
- /// Writes v to the next aligned byte.
- template<typename T>
- bool PutAligned(T v, int num_bits);
-
- /// Write a Vlq encoded int to the buffer. Returns false if there was not enough
- /// room. The value is written byte aligned.
- /// For more details on vlq:
- /// en.wikipedia.org/wiki/Variable-length_quantity
- bool PutVlqInt(int32_t v);
-
- /// Get a pointer to the next aligned byte and advance the underlying buffer
- /// by num_bytes.
- /// Returns NULL if there was not enough space.
- uint8_t* GetNextBytePtr(int num_bytes = 1);
-
- private:
- uint64_t* buffer_;
- int max_bytes_;
- int offset_; // Offset into buffer_
- int bit_offset_; // Offset into current uint64_t
-};
-
-/// Utility class to read bit/byte stream. This class can read bits or bytes
-/// that are either byte aligned or not. It also has utilities to read multiple
-/// bytes in one read (e.g. encoded int).
-class BitReader_8byte {
- public:
- /// buffer: buffer to read from. The buffer's length is 'buffer_len' and must be a
- /// multiple of 8.
- BitReader_8byte(uint8_t* buffer, int buffer_len) :
- buffer_(reinterpret_cast<uint64_t*>(buffer)),
- max_bytes_(buffer_len),
- offset_(0),
- bit_offset_(0) {
- DCHECK_EQ(buffer_len % 8, 0);
- }
-
- BitReader_8byte() : buffer_(NULL), max_bytes_(0) {}
-
- /// Gets the next value from the buffer.
- /// Returns true if 'v' could be read or false if there are not enough bytes left.
- template<typename T>
- bool GetValue(int num_bits, T* v);
-
- /// Reads a T sized value from the buffer. T needs to be a native type and little
- /// endian. The value is assumed to be byte aligned so the stream will be advance
- /// to the start of the next byte before v is read.
- template<typename T>
- bool GetAligned(int num_bits, T* v);
-
- /// Reads a vlq encoded int from the stream. The encoded int must start at the
- /// beginning of a byte. Return false if there were not enough bytes in the buffer.
- bool GetVlqInt(int32_t* v);
-
- /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
- /// there may be an additional fraction of a byte).
- inline int bytes_left() {
- return max_bytes_ - (offset_ * 8 + BitUtil::Ceil(bit_offset_, 8));
- }
-
- /// Maximum byte length of a vlq encoded int
- static const int MAX_VLQ_BYTE_LEN = 5;
-
- private:
- uint64_t* buffer_;
- int max_bytes_;
- int offset_; // Offset into buffer_
- int bit_offset_; // Offset into current uint64_t
-
- /// Advances offset_ and/or bit_offset_ to next byte boundary in buffer_.
- inline void Align();
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/experiments/bit-stream-utils.8byte.inline.h
----------------------------------------------------------------------
diff --git a/be/src/experiments/bit-stream-utils.8byte.inline.h b/be/src/experiments/bit-stream-utils.8byte.inline.h
deleted file mode 100644
index 1b1c05a..0000000
--- a/be/src/experiments/bit-stream-utils.8byte.inline.h
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H
-#define IMPALA_EXPERIMENTS_BIT_STREAM_UTILS_8BYTE_INLINE_H
-
-#include "experiments/bit-stream-utils.8byte.h"
-
-namespace impala {
-
-inline bool BitWriter_8byte::PutValue(uint64_t v, int num_bits) {
- DCHECK_LE(num_bits, 64);
- DCHECK(num_bits == 64 || v >> num_bits == 0)
- << "v = " << v << ", num_bits = " << num_bits;
-
- if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false;
-
- buffer_[offset_] |= v << bit_offset_;
- bit_offset_ += num_bits;
- if (bit_offset_ >= 64) {
- ++offset_;
- bit_offset_ -= 64;
- if (UNLIKELY(bit_offset_ > 0)) {
- // Write out bits of v that crossed into new byte offset
- // We cannot perform v >> num_bits (i.e. when bit_offset_ is 0) because v >> 64 != 0
- buffer_[offset_] = v >> (num_bits - bit_offset_);
- }
- }
- DCHECK_LT(bit_offset_, 64);
- return true;
-}
-
-inline uint8_t* BitWriter_8byte::GetNextBytePtr(int num_bytes) {
- if (UNLIKELY(bytes_written() + num_bytes > max_bytes_)) return NULL;
-
- // Advance to next aligned byte if necessary
- if (UNLIKELY(bit_offset_ > 56)) {
- ++offset_;
- bit_offset_ = 0;
- } else {
- bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8;
- }
-
- DCHECK_EQ(bit_offset_ % 8, 0);
- uint8_t* ptr = reinterpret_cast<uint8_t*>(buffer_ + offset_) + bit_offset_ / 8;
- bit_offset_ += num_bytes * 8;
- offset_ += bit_offset_ / 64;
- bit_offset_ %= 64;
- return ptr;
-}
-
-template<typename T>
-inline bool BitWriter_8byte::PutAligned(T val, int num_bits) {
- // Align to byte boundary
- uint8_t* byte_ptr = GetNextBytePtr(0);
- bool result = PutValue(val, num_bits);
- if (!result) return false;
- // Pad to next byte boundary
- byte_ptr = GetNextBytePtr(0);
- DCHECK(byte_ptr != NULL);
- return true;
-}
-
-inline bool BitWriter_8byte::PutVlqInt(int32_t v) {
- bool result = true;
- while ((v & 0xFFFFFF80) != 0L) {
- result &= PutAligned<uint8_t>((v & 0x7F) | 0x80, 8);
- v >>= 7;
- }
- result &= PutAligned<uint8_t>(v & 0x7F, 8);
- return result;
-}
-
-template<typename T>
-inline bool BitReader_8byte::GetValue(int num_bits, T* v) {
- int size = sizeof(T) * 8;
- DCHECK_LE(num_bits, size);
- if (UNLIKELY(offset_ * 8 + bit_offset_ > max_bytes_ * 8 - num_bits)) return false;
-
- *v = BitUtil::TrailingBits(buffer_[offset_], bit_offset_ + num_bits) >> bit_offset_;
- bit_offset_ += num_bits;
- if (bit_offset_ >= 64) {
- ++offset_;
- bit_offset_ -= 64;
- // Read bits of v that crossed into new byte offset
- *v |= BitUtil::TrailingBits(buffer_[offset_], bit_offset_)
- << (num_bits - bit_offset_);
- }
- DCHECK_LT(bit_offset_, 64);
- return true;
-}
-
-template<typename T>
-inline bool BitReader_8byte::GetAligned(int num_bits, T* v) {
- Align();
- if (UNLIKELY(bytes_left() < BitUtil::Ceil(num_bits, 8))) return false;
- DCHECK_EQ(bit_offset_ % 8, 0);
- bool result = GetValue(num_bits, v);
- DCHECK(result);
- Align();
- return true;
-}
-
-inline bool BitReader_8byte::GetVlqInt(int32_t* v) {
- *v = 0;
- int shift = 0;
- int num_bytes = 0;
- uint8_t byte = 0;
- do {
- if (!GetAligned<uint8_t>(8, &byte)) return false;
- *v |= (byte & 0x7F) << shift;
- shift += 7;
- DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
- } while ((byte & 0x80) != 0);
- return true;
-}
-
-inline void BitReader_8byte::Align() {
- if (UNLIKELY(bit_offset_ > 56)) {
- ++offset_;
- bit_offset_ = 0;
- DCHECK_LE(offset_, max_bytes_);
- } else {
- bit_offset_ = BitUtil::RoundUpNumBytes(bit_offset_) * 8;
- }
-}
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.h b/be/src/util/bit-packing.h
index 62e5e88..05036db 100644
--- a/be/src/util/bit-packing.h
+++ b/be/src/util/bit-packing.h
@@ -62,6 +62,27 @@ class BitPacking {
const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
OutType* __restrict__ out);
+ /// Same as above, templated by BIT_WIDTH.
+ template <typename OutType, int BIT_WIDTH>
+ static std::pair<const uint8_t*, int64_t> UnpackValues(const uint8_t* __restrict__ in,
+ int64_t in_bytes, int64_t num_values, OutType* __restrict__ out);
+
+ /// Unpack values as above, treating them as unsigned integers, and decode them
+ /// using the provided dict. Sets 'decode_error' to true if one of the packed values
+ /// was greater than or equal to 'dict_len'. Does not modify 'decode_error' on success.
+ template <typename OutType>
+ static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(int bit_width,
+ const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+ int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+ bool* __restrict__ decode_error);
+
+ /// Same as above, templated by BIT_WIDTH.
+ template <typename OutType, int BIT_WIDTH>
+ static std::pair<const uint8_t*, int64_t> UnpackAndDecodeValues(
+ const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+ int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+ bool* __restrict__ decode_error);
+
/// Unpack exactly 32 values of 'bit_width' from 'in' to 'out'. 'in' must point to
/// 'in_bytes' of addressable memory, and 'in_bytes' must be at least
/// (32 * bit_width / 8). 'out' must have space for 32 OutType values.
@@ -70,22 +91,42 @@ class BitPacking {
static const uint8_t* Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
int64_t in_bytes, OutType* __restrict__ out);
- private:
- /// Implementation of Unpack32Values() that uses 32-bit integer loads to
- /// unpack values with the given BIT_WIDTH from 'in' to 'out'.
+ /// Same as Unpack32Values() but templated by BIT_WIDTH.
template <typename OutType, int BIT_WIDTH>
static const uint8_t* Unpack32Values(
const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out);
- /// Function that unpacks 'num_values' values with the given BIT_WIDTH from 'in' to
- /// 'out'. 'num_values' can be at most 32. The version with 'bit_width' as an argument
- /// dispatches based on 'bit_width' to the appropriate templated implementation.
- template <typename OutType, int BIT_WIDTH>
- static const uint8_t* UnpackUpTo32Values(const uint8_t* __restrict__ in,
- int64_t in_bytes, int num_values, OutType* __restrict__ out);
+ /// Same as Unpack32Values() with dictionary decoding.
template <typename OutType>
- static const uint8_t* UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in,
+ static const uint8_t* UnpackAndDecode32Values(int bit_width,
+ const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+ int64_t dict_len, OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+ /// Same as UnpackAndDecode32Values() but templated by BIT_WIDTH.
+ template <typename OutType, int BIT_WIDTH>
+ static const uint8_t* UnpackAndDecode32Values(const uint8_t* __restrict__ in,
+ int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
+ OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+ /// Unpacks 'num_values' values with the given BIT_WIDTH from 'in' to 'out'.
+ /// 'num_values' must be at most 31. 'in' must point to 'in_bytes' of addressable
+ /// memory, and 'in_bytes' must be at least ceil(num_values * BIT_WIDTH / 8).
+ /// 'out' must have space for 'num_values' OutType values.
+ /// 0 <= BIT_WIDTH <= 32 and BIT_WIDTH <= # of bits in OutType.
+ template <typename OutType, int BIT_WIDTH>
+ static const uint8_t* UnpackUpTo31Values(const uint8_t* __restrict__ in,
int64_t in_bytes, int num_values, OutType* __restrict__ out);
+
+ /// Same as UnpackUpTo31Values() with dictionary decoding.
+ template <typename OutType, int BIT_WIDTH>
+ static const uint8_t* UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
+ int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
+ OutType* __restrict__ out, bool* __restrict__ decode_error);
+
+ private:
+ /// Compute the number of values with the given bit width that can be unpacked from
+ /// an input buffer of 'in_bytes' into an output buffer with space for 'num_values'.
+ static int64_t NumValuesToUnpack(int bit_width, int64_t in_bytes, int64_t num_values);
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-packing.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-packing.inline.h b/be/src/util/bit-packing.inline.h
index 37d51ab..4b2bb33 100644
--- a/be/src/util/bit-packing.inline.h
+++ b/be/src/util/bit-packing.inline.h
@@ -31,32 +31,109 @@
namespace impala {
+inline int64_t BitPacking::NumValuesToUnpack(
+ int bit_width, int64_t in_bytes, int64_t num_values) {
+ // Check if we have enough input bytes to decode 'num_values'.
+ if (bit_width == 0 || BitUtil::RoundUpNumBytes(num_values * bit_width) <= in_bytes) {
+ // Limited by output space.
+ return num_values;
+ } else {
+ // Limited by the number of input bytes. Compute the number of values that can be
+ // unpacked from the input.
+ return (in_bytes * CHAR_BIT) / bit_width;
+ }
+}
+
template <typename OutType>
std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(int bit_width,
const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
OutType* __restrict__ out) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+ case i: \
+ return UnpackValues<OutType, i>(in, in_bytes, num_values, out);
+
+ switch (bit_width) {
+ // Expand cases from 0 to 32.
+ BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+ default:
+ DCHECK(false);
+ return std::make_pair(nullptr, -1);
+ }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackValues(
+ const uint8_t* __restrict__ in, int64_t in_bytes, int64_t num_values,
+ OutType* __restrict__ out) {
constexpr int BATCH_SIZE = 32;
- const int64_t max_input_values =
- bit_width ? (in_bytes * CHAR_BIT) / bit_width : num_values;
- const int64_t values_to_read = std::min(num_values, max_input_values);
+ const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
const int64_t batches_to_read = values_to_read / BATCH_SIZE;
const int64_t remainder_values = values_to_read % BATCH_SIZE;
const uint8_t* in_pos = in;
OutType* out_pos = out;
// First unpack as many full batches as possible.
for (int64_t i = 0; i < batches_to_read; ++i) {
- in_pos = Unpack32Values<OutType>(bit_width, in_pos, in_bytes, out_pos);
+ in_pos = Unpack32Values<OutType, BIT_WIDTH>(in_pos, in_bytes, out_pos);
out_pos += BATCH_SIZE;
- in_bytes -= (BATCH_SIZE * bit_width) / CHAR_BIT;
+ in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
}
// Then unpack the final partial batch.
if (remainder_values > 0) {
- in_pos = UnpackUpTo32Values<OutType>(bit_width,
+ in_pos = UnpackUpTo31Values<OutType, BIT_WIDTH>(
in_pos, in_bytes, remainder_values, out_pos);
}
return std::make_pair(in_pos, values_to_read);
}
+template <typename OutType>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(int bit_width,
+ const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+ int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+ bool* __restrict__ decode_error) {
+#pragma push_macro("UNPACK_VALUES_CASE")
+#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
+ case i: \
+ return UnpackAndDecodeValues<OutType, i>( \
+ in, in_bytes, dict, dict_len, num_values, out, decode_error);
+
+ switch (bit_width) {
+ // Expand cases from 0 to 32.
+ BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
+ default:
+ DCHECK(false);
+ return std::make_pair(nullptr, -1);
+ }
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH>
+std::pair<const uint8_t*, int64_t> BitPacking::UnpackAndDecodeValues(
+ const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ dict,
+ int64_t dict_len, int64_t num_values, OutType* __restrict__ out,
+ bool* __restrict__ decode_error) {
+ constexpr int BATCH_SIZE = 32;
+ const int64_t values_to_read = NumValuesToUnpack(BIT_WIDTH, in_bytes, num_values);
+ const int64_t batches_to_read = values_to_read / BATCH_SIZE;
+ const int64_t remainder_values = values_to_read % BATCH_SIZE;
+ const uint8_t* in_pos = in;
+ OutType* out_pos = out;
+ // First unpack as many full batches as possible.
+ for (int64_t i = 0; i < batches_to_read; ++i) {
+ in_pos = UnpackAndDecode32Values<OutType, BIT_WIDTH>(
+ in_pos, in_bytes, dict, dict_len, out_pos, decode_error);
+ out_pos += BATCH_SIZE;
+ in_bytes -= (BATCH_SIZE * BIT_WIDTH) / CHAR_BIT;
+ }
+ // Then unpack the final partial batch.
+ if (remainder_values > 0) {
+ in_pos = UnpackAndDecodeUpTo31Values<OutType, BIT_WIDTH>(
+ in_pos, in_bytes, dict, dict_len, remainder_values, out_pos, decode_error);
+ }
+ return std::make_pair(in_pos, values_to_read);
+}
+
// Loop body of unrolled loop that unpacks the value. BIT_WIDTH is the bit width of
// the packed values. 'in_buf' is the start of the input buffer and 'out_vals' is the
// start of the output values array. This function unpacks the VALUE_IDX'th packed value
@@ -111,60 +188,83 @@ inline uint32_t ALWAYS_INLINE UnpackValue(const uint8_t* __restrict__ in_buf) {
}
}
+template <typename OutType>
+inline void ALWAYS_INLINE DecodeValue(OutType* __restrict__ dict, int64_t dict_len,
+ uint32_t idx, OutType* __restrict__ out_val, bool* __restrict__ decode_error) {
+ if (UNLIKELY(idx >= dict_len)) {
+ *decode_error = true;
+ } else {
+ // Use memcpy() because we can't assume sufficient alignment in some cases (e.g.
+ // 16 byte decimals).
+ memcpy(out_val, &dict[idx], sizeof(OutType));
+ }
+}
+
template <typename OutType, int BIT_WIDTH>
const uint8_t* BitPacking::Unpack32Values(
const uint8_t* __restrict__ in, int64_t in_bytes, OutType* __restrict__ out) {
static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
- static_assert(
- BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type");
+ DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
DCHECK_GE(in_bytes, BYTES_TO_READ);
-// Call UnpackValue for 0 <= i < 32.
-#pragma push_macro("UNPACK_VALUES_CALL")
+ // Call UnpackValue for 0 <= i < 32.
+#pragma push_macro("UNPACK_VALUE_CALL")
#define UNPACK_VALUE_CALL(ignore1, i, ignore2) \
out[i] = static_cast<OutType>(UnpackValue<BIT_WIDTH, i>(in));
+
BOOST_PP_REPEAT_FROM_TO(0, 32, UNPACK_VALUE_CALL, ignore);
-#pragma pop_macro("UNPACK_VALUES_CALL")
return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUE_CALL")
}
template <typename OutType>
const uint8_t* BitPacking::Unpack32Values(int bit_width, const uint8_t* __restrict__ in,
int64_t in_bytes, OutType* __restrict__ out) {
- switch (bit_width) {
- // Expand cases from 0 to 32.
#pragma push_macro("UNPACK_VALUES_CASE")
#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
case i: return Unpack32Values<OutType, i>(in, in_bytes, out);
- BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
- default: DCHECK(false); return in;
- }
-}
-template <typename OutType>
-const uint8_t* BitPacking::UnpackUpTo32Values(int bit_width, const uint8_t* __restrict__ in,
- int64_t in_bytes, int num_values, OutType* __restrict__ out) {
switch (bit_width) {
// Expand cases from 0 to 32.
-#pragma push_macro("UNPACK_VALUES_CASE")
-#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
- case i: return UnpackUpTo32Values<OutType, i>(in, in_bytes, num_values, out);
BOOST_PP_REPEAT_FROM_TO(0, 33, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
default: DCHECK(false); return in;
}
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH> // Unpacks 32 BIT_WIDTH-bit indices from 'in' and writes the matching 'dict' entries to 'out'.
+const uint8_t* BitPacking::UnpackAndDecode32Values(const uint8_t* __restrict__ in,
+ int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len,
+ OutType* __restrict__ out, bool* __restrict__ decode_error) {
+ static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+ static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
+ DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
+ constexpr int BYTES_TO_READ = BitUtil::RoundUpNumBytes(32 * BIT_WIDTH);
+ DCHECK_GE(in_bytes, BYTES_TO_READ); // The caller must supply a full 32-value batch of input.
+ // TODO: this could be optimised further by using SIMD instructions.
+ // https://lemire.me/blog/2016/08/25/faster-dictionary-decoding-with-simd-instructions/
+
+ // Call UnpackValue() and DecodeValue() for 0 <= i < 32. There is no early exit: all 32 values are processed even after one of them sets '*decode_error'.
+#pragma push_macro("DECODE_VALUE_CALL")
+#define DECODE_VALUE_CALL(ignore1, i, ignore2) \
+ { \
+ uint32_t idx = UnpackValue<BIT_WIDTH, i>(in); \
+ DecodeValue(dict, dict_len, idx, &out[i], decode_error); \
+ }
+
+ BOOST_PP_REPEAT_FROM_TO(0, 32, DECODE_VALUE_CALL, ignore);
+ return in + BYTES_TO_READ; // Input position just past the bytes consumed by this batch.
+#pragma pop_macro("DECODE_VALUE_CALL")
}
template <typename OutType, int BIT_WIDTH>
-const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in,
+const uint8_t* BitPacking::UnpackUpTo31Values(const uint8_t* __restrict__ in,
int64_t in_bytes, int num_values, OutType* __restrict__ out) {
static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
- static_assert(
- BIT_WIDTH <= sizeof(OutType) * CHAR_BIT, "BIT_WIDTH too high for output type");
+ DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
constexpr int MAX_BATCH_SIZE = 31;
const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
DCHECK_GE(in_bytes, BYTES_TO_READ);
@@ -183,19 +283,65 @@ const uint8_t* BitPacking::UnpackUpTo32Values(const uint8_t* __restrict__ in,
in_buffer = tmp_buffer;
}
- // Use switch with fall-through cases to minimise branching.
- switch (num_values) {
-// Expand cases from 31 down to 1.
#pragma push_macro("UNPACK_VALUES_CASE")
#define UNPACK_VALUES_CASE(ignore1, i, ignore2) \
case 31 - i: out[30 - i] = \
static_cast<OutType>(UnpackValue<BIT_WIDTH, 30 - i>(in_buffer));
+
+ // Use switch with fall-through cases to minimise branching.
+ switch (num_values) {
+ // Expand cases from 31 down to 1.
BOOST_PP_REPEAT_FROM_TO(0, 31, UNPACK_VALUES_CASE, ignore);
-#pragma pop_macro("UNPACK_VALUES_CASE")
case 0: break;
default: DCHECK(false);
}
return in + BYTES_TO_READ;
+#pragma pop_macro("UNPACK_VALUES_CASE")
+}
+
+template <typename OutType, int BIT_WIDTH> // Unpacks 'num_values' (at most 31) BIT_WIDTH-bit indices from 'in' and writes the matching 'dict' entries to 'out'.
+const uint8_t* BitPacking::UnpackAndDecodeUpTo31Values(const uint8_t* __restrict__ in,
+ int64_t in_bytes, OutType* __restrict__ dict, int64_t dict_len, int num_values,
+ OutType* __restrict__ out, bool* __restrict__ decode_error) {
+ static_assert(BIT_WIDTH >= 0, "BIT_WIDTH too low");
+ static_assert(BIT_WIDTH <= 32, "BIT_WIDTH > 32");
+ DCHECK_LE(BIT_WIDTH, sizeof(OutType) * CHAR_BIT) << "BIT_WIDTH too high for output";
+ constexpr int MAX_BATCH_SIZE = 31;
+ const int BYTES_TO_READ = BitUtil::RoundUpNumBytes(num_values * BIT_WIDTH);
+ DCHECK_GE(in_bytes, BYTES_TO_READ);
+ DCHECK_LE(num_values, MAX_BATCH_SIZE); // Full 32-value batches are handled by UnpackAndDecode32Values().
+
+ // Make sure the buffer is at least 1 byte.
+ constexpr int TMP_BUFFER_SIZE = BIT_WIDTH ?
+ (BIT_WIDTH * (MAX_BATCH_SIZE + 1)) / CHAR_BIT : 1;
+ uint8_t tmp_buffer[TMP_BUFFER_SIZE]; // Room for a full 32-value batch, i.e. 4 * BIT_WIDTH bytes.
+
+ const uint8_t* in_buffer = in;
+ // Copy into padded temporary buffer to avoid reading past the end of 'in' if the
+ // last 32-bit load would go past the end of the buffer.
+ if (BitUtil::RoundUp(BYTES_TO_READ, sizeof(uint32_t)) > in_bytes) {
+ memcpy(tmp_buffer, in, BYTES_TO_READ); // Only the valid bytes are copied; the rest of 'tmp_buffer' stays uninitialized.
+ in_buffer = tmp_buffer;
+ }
+
+#pragma push_macro("DECODE_VALUES_CASE")
+#define DECODE_VALUES_CASE(ignore1, i, ignore2) \
+ case 31 - i: { \
+ uint32_t idx = UnpackValue<BIT_WIDTH, 30 - i>(in_buffer); \
+ DecodeValue(dict, dict_len, idx, &out[30 - i], decode_error); \
+ }
+
+ // Use switch with fall-through cases to minimise branching. Entering at case N decodes out[N - 1] and falls through down to out[0].
+ switch (num_values) {
+ // Expand cases from 31 down to 1.
+ BOOST_PP_REPEAT_FROM_TO(0, 31, DECODE_VALUES_CASE, ignore);
+ case 0:
+ break;
+ default:
+ DCHECK(false);
+ }
+ return in + BYTES_TO_READ; // Computed from 'in', not 'in_buffer', so the result always points into the caller's buffer.
+#pragma pop_macro("DECODE_VALUES_CASE")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.h b/be/src/util/bit-stream-utils.h
index 5acdeee..bac127a 100644
--- a/be/src/util/bit-stream-utils.h
+++ b/be/src/util/bit-stream-utils.h
@@ -92,53 +92,67 @@ class BitWriter {
int bit_offset_; // Offset in buffered_values_
};
-/// Utility class to read bit/byte stream. This class can read bits or bytes
-/// that are either byte aligned or not. It also has utilities to read multiple
-/// bytes in one read (e.g. encoded int).
-class BitReader {
+/// Utility class to read bit/byte stream. This class can read bits or bytes that are
+/// either byte aligned or not. It also has utilities to read multiple bytes in one
+/// read (e.g. encoded int). Exposes a batch-oriented interface to allow efficient
+/// processing of multiple values at a time.
+class BatchedBitReader {
public:
/// 'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
/// Does not take ownership of the buffer.
- BitReader(const uint8_t* buffer, int buffer_len) { Reset(buffer, buffer_len); }
+ BatchedBitReader(const uint8_t* buffer, int64_t buffer_len) {
+ Reset(buffer, buffer_len);
+ }
- BitReader() : buffer_(NULL), max_bytes_(0) {}
+ BatchedBitReader() {}
- // The implicit copy constructor is left defined. If a BitReader is copied, the
+ // The implicit copy constructor is left defined. If a BatchedBitReader is copied, the
// two copies do not share any state. Invoking functions on either copy continues
// reading from the current read position without modifying the state of the other
// copy.
/// Resets the read to start reading from the start of 'buffer'. The buffer's
/// length is 'buffer_len'. Does not take ownership of the buffer.
- void Reset(const uint8_t* buffer, int buffer_len) {
- buffer_ = buffer;
- max_bytes_ = buffer_len;
- byte_offset_ = 0;
- bit_offset_ = 0;
- int num_bytes = std::min(8, max_bytes_);
- memcpy(&buffered_values_, buffer_, num_bytes);
+ void Reset(const uint8_t* buffer, int64_t buffer_len) {
+ buffer_pos_ = buffer;
+ buffer_end_ = buffer + buffer_len;
}
- /// Gets the next value from the buffer. Returns true if 'v' could be read or false if
- /// there are not enough bytes left. num_bits must be <= 32.
+ /// Gets up to 'num_values' bit-packed values, starting from the current byte in the
+ /// buffer, and advances the read position. 'bit_width' must be <= 32.
+ /// If 'bit_width' * 'num_values' is not a multiple of 8, the trailing bits of the last
+ /// byte are skipped and the next UnpackBatch() call will start reading from the next byte.
+ ///
+ /// If the caller does not want to drop trailing bits, 'num_values' must be exactly the
+ /// total number of values the caller wants to read from a run of bit-packed values, or
+ /// 'bit_width' * 'num_values' must be a multiple of 8. This condition is always
+ /// satisfied if 'num_values' is a multiple of 32.
+ ///
+ /// Returns the number of values read.
template<typename T>
- bool GetValue(int num_bits, T* v);
+ int UnpackBatch(int bit_width, int num_values, T* v);
- /// Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'. T needs to be a
- /// little-endian native type and big enough to store 'num_bytes'. The value is assumed
- /// to be byte-aligned so the stream will be advanced to the start of the next byte
- /// before 'v' is read. Returns false if there are not enough bytes left.
+ /// Unpacks bit-packed values in the same way as UnpackBatch() and decodes them using the
+ /// dictionary 'dict' with 'dict_len' entries. Returns -1 if a decoding error is
+ /// encountered, i.e. if the bit-packed values are not valid indices in 'dict'.
+ /// Otherwise returns the number of values decoded.
template<typename T>
- bool GetAligned(int num_bytes, T* v);
+ int UnpackAndDecodeBatch(
+ int bit_width, T* dict, int64_t dict_len, int num_values, T* v);
+
+ /// Reads an unpacked 'num_bytes'-sized value from the buffer and stores it in 'v'. T
+ /// needs to be a little-endian native type and big enough to store 'num_bytes'.
+ /// Returns false if there are not enough bytes left.
+ template<typename T>
+ bool GetBytes(int num_bytes, T* v);
/// Reads a vlq encoded int from the stream. The encoded int must start at the
/// beginning of a byte. Return false if there were not enough bytes in the buffer or
/// the int is invalid.
bool GetVlqInt(int32_t* v);
- /// Returns the number of bytes left in the stream, not including the current byte (i.e.,
- /// there may be an additional fraction of a byte).
- int bytes_left() { return max_bytes_ - (byte_offset_ + BitUtil::Ceil(bit_offset_, 8)); }
+ /// Returns the number of bytes left in the stream.
+ int bytes_left() { return buffer_end_ - buffer_pos_; }
/// Maximum byte length of a vlq encoded int
static const int MAX_VLQ_BYTE_LEN = 5;
@@ -147,17 +161,12 @@ class BitReader {
static const int MAX_BITWIDTH = 32;
private:
- const uint8_t* buffer_;
- int max_bytes_;
+ /// Current read position in the buffer.
+ const uint8_t* buffer_pos_ = nullptr;
- /// Bytes are memcpy'd from buffer_ and values are read from this variable. This is
- /// faster than reading values byte by byte directly from buffer_.
- uint64_t buffered_values_;
-
- int byte_offset_; // Offset in buffer_
- int bit_offset_; // Offset in buffered_values_
+ /// Pointer to the byte after the end of the buffer.
+ const uint8_t* buffer_end_ = nullptr;
};
-
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/bit-stream-utils.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-stream-utils.inline.h b/be/src/util/bit-stream-utils.inline.h
index c8744aa..3974492 100644
--- a/be/src/util/bit-stream-utils.inline.h
+++ b/be/src/util/bit-stream-utils.inline.h
@@ -18,9 +18,11 @@
#ifndef IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
#define IMPALA_UTIL_BIT_STREAM_UTILS_INLINE_H
-#include "common/compiler-util.h"
#include "util/bit-stream-utils.h"
+#include "common/compiler-util.h"
+#include "util/bit-packing.inline.h"
+
namespace impala {
inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
@@ -84,83 +86,67 @@ inline bool BitWriter::PutVlqInt(int32_t v) {
return result;
}
-/// Force inlining - this is used in perf-critical loops in Parquet and GCC often doesn't
-/// inline it in cases where it's beneficial.
-template <typename T>
-ALWAYS_INLINE inline bool BitReader::GetValue(int num_bits, T* v) {
- DCHECK(num_bits == 0 || buffer_ != NULL);
- // TODO: revisit this limit if necessary
- DCHECK_LE(num_bits, MAX_BITWIDTH);
- DCHECK_LE(num_bits, sizeof(T) * 8);
-
- // First do a cheap check to see if we may read past the end of the stream, using
- // constant upper bounds for 'bit_offset_' and 'num_bits'.
- if (UNLIKELY(byte_offset_ + sizeof(buffered_values_) + MAX_BITWIDTH / 8 > max_bytes_)) {
- // Now do the precise check.
- if (UNLIKELY(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8)) {
- return false;
- }
- }
-
- DCHECK_GE(bit_offset_, 0);
- DCHECK_LE(bit_offset_, 64);
- *v = BitUtil::TrailingBits(buffered_values_, bit_offset_ + num_bits) >> bit_offset_;
-
- bit_offset_ += num_bits;
- if (bit_offset_ >= 64) {
- byte_offset_ += 8;
- bit_offset_ -= 64;
-
- int bytes_remaining = max_bytes_ - byte_offset_;
- if (LIKELY(bytes_remaining >= 8)) {
- memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
- } else {
- memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
- }
+template<typename T> // Unpacks up to 'num_values' 'bit_width'-bit values into 'v' and advances the read position.
+inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
+ DCHECK(buffer_pos_ != nullptr); // A default-constructed reader must be Reset() with a buffer first.
+ DCHECK_GE(bit_width, 0);
+ DCHECK_LE(bit_width, MAX_BITWIDTH);
+ DCHECK_LE(bit_width, sizeof(T) * 8);
+ DCHECK_GE(num_values, 0);
+
+ int64_t num_read;
+ std::tie(buffer_pos_, num_read) = BitPacking::UnpackValues(bit_width, buffer_pos_,
+ bytes_left(), num_values, v); // 'buffer_pos_' is overwritten with the position returned by the unpacker.
+ DCHECK_LE(buffer_pos_, buffer_end_);
+ DCHECK_LE(num_read, num_values);
+ return static_cast<int>(num_read); // Bounded by 'num_values' (an int), so the narrowing cast is lossless.
+}
- // Read bits of v that crossed into new buffered_values_
- *v |= BitUtil::TrailingBits(buffered_values_, bit_offset_)
- << (num_bits - bit_offset_);
- }
- DCHECK_LE(bit_offset_, 64);
- return true;
+template<typename T> // Unpacks up to 'num_values' dictionary indices, writes the matching 'dict' entries to 'v'; returns the count or -1 on a bad index.
+inline int BatchedBitReader::UnpackAndDecodeBatch(
+ int bit_width, T* dict, int64_t dict_len, int num_values, T* v){
+ DCHECK(buffer_pos_ != nullptr); // A default-constructed reader must be Reset() with a buffer first.
+ DCHECK_GE(bit_width, 0);
+ DCHECK_LE(bit_width, MAX_BITWIDTH);
+ DCHECK_LE(bit_width, sizeof(T) * 8);
+ DCHECK_GE(num_values, 0);
+
+ const uint8_t* new_buffer_pos; // Kept separate so 'buffer_pos_' is only updated on success.
+ int64_t num_read;
+ bool decode_error = false;
+ std::tie(new_buffer_pos, num_read) = BitPacking::UnpackAndDecodeValues(bit_width,
+ buffer_pos_, bytes_left(), dict, dict_len, num_values, v, &decode_error);
+ if (UNLIKELY(decode_error)) return -1; // Read position is left unchanged; 'v' may already hold partially decoded values.
+ buffer_pos_ = new_buffer_pos;
+ DCHECK_LE(buffer_pos_, buffer_end_);
+ DCHECK_LE(num_read, num_values);
+ return static_cast<int>(num_read); // Bounded by 'num_values' (an int), so the narrowing cast is lossless.
}
template<typename T>
-inline bool BitReader::GetAligned(int num_bytes, T* v) {
+inline bool BatchedBitReader::GetBytes(int num_bytes, T* v) {
+ DCHECK(buffer_pos_ != nullptr);
+ DCHECK_GE(num_bytes, 0);
DCHECK_LE(num_bytes, sizeof(T));
- int bytes_read = BitUtil::Ceil(bit_offset_, 8);
- if (UNLIKELY(byte_offset_ + bytes_read + num_bytes > max_bytes_)) return false;
-
- // Advance byte_offset to next unread byte and read num_bytes
- byte_offset_ += bytes_read;
- memcpy(v, buffer_ + byte_offset_, num_bytes);
- byte_offset_ += num_bytes;
-
- // Reset buffered_values_
- bit_offset_ = 0;
- int bytes_remaining = max_bytes_ - byte_offset_;
- if (LIKELY(bytes_remaining >= 8)) {
- memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
- } else {
- memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
- }
+ if (UNLIKELY(buffer_pos_ + num_bytes > buffer_end_)) return false;
+ *v = 0; // Ensure unset bytes are initialized to zero.
+ memcpy(v, buffer_pos_, num_bytes);
+ buffer_pos_ += num_bytes;
return true;
}
-inline bool BitReader::GetVlqInt(int32_t* v) {
+inline bool BatchedBitReader::GetVlqInt(int32_t* v) {
*v = 0;
int shift = 0;
uint8_t byte = 0;
do {
if (UNLIKELY(shift >= MAX_VLQ_BYTE_LEN * 7)) return false;
- if (!GetAligned<uint8_t>(1, &byte)) return false;
+ if (!GetBytes(1, &byte)) return false;
*v |= (byte & 0x7F) << shift;
shift += 7;
} while ((byte & 0x80) != 0);
return true;
}
-
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index 62b3d3a..fa5e798 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -174,13 +174,16 @@ class DictDecoderBase {
DCHECK_GE(buffer_len, 0);
if (UNLIKELY(buffer_len == 0)) return Status("Dictionary cannot be 0 bytes");
uint8_t bit_width = *buffer;
- if (UNLIKELY(bit_width < 0 || bit_width > BitReader::MAX_BITWIDTH)) {
+ if (UNLIKELY(bit_width < 0 || bit_width > BatchedBitReader::MAX_BITWIDTH)) {
return Status(strings::Substitute("Dictionary has invalid or unsupported bit "
"width: $0", bit_width));
}
++buffer;
--buffer_len;
data_decoder_.Reset(buffer, buffer_len, bit_width);
+ num_repeats_ = 0;
+ num_literal_values_ = 0;
+ next_literal_idx_ = 0;
return Status::OK();
}
@@ -193,7 +196,22 @@ class DictDecoderBase {
virtual void GetValue(int index, void* buffer) = 0;
protected:
- RleDecoder data_decoder_;
+ /// Number of decoded values to buffer at a time. A multiple of 32 is chosen to allow
+ /// efficient reading in batches from data_decoder_. Increasing the batch size up to
+ /// 128 seems to improve performance, but increasing further did not make a noticeable
+ /// difference.
+ static const int DECODED_BUFFER_SIZE = 128;
+
+ RleBatchDecoder<uint32_t> data_decoder_;
+
+ /// Greater than zero if we've started decoding a repeated run.
+ int64_t num_repeats_ = 0;
+
+ /// Greater than zero if we have buffered some literal values.
+ int num_literal_values_ = 0;
+
+ /// The index of the next decoded value to return.
+ int next_literal_idx_ = 0;
};
template<typename T>
@@ -211,7 +229,7 @@ class DictDecoder : public DictDecoderBase {
/// Returns true if the dictionary values were all successfully decoded, or false
/// if the dictionary was corrupt.
template<parquet::Type::type PARQUET_TYPE>
- bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size);
+ bool Reset(uint8_t* dict_buffer, int dict_len, int fixed_len_size) WARN_UNUSED_RESULT;
virtual int num_entries() const { return dict_.size(); }
@@ -219,17 +237,26 @@ class DictDecoder : public DictDecoderBase {
T* val_ptr = reinterpret_cast<T*>(buffer);
DCHECK_GE(index, 0);
DCHECK_LT(index, dict_.size());
- // TODO: is there any circumstance where this should be a memcpy?
*val_ptr = dict_[index];
}
/// Returns the next value. Returns false if the data is invalid.
/// For StringValues, this does not make a copy of the data. Instead,
/// the string data is from the dictionary buffer passed into the c'tor.
- bool GetNextValue(T* value);
+ bool GetNextValue(T* value) WARN_UNUSED_RESULT;
private:
std::vector<T> dict_;
+
+ /// Decoded values, buffered to allow caller to consume one-by-one. If in the middle of
+ /// a repeated run, the first element is the current dict value. If in a literal run,
+ /// this contains 'num_literal_values_' values, with the next value to be returned at
+ /// 'next_literal_idx_'.
+ T decoded_values_[DECODED_BUFFER_SIZE];
+
+ /// Slow path for GetNextValue() where we need to decode new values. Should not be
+ /// inlined everywhere.
+ bool DecodeNextValue(T* value);
};
template<typename T>
@@ -290,30 +317,47 @@ inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
// Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
template <typename T>
ALWAYS_INLINE inline bool DictDecoder<T>::GetNextValue(T* value) {
- int index = -1; // Initialize to avoid compiler warning.
- bool result = data_decoder_.Get(&index);
- // Use & to avoid branches.
- if (LIKELY(result & (index >= 0) & (index < dict_.size()))) {
- *value = dict_[index];
+ // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
+ // byte aligned for Decimal16Values.
+ if (num_repeats_ > 0) {
+ --num_repeats_;
+ memcpy(value, &decoded_values_[0], sizeof(T));
+ return true;
+ } else if (next_literal_idx_ < num_literal_values_) {
+ int idx = next_literal_idx_++;
+ memcpy(value, &decoded_values_[idx], sizeof(T));
return true;
}
- return false;
+ // No decoded values left - need to decode some more.
+ return DecodeNextValue(value);
}
-// Force inlining - GCC does not always inline this into hot loops in Parquet scanner.
-template <>
-ALWAYS_INLINE inline bool DictDecoder<Decimal16Value>::GetNextValue(
- Decimal16Value* value) {
- int index;
- bool result = data_decoder_.Get(&index);
- if (!result) return false;
- if (index >= dict_.size()) return false;
- // Workaround for IMPALA-959. Use memcpy instead of '=' so addresses
- // do not need to be 16 byte aligned.
- uint8_t* addr = reinterpret_cast<uint8_t*>(dict_.data());
- addr = addr + index * sizeof(*value);
- memcpy(value, addr, sizeof(*value));
- return true;
+template <typename T> // Slow path of GetNextValue(): refills 'decoded_values_' from 'data_decoder_' and returns the first new value.
+bool DictDecoder<T>::DecodeNextValue(T* value) {
+ // IMPALA-959: Use memcpy() instead of '=' to set *value: addresses are not always 16
+ // byte aligned for Decimal16Values.
+ uint32_t num_repeats = data_decoder_.NextNumRepeats();
+ if (num_repeats > 0) {
+ uint32_t idx = data_decoder_.GetRepeatedValue(num_repeats); // presumably consumes the whole run from the decoder - confirm in RleBatchDecoder.
+ if (UNLIKELY(idx >= dict_.size())) return false; // Out-of-range dictionary index: the data is invalid.
+ memcpy(&decoded_values_[0], &dict_[idx], sizeof(T)); // Slot 0 holds the repeated value for later GetNextValue() calls.
+ memcpy(value, &decoded_values_[0], sizeof(T));
+ num_repeats_ = num_repeats - 1; // One value of the run is returned now; the rest are served from slot 0.
+ return true;
+ } else {
+ uint32_t num_literals = data_decoder_.NextNumLiterals();
+ if (UNLIKELY(num_literals == 0)) return false; // Neither a repeated nor a literal run is available.
+
+ uint32_t num_to_decode = std::min<uint32_t>(num_literals, DECODED_BUFFER_SIZE); // Capped at the capacity of 'decoded_values_'.
+ if (UNLIKELY(!data_decoder_.DecodeLiteralValues(
+ num_to_decode, dict_.data(), dict_.size(), &decoded_values_[0]))) {
+ return false;
+ }
+ num_literal_values_ = num_to_decode;
+ memcpy(value, &decoded_values_[0], sizeof(T));
+ next_literal_idx_ = 1; // Index 0 was just returned to the caller.
+ return true;
+ }
}
template<typename T>
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ae116b5b/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index de0fb11..11043f0 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -17,7 +17,9 @@
#include <stdlib.h>
#include <stdio.h>
+
#include <iostream>
+#include <utility>
#include "runtime/mem-tracker.h"
#include "runtime/string-value.inline.h"
@@ -65,7 +67,7 @@ void ValidateDict(const vector<InternalType>& values,
ASSERT_OK(decoder.SetData(data_buffer, data_len));
for (InternalType i: values) {
InternalType j;
- decoder.GetNextValue(&j);
+ ASSERT_TRUE(decoder.GetNextValue(&j));
EXPECT_EQ(i, j);
}
pool.FreeAll();
@@ -196,6 +198,98 @@ TEST(DictTest, TestStringBufferOverrun) {
0));
}
+// Make sure that SetData() resets the dictionary decoder, including the embedded RLE
+// decoder, to a clean state, even if the input is not fully consumed. The RLE decoder
+// has several pieces of state that need to be reset.
+TEST(DictTest, SetDataAfterPartialRead) {
+ MemTracker tracker;
+ MemPool pool(&tracker);
+ DictEncoder<int> encoder(&pool, sizeof(int));
+
+ // Literal run followed by a repeated run.
+ vector<int> values{1, 2, 3, 4, 5, 6, 7, 8, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9};
+ for (int val: values) encoder.Put(val);
+
+ vector<uint8_t> dict_buffer(encoder.dict_encoded_size());
+ encoder.WriteDict(dict_buffer.data());
+ vector<uint8_t> data_buffer(encoder.EstimatedDataEncodedSize() * 2); // Twice the estimate, as headroom for WriteData().
+ int data_len = encoder.WriteData(data_buffer.data(), data_buffer.size());
+ ASSERT_GT(data_len, 0);
+ encoder.ClearIndices();
+
+ DictDecoder<int> decoder;
+ ASSERT_TRUE(decoder.template Reset<parquet::Type::INT32>(
+ dict_buffer.data(), dict_buffer.size(), sizeof(int)));
+
+ // Test decoding some of the values, then resetting. If the decoder incorrectly
+ // caches some values, this could produce incorrect results.
+ for (int num_to_decode = 0; num_to_decode < values.size(); ++num_to_decode) { // NOTE(review): stops at size() - 1, so the final value is never read back.
+ ASSERT_OK(decoder.SetData(data_buffer.data(), data_buffer.size())); // NOTE(review): passes the whole buffer size, not 'data_len'.
+ for (int i = 0; i < num_to_decode; ++i) {
+ int val;
+ ASSERT_TRUE(decoder.GetNextValue(&val));
+ EXPECT_EQ(values[i], val) << num_to_decode << " " << i;
+ }
+ }
+}
+
+// Test handling of decode errors from out-of-range values.
+TEST(DictTest, DecodeErrors) {
+ MemTracker tracker;
+ MemPool pool(&tracker);
+ DictEncoder<int> small_dict_encoder(&pool, sizeof(int));
+
+ // Generate a dictionary with 9 values (requires 4 bits to encode).
+ vector<int> small_dict_values{1, 2, 3, 4, 5, 6, 7, 8, 9};
+ for (int val: small_dict_values) small_dict_encoder.Put(val);
+
+ vector<uint8_t> small_dict_buffer(small_dict_encoder.dict_encoded_size());
+ small_dict_encoder.WriteDict(small_dict_buffer.data());
+ small_dict_encoder.ClearIndices();
+
+ DictDecoder<int> small_dict_decoder;
+ ASSERT_TRUE(small_dict_decoder.template Reset<parquet::Type::INT32>(
+ small_dict_buffer.data(), small_dict_buffer.size(), sizeof(int)));
+
+ // Generate dictionary-encoded data with between 9 and 15 distinct values to test that
+ // an error is detected when the decoder reads a 4-bit value that is out of range.
+ using TestCase = pair<string, vector<int>>; // (description, values to encode).
+ vector<TestCase> test_cases{
+ {"Out-of-range value in a repeated run",
+ {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 10}},
+ {"Out-of-range literal run in the last < 32 element batch",
+ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}},
+ {"Out-of-range literal run in the middle of a 32 element batch",
+ {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
+ 11, 12, 13, 14, 15, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}}};
+ for (TestCase& test_case: test_cases) {
+ // Encode the values. This will produce a dictionary with more distinct values than
+ // the small dictionary that we'll use to decode it.
+ DictEncoder<int> large_dict_encoder(&pool, sizeof(int));
+ // Initialize the dictionary with the values already in the small dictionary.
+ for (int val : small_dict_values) large_dict_encoder.Put(val);
+ large_dict_encoder.ClearIndices(); // presumably drops the buffered indices but keeps the dictionary entries - confirm in DictEncoder.
+
+ for (int val: test_case.second) large_dict_encoder.Put(val);
+
+ vector<uint8_t> data_buffer(large_dict_encoder.EstimatedDataEncodedSize() * 2);
+ int data_len = large_dict_encoder.WriteData(data_buffer.data(), data_buffer.size());
+ ASSERT_GT(data_len, 0);
+ large_dict_encoder.ClearIndices();
+
+ ASSERT_OK(small_dict_decoder.SetData(data_buffer.data(), data_buffer.size())); // NOTE(review): passes the whole buffer size, not 'data_len'.
+ bool failed = false;
+ for (int i = 0; i < test_case.second.size(); ++i) {
+ int val;
+ failed = !small_dict_decoder.GetNextValue(&val);
+ if (failed) break; // Stop at the first rejected value; any single failure satisfies the test.
+ }
+ EXPECT_TRUE(failed) << "Should have detected out-of-range dict-encoded value in test "
+ << test_case.first;
+ }
+}
+
}
IMPALA_TEST_MAIN();