You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2019/04/26 23:45:50 UTC

[impala] 01/03: IMPALA-8381: Optimize ParquetPlainEncoder::DecodeBatch() for simple types

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6a703741d8fdc359833a0d593ca8b121cd5d890d
Author: Daniel Becker <da...@cloudera.com>
AuthorDate: Mon Apr 8 16:28:26 2019 +0200

    IMPALA-8381: Optimize ParquetPlainEncoder::DecodeBatch() for simple types
    
    Refactored the ParquetPlainEncoder::Decode() and
    ParquetPlainEncoder::DecodeBatch() methods to increase performance in
    batch decoding.
    
    The `Decode` and `DecodeBatch` methods retain their behaviour and
    outward interface, but the internal structure changes.
    
    We change how we split up the `Decode` template specialisations. The
    generic unspecialised template is used for numerical parquet types
    (INT32, INT64, INT96, FLOAT and DOUBLE) and various specialisations are
    used for BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY.
    
    We add a new method template, DecodeNoCheck, which does the actual
    decoding without bounds checking. It is called by the generic Decode
    method template internally. For all parquet types except for BYTE_ARRAY,
    DecodeBatch performs the bounds check once for the whole batch at the
    same time and calls DecodeNoCheck, so we save the cost of bounds
    checking for every decoded value. For BYTE_ARRAY, this cannot be done
    and we have to perform the checks for every value.
    
    In the non-BYTE_ARRAY version of DecodeBatch, we explicitly unroll the
    loop in batches of 8 to increase performance.
    
    The overall performance increase is up to 2x for small strides (8 bytes,
    INT32) but decreases as the stride increases, and disappears from around
    40 bytes. With bigger strides, there is no performance difference from
    the previous implementation.
    
    Testing:
      Added tests to parquet-plain-test.cc to test the `Decode` and the
      `DecodeBatch` methods both in single-value decoding and batch
      decoding.
    
    Change-Id: I57b7d2573bb6dfd038e581acb3bd8ea1565aa20d
    Reviewed-on: http://gerrit.cloudera.org:8080/12985
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/parquet-common.h       | 226 +++++++++++++++++++++-------
 be/src/exec/parquet/parquet-plain-test.cc  | 229 ++++++++++++++++++++++++-----
 be/src/testutil/random-vector-generators.h | 108 ++++++++++++++
 3 files changed, 468 insertions(+), 95 deletions(-)

diff --git a/be/src/exec/parquet/parquet-common.h b/be/src/exec/parquet/parquet-common.h
index d225814..a7184ba 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -87,6 +87,26 @@ class ParquetPlainEncoder {
     }
   }
 
+  /// Returns the byte size of a value of Parqet type `type`. If `type` is
+  /// FIXED_LEN_BYTE_ARRAY, the argument `fixed_len_size` is returned.
+  /// The type must be one of INT32, INT64, INT96, FLOAT, DOUBLE or FIXED_LEN_BYTE_ARRAY.
+  /// BOOLEANs are not plain encoded and BYTE_ARRAYs do not have a fixed size, therefore
+  /// they are not supported.
+  static ALWAYS_INLINE int EncodedByteSize(const parquet::Type::type type,
+      const int fixed_len_size) {
+    switch (type) {
+      case parquet::Type::type::INT32: return sizeof(int32_t);
+      case parquet::Type::type::INT64: return sizeof(int64_t);
+      case parquet::Type::type::INT96: return 12;
+      case parquet::Type::type::FLOAT: return sizeof(float);
+      case parquet::Type::type::DOUBLE: return sizeof(double);
+      case parquet::Type::type::FIXED_LEN_BYTE_ARRAY: return fixed_len_size;
+      default: DCHECK(false);
+    };
+
+    return -1;
+  }
+
   /// The minimum byte size to store decimals of with precision t.precision.
   static int DecimalSize(const ColumnType& t) {
     DCHECK(t.type == TYPE_DECIMAL);
@@ -177,27 +197,31 @@ class ParquetPlainEncoder {
     }
   }
 
-  /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer'
-  /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size'
-  /// is the size of the object. Otherwise, it is unused.
-  /// Returns the number of bytes read or -1 if the value was not decoded successfully.
-  /// This generic template function is used with the following types:
-  /// =============================
-  /// InternalType   | PARQUET_TYPE
-  /// =============================
-  /// int32_t        | INT32
-  /// int64_t        | INT64
-  /// float          | FLOAT
-  /// double         | DOUBLE
-  /// Decimal4Value  | INT32
-  /// Decimal8Value  | INT64
-  /// TimestampValue | INT96
+  /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer' need
+  /// not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size' is the
+  /// size of the object. Otherwise, it is unused. Returns the number of bytes read or -1
+  /// if the value was not decoded successfully.
+  /// This generic template function is used when PARQUET_TYPE is one of INT32, INT64,
+  /// INT96, FLOAT, DOUBLE or FIXED_LEN_BYTE_ARRAY.
   template <typename InternalType, parquet::Type::type PARQUET_TYPE>
   static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
       InternalType* v) {
-    int byte_size = ByteSize(*v);
+    /// We cannot make it a static assert because the DecodeByParquetType template
+    /// potentially calls this function with every combination of internal and parquet
+    /// types, not only the valid ones. The invalid combinations do not occur at runtime,
+    /// but they cannot be ruled out at compile time.
+    DCHECK(PARQUET_TYPE == parquet::Type::type::INT32
+        || PARQUET_TYPE == parquet::Type::type::INT64
+        || PARQUET_TYPE == parquet::Type::type::INT96
+        || PARQUET_TYPE == parquet::Type::type::FLOAT
+        || PARQUET_TYPE == parquet::Type::type::DOUBLE
+        || PARQUET_TYPE == parquet::Type::type::FIXED_LEN_BYTE_ARRAY);
+
+    int byte_size = EncodedByteSize(PARQUET_TYPE, fixed_len_size);
+
     if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-    memcpy(v, buffer, byte_size);
+    DecodeNoBoundsCheck<InternalType, PARQUET_TYPE>(buffer, buffer_end,
+        fixed_len_size, v);
     return byte_size;
   }
 
@@ -209,6 +233,23 @@ class ParquetPlainEncoder {
   template <typename InternalType, parquet::Type::type PARQUET_TYPE>
   static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
       int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v);
+
+ private:
+  /// Decode values without bounds checking. `buffer_end` is only used for DCHECKs in
+  /// DEBUG mode, it is unused in RELEASE mode.
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static ALWAYS_INLINE inline void DecodeNoBoundsCheck(const uint8_t* buffer,
+      const uint8_t* buffer_end, int fixed_len_size, InternalType* v);
+
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int64_t DecodeBatchAlwaysBoundsCheck(const uint8_t* buffer,
+      const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
+      InternalType* v);
+
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int64_t DecodeBatchOneBoundsCheck(const uint8_t* buffer,
+      const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
+      InternalType* v);
 };
 
 /// Calling this with arguments of type ColumnType is certainly a programmer error, so we
@@ -258,46 +299,55 @@ inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
 template <typename From, typename To>
 inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* buffer_end, To* v) {
   int byte_size = sizeof(From);
-  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  DCHECK_GE(buffer_end - buffer, byte_size);
   From dest;
   memcpy(&dest, buffer, byte_size);
   *v = dest;
   return byte_size;
 }
 
-template <>
-inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int64_t* v) {
-  return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
+/// Decodes a value without bounds checking.
+/// This generic template function is used with the following types:
+/// =============================
+/// InternalType   | PARQUET_TYPE
+/// =============================
+/// int8_t         | INT32
+/// int16_t        | INT32
+/// int32_t        | INT32
+/// int64_t        | INT64
+/// float          | FLOAT
+/// double         | DOUBLE
+/// Decimal4Value  | INT32
+/// Decimal8Value  | INT64
+/// TimestampValue | INT96
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void ParquetPlainEncoder::DecodeNoBoundsCheck(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, InternalType* v) {
+  int byte_size = EncodedByteSize(PARQUET_TYPE, -1);
+  DCHECK_GE(buffer_end - buffer, byte_size);
+
+  /// This generic template is only used when either no conversion is needed or with
+  /// narrowing integer conversions (e.g. int32_t to int16_t or int8_t) where copying the
+  /// lower bytes is the correct conversion.
+  memcpy(v, buffer, sizeof(InternalType));
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
-  return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
+inline void ParquetPlainEncoder::DecodeNoBoundsCheck<int64_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int64_t* v) {
+  DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>(
+inline void ParquetPlainEncoder::DecodeNoBoundsCheck<double, parquet::Type::INT32>(
     const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
-  return DecodeWithConversion<float, double>(buffer, buffer_end, v);
+  DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
 }
 
 template <>
-inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int8_t* v) {
-  int byte_size = ByteSize(*v);
-  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-  *v = *buffer;
-  return byte_size;
-}
-template <>
-inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, int16_t* v) {
-  int byte_size = ByteSize(*v);
-  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-  memcpy(v, buffer, sizeof(int16_t));
-  return byte_size;
+inline void ParquetPlainEncoder::DecodeNoBoundsCheck<double, parquet::Type::FLOAT>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, double* v) {
+  DecodeWithConversion<float, double>(buffer, buffer_end, v);
 }
 
 template<typename T>
@@ -371,26 +421,85 @@ inline int ParquetPlainEncoder::Encode(
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE>
-inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
+inline int64_t ParquetPlainEncoder::DecodeBatchAlwaysBoundsCheck(const uint8_t* buffer,
     const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
     InternalType* v) {
   const uint8_t* buffer_pos = buffer;
   StrideWriter<InternalType> out(v, stride);
+
   for (int64_t i = 0; i < num_values; ++i) {
     int encoded_len = Decode<InternalType, PARQUET_TYPE>(
         buffer_pos, buffer_end, fixed_len_size, out.Advance());
     if (UNLIKELY(encoded_len < 0)) return -1;
     buffer_pos += encoded_len;
   }
+
   return buffer_pos - buffer;
 }
 
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+inline int64_t ParquetPlainEncoder::DecodeBatchOneBoundsCheck(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
+    InternalType* v) {
+  const uint8_t* buffer_pos = buffer;
+  uint8_t* output = reinterpret_cast<uint8_t*>(v);
+  const int byte_size_of_element = EncodedByteSize(PARQUET_TYPE, fixed_len_size);
+
+  if (UNLIKELY(buffer_end - buffer < num_values * byte_size_of_element)) return -1;
+
+  /// We unroll the loop manually in batches of 8.
+  constexpr int batch = 8;
+  const int full_batches = num_values / batch;
+  const int remainder = num_values % batch;
+
+  for (int b = 0; b < full_batches; b++) {
+#pragma push_macro("DECODE_NO_CHECK_UNROLL")
+#define DECODE_NO_CHECK_UNROLL(ignore1, i, ignore2) \
+    DecodeNoBoundsCheck<InternalType, PARQUET_TYPE>( \
+        buffer_pos + i * byte_size_of_element, buffer_end, fixed_len_size, \
+        reinterpret_cast<InternalType*>(output + i * stride));
+
+    BOOST_PP_REPEAT_FROM_TO(0, 8 /* The value of `batch` */,
+        DECODE_NO_CHECK_UNROLL, ignore);
+#pragma pop_macro("DECODE_NO_CHECK_UNROLL")
+
+    output += batch * stride;
+    buffer_pos += batch * byte_size_of_element;
+  }
+
+  StrideWriter<InternalType> out(reinterpret_cast<InternalType*>(output), stride);
+  for (int i = 0; i < remainder; i++) {
+    DecodeNoBoundsCheck<InternalType, PARQUET_TYPE>(
+        buffer_pos, buffer_end, fixed_len_size, out.Advance());
+    buffer_pos += byte_size_of_element;
+  }
+
+  DCHECK_EQ(buffer_pos - buffer, num_values * byte_size_of_element);
+  return buffer_pos - buffer;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t stride,
+    InternalType* v) {
+  /// Whether bounds checking needs to be done for every element or we can check the whole
+  /// batch at the same time.
+  constexpr bool has_variable_length =
+      PARQUET_TYPE == parquet::Type::type::BYTE_ARRAY;
+  if (has_variable_length) {
+    return DecodeBatchAlwaysBoundsCheck<InternalType, PARQUET_TYPE>(buffer, buffer_end,
+        fixed_len_size, num_values, stride, v);
+  } else {
+    return DecodeBatchOneBoundsCheck<InternalType, PARQUET_TYPE>(buffer, buffer_end,
+        fixed_len_size, num_values, stride, v);
+  }
+}
+
 template <typename T>
-inline int DecodeDecimalFixedLen(
+inline void DecodeDecimalFixedLen(
     const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* v) {
-  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
+  DCHECK_GE(buffer_end - buffer, fixed_len_size);
   DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
 }
 
 template <>
@@ -402,24 +511,27 @@ Decode<bool, parquet::Type::BOOLEAN>(const uint8_t* buffer,
 }
 
 template <>
-inline int ParquetPlainEncoder::
-Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
-    const uint8_t* buffer_end, int fixed_len_size, Decimal4Value* v) {
-  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+inline void ParquetPlainEncoder::
+DecodeNoBoundsCheck<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal4Value* v) {
+  DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
 }
 
 template <>
-inline int ParquetPlainEncoder::
-Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
-    const uint8_t* buffer_end, int fixed_len_size, Decimal8Value* v) {
-  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+inline void ParquetPlainEncoder::
+DecodeNoBoundsCheck<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal8Value* v) {
+  DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
 }
 
 template <>
-inline int ParquetPlainEncoder::
-Decode<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* buffer,
-    const uint8_t* buffer_end, int fixed_len_size, Decimal16Value* v) {
-  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+inline void ParquetPlainEncoder::
+DecodeNoBoundsCheck<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal16Value* v) {
+  DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
 }
 
 /// Helper method to decode Decimal type stored as variable length byte array.
diff --git a/be/src/exec/parquet/parquet-plain-test.cc b/be/src/exec/parquet/parquet-plain-test.cc
index 6eb880f..9fbe540 100644
--- a/be/src/exec/parquet/parquet-plain-test.cc
+++ b/be/src/exec/parquet/parquet-plain-test.cc
@@ -15,15 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <limits.h>
 #include <stdio.h>
 #include <stdlib.h>
+#include <algorithm>
 #include <iostream>
 #include "exec/parquet/parquet-common.h"
 #include "runtime/decimal-value.h"
 #include "runtime/string-value.inline.h"
 #include "runtime/timestamp-value.h"
 #include "testutil/gtest-util.h"
+#include "testutil/random-vector-generators.h"
+#include "testutil/rand-util.h"
 
 #include "common/names.h"
 
@@ -78,25 +80,8 @@ int Encode(const Decimal16Value& v, int encoded_byte_size, uint8_t* buffer,
 }
 
 /// Test that the decoder fails when asked to decode a truncated value.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE>
-void TestTruncate(const InternalType& v, int expected_byte_size) {
-  uint8_t buffer[expected_byte_size];
-  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
-  EXPECT_EQ(encoded_size, expected_byte_size);
-
-  // Check all possible truncations of the buffer.
-  for (int truncated_size = encoded_size - 1; truncated_size >= 0; --truncated_size) {
-    InternalType result;
-    /// Copy to heap-allocated buffer so that ASAN can detect buffer overruns.
-    uint8_t* truncated_buffer = new uint8_t[truncated_size];
-    memcpy(truncated_buffer, buffer, truncated_size);
-    int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
-        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, &result);
-    EXPECT_EQ(-1, decoded_size);
-    delete[] truncated_buffer;
-  }
-}
-
+/// This function can be used for type widening tests but also tests without type
+/// widening, in which case `WidenInternalType` is the same as `InternalType`.
 template <typename InternalType, typename WidenInternalType,
     parquet::Type::type PARQUET_TYPE>
 void TestTruncate(const InternalType& v, int expected_byte_size) {
@@ -111,28 +96,14 @@ void TestTruncate(const InternalType& v, int expected_byte_size) {
     uint8_t* truncated_buffer = new uint8_t[truncated_size];
     memcpy(truncated_buffer, buffer, truncated_size);
     int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
-        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size,
-        &result);
+        truncated_buffer, truncated_buffer + truncated_size, expected_byte_size, &result);
     EXPECT_EQ(-1, decoded_size);
     delete[] truncated_buffer;
   }
 }
 
-template <typename InternalType, parquet::Type::type PARQUET_TYPE>
-void TestType(const InternalType& v, int expected_byte_size) {
-  uint8_t buffer[expected_byte_size];
-  int encoded_size = Encode(v, expected_byte_size, buffer, PARQUET_TYPE);
-  EXPECT_EQ(encoded_size, expected_byte_size);
-
-  InternalType result;
-  int decoded_size = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(buffer,
-      buffer + expected_byte_size, expected_byte_size, &result);
-  EXPECT_EQ(decoded_size, expected_byte_size);
-  EXPECT_EQ(result, v);
-
-  TestTruncate<InternalType, PARQUET_TYPE>(v, expected_byte_size);
-}
-
+/// This function can be used for type widening tests but also tests without type
+/// widening, in which case `WidenInternalType` is the same as `InternalType`.
 template <typename InternalType, typename WidenInternalType,
     parquet::Type::type PARQUET_TYPE>
 void TestTypeWidening(const InternalType& v, int expected_byte_size) {
@@ -143,13 +114,27 @@ void TestTypeWidening(const InternalType& v, int expected_byte_size) {
   WidenInternalType result;
   int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
       buffer, buffer + expected_byte_size, expected_byte_size, &result);
-  EXPECT_EQ(decoded_size, expected_byte_size);
+  EXPECT_EQ(expected_byte_size, decoded_size);
   EXPECT_EQ(v, result);
 
+  WidenInternalType batch_result;
+  int batch_decoded_size
+      = ParquetPlainEncoder::DecodeBatch<WidenInternalType, PARQUET_TYPE>(
+          buffer, buffer + expected_byte_size, expected_byte_size, 1,
+          sizeof(WidenInternalType), &batch_result);
+  EXPECT_EQ(expected_byte_size, batch_decoded_size);
+  EXPECT_EQ(v, batch_result);
+
   TestTruncate<InternalType, WidenInternalType, PARQUET_TYPE>(
       v, expected_byte_size);
 }
 
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestType(const InternalType& v, int expected_byte_size) {
+  return TestTypeWidening<InternalType, InternalType, PARQUET_TYPE>(
+      v, expected_byte_size);
+}
+
 TEST(PlainEncoding, Basic) {
   int8_t i8 = 12;
   int16_t i16 = 123;
@@ -286,6 +271,174 @@ TEST(PlainEncoding, Basic) {
   }
 }
 
+template <typename InputType, typename OutputType>
+void ExpectEqualWithStride(const std::vector<InputType>& input,
+    const std::vector<uint8_t>& output, int stride) {
+  ASSERT_EQ(input.size() * stride, output.size());
+
+  for (int i = 0; i < input.size(); i++) {
+    const InputType& input_value = input[i];
+    OutputType output_value;
+
+    memcpy(&output_value, &output[i * stride], sizeof(OutputType));
+    EXPECT_EQ(input_value, output_value);
+  }
+}
+
+/// This function can be used for type widening tests but also tests without type
+/// widening, in which case `WidenInternalType` is the same as `InternalType`.
+template <typename InternalType, typename WidenInternalType,
+         parquet::Type::type PARQUET_TYPE>
+void TestTypeWideningBatch(const std::vector<InternalType>& values,
+    int expected_byte_size, int stride) {
+  ASSERT_GE(stride, sizeof(WidenInternalType));
+
+  constexpr bool var_length = PARQUET_TYPE == parquet::Type::BYTE_ARRAY;
+
+  std::vector<uint8_t> buffer(values.size() * expected_byte_size, 0);
+  uint8_t* output_pos = buffer.data();
+  for (int i = 0; i < values.size(); i++) {
+    int encoded_size = Encode(values[i], expected_byte_size, output_pos, PARQUET_TYPE);
+    if (var_length) {
+      /// For variable length types, the size is variable and `expected_byte_size` should
+      /// be the maximum.
+      EXPECT_GE(expected_byte_size, encoded_size);
+    } else {
+      EXPECT_EQ(expected_byte_size, encoded_size);
+    }
+
+    output_pos += encoded_size;
+  }
+
+  /// Decode one by one.
+  std::vector<uint8_t> output_1by1(values.size() * stride);
+  uint8_t* input_pos = buffer.data();
+  for (int i = 0; i < values.size(); i++) {
+    WidenInternalType* dest = reinterpret_cast<WidenInternalType*>(
+        &output_1by1[i * stride]);
+    int decoded_size = ParquetPlainEncoder::Decode<WidenInternalType, PARQUET_TYPE>(
+        input_pos, buffer.data() + buffer.size(), expected_byte_size, dest);
+    if (var_length) {
+      EXPECT_GE(expected_byte_size, decoded_size);
+    } else {
+      EXPECT_EQ(expected_byte_size, decoded_size);
+    }
+
+    input_pos += decoded_size;
+  }
+
+  ExpectEqualWithStride<InternalType, WidenInternalType>(values, output_1by1, stride);
+
+  /// Decode in batch.
+  std::vector<uint8_t> output_batch(values.size() * stride);
+  int decoded_size = ParquetPlainEncoder::DecodeBatch<WidenInternalType, PARQUET_TYPE>(
+      buffer.data(), buffer.data() + buffer.size(), expected_byte_size, values.size(),
+      stride, reinterpret_cast<WidenInternalType*>(output_batch.data()));
+  if (var_length) {
+    EXPECT_GE(buffer.size(), decoded_size);
+  } else {
+    EXPECT_EQ(buffer.size(), decoded_size);
+  }
+
+  ExpectEqualWithStride<InternalType, WidenInternalType>(values, output_batch, stride);
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+void TestTypeBatch(const std::vector<InternalType>& values, int expected_byte_size,
+    int stride) {
+  return TestTypeWideningBatch<InternalType, InternalType, PARQUET_TYPE>(values,
+      expected_byte_size, stride);
+}
+
+TEST(PlainEncoding, Batch) {
+  std::mt19937 gen;
+  RandTestUtil::SeedRng("PARQUET_PLAIN_ENCODING_TEST_RANDOM_SEED", &gen);
+
+  constexpr int NUM_ELEMENTS = 1024 * 5 + 10;
+  constexpr int stride = 20;
+
+  const std::vector<int8_t> int8_vec = RandomNumberVec<int8_t>(gen, NUM_ELEMENTS);
+  TestTypeBatch<int8_t, parquet::Type::INT32>(int8_vec, sizeof(int32_t), stride);
+
+  const std::vector<int16_t> int16_vec = RandomNumberVec<int16_t>(gen, NUM_ELEMENTS);
+  TestTypeBatch<int16_t, parquet::Type::INT32>(int16_vec, sizeof(int32_t), stride);
+
+  const std::vector<int32_t> int32_vec = RandomNumberVec<int32_t>(gen, NUM_ELEMENTS);
+  TestTypeBatch<int32_t, parquet::Type::INT32>(int32_vec, sizeof(int32_t), stride);
+
+  const std::vector<int64_t> int64_vec = RandomNumberVec<int64_t>(gen, NUM_ELEMENTS);
+  TestTypeBatch<int64_t, parquet::Type::INT64>(int64_vec, sizeof(int64_t), stride);
+
+  const std::vector<float> float_vec = RandomNumberVec<float>(gen, NUM_ELEMENTS);
+  TestTypeBatch<float, parquet::Type::FLOAT>(float_vec, sizeof(float), stride);
+
+  const std::vector<double> double_vec = RandomNumberVec<double>(gen, NUM_ELEMENTS);
+  TestTypeBatch<double, parquet::Type::DOUBLE>(double_vec, sizeof(double), stride);
+
+  constexpr int max_str_length = 100;
+  const std::vector<std::string> str_vec = RandomStrVec(gen, NUM_ELEMENTS,
+      max_str_length);
+  std::vector<StringValue> sv_vec(str_vec.size());
+  std::transform(str_vec.begin(), str_vec.end(), sv_vec.begin(),
+      [] (const std::string& s) { return StringValue(s); });
+  TestTypeBatch<StringValue, parquet::Type::BYTE_ARRAY>(sv_vec,
+      sizeof(int32_t) + max_str_length, stride);
+
+  const std::vector<TimestampValue> tv_vec = RandomTimestampVec(gen, NUM_ELEMENTS);
+  TestTypeBatch<TimestampValue, parquet::Type::INT96>(tv_vec, 12, stride);
+
+  // Test type widening.
+  TestTypeWideningBatch<int32_t, int64_t, parquet::Type::INT32>(int32_vec,
+      sizeof(int32_t), stride);
+  TestTypeWideningBatch<int32_t, double, parquet::Type::INT32>(int32_vec, sizeof(int32_t),
+      stride);
+  TestTypeWideningBatch<float, double, parquet::Type::FLOAT>(float_vec, sizeof(float),
+      stride);
+
+  // In the Decimal batch tests, when writing the decimals as BYTE_ARRAYs, we always use
+  // the size of the underlying type as the array length for simplicity.
+  // The non-batch tests take care of storing them on as many bytes as needed.
+
+  // BYTE_ARRAYs store the length of the array on 4 bytes.
+  constexpr int decimal_size_bytes = sizeof(int32_t);
+
+  // Decimal4Value
+  std::vector<Decimal4Value> decimal4_vec(int32_vec.size());
+  std::transform(int32_vec.begin(), int32_vec.end(), decimal4_vec.begin(),
+      [] (const int32_t i) { return Decimal4Value(i); });
+
+  TestTypeBatch<Decimal4Value, parquet::Type::BYTE_ARRAY>(decimal4_vec,
+      decimal_size_bytes + sizeof(Decimal4Value::StorageType), stride);
+  TestTypeBatch<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(decimal4_vec,
+      sizeof(Decimal4Value::StorageType), stride);
+  TestTypeBatch<Decimal4Value, parquet::Type::INT32>(decimal4_vec,
+      sizeof(Decimal4Value::StorageType), stride);
+
+  // Decimal8Value
+  std::vector<Decimal8Value> decimal8_vec(int64_vec.size());
+  std::transform(int64_vec.begin(), int64_vec.end(), decimal8_vec.begin(),
+      [] (const int64_t i) { return Decimal8Value(i); });
+
+  TestTypeBatch<Decimal8Value, parquet::Type::BYTE_ARRAY>(decimal8_vec,
+      decimal_size_bytes + sizeof(Decimal8Value::StorageType), stride);
+  TestTypeBatch<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(decimal8_vec,
+      sizeof(Decimal8Value::StorageType), stride);
+  TestTypeBatch<Decimal8Value, parquet::Type::INT64>(decimal8_vec,
+      sizeof(Decimal8Value::StorageType), stride);
+
+  // Decimal16Value
+  // We do not test the whole 16 byte range as generating random int128_t values is
+  // complicated.
+  std::vector<Decimal16Value> decimal16_vec(int64_vec.size());
+  std::transform(int64_vec.begin(), int64_vec.end(), decimal16_vec.begin(),
+      [] (const int64_t i) { return Decimal16Value(i); });
+
+  TestTypeBatch<Decimal16Value, parquet::Type::BYTE_ARRAY>(decimal16_vec,
+      decimal_size_bytes + sizeof(Decimal16Value::StorageType), stride);
+  TestTypeBatch<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(decimal16_vec,
+      sizeof(Decimal16Value::StorageType), stride);
+}
+
 TEST(PlainEncoding, DecimalBigEndian) {
   // Test Basic can pass if we make the same error in encode and decode.
   // Verify the bytes are actually big endian.
diff --git a/be/src/testutil/random-vector-generators.h b/be/src/testutil/random-vector-generators.h
new file mode 100644
index 0000000..b26820e
--- /dev/null
+++ b/be/src/testutil/random-vector-generators.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <algorithm>
+#include <limits.h>
+#include <random>
+#include <type_traits>
+
+namespace impala {
+
+/// Generates a vector of numbers with the given length, consisting of random elements.
+/// `NUM_T` must either be an integral or a floating point type.
+/// The distribution is uniform across the range [min, max] (both inclusive).
+template <typename NUM_T, typename Generator = std::mt19937>
+std::vector<NUM_T> RandomNumberVecMinMax(Generator& gen, const int length,
+    const NUM_T min, const NUM_T max) {
+  static_assert(std::is_integral<NUM_T>::value || std::is_floating_point<NUM_T>::value,
+      "An integral or floating point type is needed.");
+
+  using Dist = std::conditional_t<std::is_integral<NUM_T>::value,
+    std::uniform_int_distribution<NUM_T>,
+    std::uniform_real_distribution<NUM_T>>;
+
+  Dist dist(min, max);
+
+  std::vector<NUM_T> vec(length);
+  std::generate(vec.begin(), vec.end(), [&gen, &dist] () {
+      return dist(gen);
+      });
+
+  return vec;
+}
+
+/// Generates a vector of numbers with the given length, consisting of random elements.
+/// `NUM_T` must either be an integral or a floating point type.
+/// The distribution is uniform across all values of the NUM_T.
+template <typename NUM_T, typename Generator = std::mt19937>
+std::vector<NUM_T> RandomNumberVec(Generator& gen, const int length) {
+  return RandomNumberVecMinMax<NUM_T, Generator>(gen, length,
+      std::numeric_limits<NUM_T>::min(), std::numeric_limits<NUM_T>::max());
+}
+
+/// Generates a vector of strings with the given length. The elements are randomly
+/// generated strings. The length of the strings is a random number between
+/// `min_str_length` and `max_str_length` (both inclusive).
+template <typename Generator = std::mt19937>
+std::vector<std::string> RandomStrVec(Generator& gen, const int length,
+    const int max_str_length, const int min_str_length = 0) {
+  std::uniform_int_distribution<int> length_dist(0, max_str_length);
+  std::uniform_int_distribution<char> letter_dist('a', 'z');
+
+  std::vector<std::string> vec(length);
+  std::generate(vec.begin(), vec.end(), [&] () {
+      int str_length = length_dist(gen);
+      std::string s;
+
+      for (int i = 0; i < str_length; i++) {
+        s.push_back(letter_dist(gen));
+      }
+
+      return s;
+      });
+
+  return vec;
+}
+
+/// Generates a vector of `TimestampValue`s with the given length. The elements are random
+/// timestamps with uniform distribution on the valid range.
+template <typename Generator = std::mt19937>
+std::vector<TimestampValue> RandomTimestampVec(Generator& gen, const int length) {
+  const TimestampValue min_date =
+      TimestampValue::Parse("1400-01-01 00:00:00");
+  int64_t min_millis;
+  bool min_success = min_date.FloorUtcToUnixTimeMillis(&min_millis);
+  DCHECK(min_success);
+
+  const TimestampValue max_date =
+      TimestampValue::Parse("9999-12-31 23:59:59");
+  int64_t max_millis;
+  bool max_success = max_date.FloorUtcToUnixTimeMillis(&max_millis);
+  DCHECK(max_success);
+
+  const std::vector<int64_t> unix_time_millis = RandomNumberVecMinMax<int64_t, Generator>(
+      gen, length, min_millis, max_millis);
+  std::vector<TimestampValue> timestamps(length);
+  std::transform(unix_time_millis.begin(), unix_time_millis.end(), timestamps.begin(),
+     TimestampValue::UtcFromUnixTimeMillis);
+
+  return timestamps;
+}
+
+} // namespace impala