You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@orc.apache.org by "wgtmac (via GitHub)" <gi...@apache.org> on 2023/05/24 05:32:18 UTC

[GitHub] [orc] wgtmac commented on a diff in pull request #1500: ORC-1386: [C++] Support schema evolution from numeric to string group/decimal/timestamp

wgtmac commented on code in PR #1500:
URL: https://github.com/apache/orc/pull/1500#discussion_r1203418613


##########
c++/src/Timezone.cc:
##########
@@ -587,6 +587,12 @@ namespace orc {
       return clk + getVariant(clk).gmtOffset;
     }
 
+    int64_t convertFromUTC(int64_t clk) const override {

Review Comment:
   Could you add a test for this?



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>
+  class NumericToDecimalColumnReader : public ConvertColumnReader {
+   public:
+    NumericToDecimalColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
+                                 bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
+      precision = static_cast<int32_t>(readType.getPrecision());
+      scale = static_cast<int32_t>(readType.getScale());
+    }
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
+      dstBatch.precision = precision;
+      dstBatch.scale = scale;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          if constexpr (isFileTypeDouble) {

Review Comment:
   ```suggestion
             if constexpr (isFloatingFileType) {
   ```



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -235,10 +514,64 @@ namespace orc {
   DEFINE_NUMERIC_CONVERT_READER(Int, Double, double)
   DEFINE_NUMERIC_CONVERT_READER(Long, Double, double)
 
+  // Numeric to String/Char
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Byte, String)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Short, String)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Int, String)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Long, String)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Float, String)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Double, String)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Byte, Char)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Short, Char)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Int, Char)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Long, Char)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Float, Char)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Double, Char)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Byte, Varchar)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Short, Varchar)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Int, Varchar)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Long, Varchar)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Float, Varchar)
+  DEFINE_NUMERIC_CONVERT_TO_STRING_VARINT_READER(Double, Varchar)
+  using BooleanToStringColumnReader = BooleanToStringVariantColumnReader;
+  using BooleanToCharColumnReader = BooleanToStringVariantColumnReader;
+  using BooleanToVarcharColumnReader = BooleanToStringVariantColumnReader;
+
+  // Numeric to Decimal
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Boolean, false)
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Byte, false)
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Short, false)
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Int, false)
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Long, false)
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Float, true)
+  DEFINE_NUMERIC_CONVERT_TO_DECIMAL_READER(Double, true)
+
+  // Numeric to Timestamp
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Boolean)
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Byte)
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Short)
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Int)
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Long)
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Float)
+  DEFINE_NUMERIC_CONVERT_TO_TIMESTAMP_READER(Double)
+
 #define CASE_CREATE_READER(TYPE, CONVERT) \
   case TYPE:                              \
     return std::make_unique<CONVERT##ColumnReader>(_readType, fileType, stripe, throwOnOverflow);
 
+  const static int32_t MAX_PRECISION_64 = 18;
+
+#define CASE_CREATE_DECIMAL_READER(FROM)                                                   \
+  case DECIMAL: {                                                                          \
+    if (_readType.getPrecision() <= MAX_PRECISION_64) {                                    \

Review Comment:
   ```suggestion
       if (_readType.getPrecision() > 0 && _readType.getPrecision() <= MAX_PRECISION_64) {                                    \
   ```



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));

Review Comment:
   I am not sure whether it should throw when truncation happens.



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";

Review Comment:
   Does it follow the Java implementation?



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,

Review Comment:
   ditto



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>
+  class NumericToDecimalColumnReader : public ConvertColumnReader {
+   public:
+    NumericToDecimalColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
+                                 bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
+      precision = static_cast<int32_t>(readType.getPrecision());
+      scale = static_cast<int32_t>(readType.getScale());
+    }
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
+      dstBatch.precision = precision;
+      dstBatch.scale = scale;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          if constexpr (isFileTypeDouble) {
+            convertDoubleToDecimal(dstBatch, i, srcBatch.data[i]);
+          } else {
+            convertIntegerToDecimal(dstBatch, i, srcBatch.data[i]);
+          }
+        }
+      }
+    }
+
+   private:
+    template <typename srcType>
+    void convertDoubleToDecimal(ReadTypeBatch& dstBatch, uint64_t idx, srcType value) {
+      std::string strValue = std::to_string(value);
+      int32_t fromScale = 0;
+      int32_t fromPrecision = static_cast<int32_t>(strValue.length());

Review Comment:
   What if `value` is a negative number?



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,

Review Comment:
   `_readType` and `_throwOnOverflow` do not need to have `_` prefix, right?



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>
+  class NumericToDecimalColumnReader : public ConvertColumnReader {
+   public:
+    NumericToDecimalColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
+                                 bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
+      precision = static_cast<int32_t>(readType.getPrecision());
+      scale = static_cast<int32_t>(readType.getScale());
+    }
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
+      dstBatch.precision = precision;
+      dstBatch.scale = scale;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          if constexpr (isFileTypeDouble) {
+            convertDoubleToDecimal(dstBatch, i, srcBatch.data[i]);
+          } else {
+            convertIntegerToDecimal(dstBatch, i, srcBatch.data[i]);
+          }
+        }
+      }
+    }
+
+   private:
+    template <typename srcType>
+    void convertDoubleToDecimal(ReadTypeBatch& dstBatch, uint64_t idx, srcType value) {
+      std::string strValue = std::to_string(value);

Review Comment:
   String conversion would be slow. Can we simply convert it into an integer and reuse the integer code path?



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>
+  class NumericToDecimalColumnReader : public ConvertColumnReader {
+   public:
+    NumericToDecimalColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
+                                 bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
+      precision = static_cast<int32_t>(readType.getPrecision());
+      scale = static_cast<int32_t>(readType.getScale());
+    }
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
+      dstBatch.precision = precision;
+      dstBatch.scale = scale;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          if constexpr (isFileTypeDouble) {
+            convertDoubleToDecimal(dstBatch, i, srcBatch.data[i]);
+          } else {
+            convertIntegerToDecimal(dstBatch, i, srcBatch.data[i]);
+          }
+        }
+      }
+    }
+
+   private:
+    template <typename srcType>

Review Comment:
   ```suggestion
       template <typename SrcType>
   ```



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);

Review Comment:
   Should we throw in this case? The truncated result does not make sense at all.



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>
+  class NumericToDecimalColumnReader : public ConvertColumnReader {
+   public:
+    NumericToDecimalColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
+                                 bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
+      precision = static_cast<int32_t>(readType.getPrecision());
+      scale = static_cast<int32_t>(readType.getScale());
+    }
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
+      dstBatch.precision = precision;
+      dstBatch.scale = scale;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          if constexpr (isFileTypeDouble) {
+            convertDoubleToDecimal(dstBatch, i, srcBatch.data[i]);
+          } else {
+            convertIntegerToDecimal(dstBatch, i, srcBatch.data[i]);
+          }
+        }
+      }
+    }
+
+   private:
+    template <typename srcType>
+    void convertDoubleToDecimal(ReadTypeBatch& dstBatch, uint64_t idx, srcType value) {
+      std::string strValue = std::to_string(value);
+      int32_t fromScale = 0;
+      int32_t fromPrecision = static_cast<int32_t>(strValue.length());
+      Int128 i128 = 0;
+      for (size_t i = 0; i < strValue.length(); ++i) {
+        auto c = strValue[i];
+        if (c == '.') {
+          fromScale = static_cast<int32_t>(strValue.length() - i - 1);
+          fromPrecision -= 1;
+          continue;
+        }
+        i128 *= 10;
+        i128 += c - '0';
+      }
+      auto result = convertDecimal(i128, fromPrecision, fromScale, precision, scale);
+      if (result.first) {
+        handleOverflow<srcType, decltype(dstBatch.values[idx])>(dstBatch, idx, throwOnOverflow);
+      } else {
+        if constexpr (std::is_same<ReadTypeBatch, Decimal64VectorBatch>::value) {
+          if (!result.second.fitsInLong()) {
+            handleOverflow<srcType, decltype(dstBatch.values[idx])>(dstBatch, idx, throwOnOverflow);
+          } else {
+            dstBatch.values[idx] = result.second.toLong();
+          }
+        } else {
+          dstBatch.values[idx] = result.second;
+        }
+      }
+    }
+
+    template <typename srcType>

Review Comment:
   ```suggestion
       template <typename SrcType>
   ```



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>

Review Comment:
   Please add a blank line above this template declaration.



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    } else {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);
+          } else {
+            strBuffer[i].resize(maxLength, ' ');
+          }
+          size += strBuffer[i].size();
+        }
+      }
+    }
+    return size;
+  }
+  template <typename FileTypeBatch, typename ReadTypeBatch, bool isFileTypeDouble>
+  class NumericToDecimalColumnReader : public ConvertColumnReader {
+   public:
+    NumericToDecimalColumnReader(const Type& _readType, const Type& fileType, StripeStreams& stripe,
+                                 bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {
+      precision = static_cast<int32_t>(readType.getPrecision());
+      scale = static_cast<int32_t>(readType.getScale());
+    }
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<ReadTypeBatch*>(&rowBatch);
+      dstBatch.precision = precision;
+      dstBatch.scale = scale;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          if constexpr (isFileTypeDouble) {
+            convertDoubleToDecimal(dstBatch, i, srcBatch.data[i]);
+          } else {
+            convertIntegerToDecimal(dstBatch, i, srcBatch.data[i]);
+          }
+        }
+      }
+    }
+
+   private:
+    template <typename srcType>
+    void convertDoubleToDecimal(ReadTypeBatch& dstBatch, uint64_t idx, srcType value) {
+      std::string strValue = std::to_string(value);
+      int32_t fromScale = 0;
+      int32_t fromPrecision = static_cast<int32_t>(strValue.length());
+      Int128 i128 = 0;
+      for (size_t i = 0; i < strValue.length(); ++i) {
+        auto c = strValue[i];
+        if (c == '.') {
+          fromScale = static_cast<int32_t>(strValue.length() - i - 1);
+          fromPrecision -= 1;
+          continue;
+        }
+        i128 *= 10;
+        i128 += c - '0';
+      }
+      auto result = convertDecimal(i128, fromPrecision, fromScale, precision, scale);
+      if (result.first) {
+        handleOverflow<srcType, decltype(dstBatch.values[idx])>(dstBatch, idx, throwOnOverflow);
+      } else {
+        if constexpr (std::is_same<ReadTypeBatch, Decimal64VectorBatch>::value) {
+          if (!result.second.fitsInLong()) {
+            handleOverflow<srcType, decltype(dstBatch.values[idx])>(dstBatch, idx, throwOnOverflow);
+          } else {
+            dstBatch.values[idx] = result.second.toLong();
+          }
+        } else {
+          dstBatch.values[idx] = result.second;
+        }
+      }
+    }
+
+    template <typename srcType>
+    void convertIntegerToDecimal(ReadTypeBatch& dstBatch, uint64_t idx, srcType value) {
+      int fromScale = 0;
+      int fromPrecision = 1;
+      for (srcType tmp = value; tmp /= 10; ++fromPrecision)
+        ;
+      auto result = convertDecimal(value, fromPrecision, fromScale, precision, scale);
+      if (result.first) {
+        handleOverflow<srcType, decltype(dstBatch.values[idx])>(dstBatch, idx, throwOnOverflow);
+      } else {
+        if constexpr (std::is_same<ReadTypeBatch, Decimal64VectorBatch>::value) {
+          if (!result.second.fitsInLong()) {
+            handleOverflow<srcType, decltype(dstBatch.values[idx])>(dstBatch, idx, throwOnOverflow);
+          } else {
+            dstBatch.values[idx] = result.second.toLong();
+          }
+        } else {
+          dstBatch.values[idx] = result.second;
+        }
+      }
+    }
+
+    int32_t precision;
+    int32_t scale;
+  };
+
+  class ConvertToTimestampColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToTimestampColumnReader(const Type& _readType, const Type& fileType,
+                                   StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow),
+          readerTimezone(readType.getKind() == TIMESTAMP_INSTANT ? getTimezoneByName("GMT")
+                                                                 : stripe.getReaderTimezone()),
+          needConvertTimezone(&readerTimezone != &getTimezoneByName("GMT")) {}
+
+   protected:
+    const orc::Timezone& readerTimezone;
+    const bool needConvertTimezone;
+  };
+
+  template <typename FileTypeBatch>
+  class NumericToTimestampColumnReader : public ConvertToTimestampColumnReader {
+   public:
+    NumericToTimestampColumnReader(const Type& _readType, const Type& fileType,
+                                   StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToTimestampColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertToTimestampColumnReader::next(rowBatch, numValues, notNull);
+
+      const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+      auto& dstBatch = *SafeCastBatchTo<TimestampVectorBatch*>(&rowBatch);
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          convertToTimestamp(dstBatch, i, srcBatch.data[i]);
+        }
+      }
+    }
+
+   private:
+    template <typename FileType>
+    void convertToTimestamp(TimestampVectorBatch& dstBatch, uint64_t idx, FileType value);
+  };
+
+  template <typename FileTypeBatch>
+  template <typename FileType>
+  void NumericToTimestampColumnReader<FileTypeBatch>::convertToTimestamp(
+      TimestampVectorBatch& dstBatch, uint64_t idx, FileType value) {
+    if constexpr (std::is_floating_point<FileType>::value) {
+      if (value > static_cast<FileType>(std::numeric_limits<int64_t>::max()) ||
+          value < static_cast<FileType>(std::numeric_limits<int64_t>::min())) {
+        handleOverflow<FileType, int64_t>(dstBatch, idx, throwOnOverflow);
+        return;
+      }
+      dstBatch.data[idx] = static_cast<int64_t>(value);
+      dstBatch.nanoseconds[idx] = static_cast<int32_t>(
+          static_cast<double>(value - static_cast<FileType>(dstBatch.data[idx])) * 1e9);
+      if (dstBatch.nanoseconds[idx] < 0) {
+        dstBatch.data[idx] -= 1;
+        dstBatch.nanoseconds[idx] += static_cast<int32_t>(1e9);
+      }
+    } else {
+      dstBatch.data[idx] = value;
+      dstBatch.nanoseconds[idx] = 0;
+    }
+    if (needConvertTimezone) {
+      dstBatch.data[idx] = readerTimezone.convertFromUTC(dstBatch.data[idx]);

Review Comment:
   Does this follow the Java implementation as well?



##########
c++/src/ConvertColumnReader.cc:
##########
@@ -186,10 +186,289 @@ namespace orc {
     }
   };
 
+  class ConvertToStringVariantColumnReader : public ConvertColumnReader {
+   public:
+    ConvertToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ConvertColumnReader::next(rowBatch, numValues, notNull);
+
+      // cache converted string in the buffer
+      auto totalLength = convertToStrBuffer(rowBatch, numValues);
+
+      // contact string values to blob buffer of vector batch
+      auto& dstBatch = *SafeCastBatchTo<StringVectorBatch*>(&rowBatch);
+      dstBatch.blob.resize(totalLength);
+      char* blob = dstBatch.blob.data();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          const auto size = strBuffer[i].size();
+          ::memcpy(blob, strBuffer[i].c_str(), size);
+          dstBatch.data[i] = blob;
+          dstBatch.length[i] = static_cast<int32_t>(size);
+          blob += size;
+        }
+      }
+      strBuffer.clear();
+    }
+
+    virtual size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) = 0;
+
+   protected:
+    std::vector<std::string> strBuffer;
+  };
+
+  class BooleanToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    BooleanToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  size_t BooleanToStringVariantColumnReader::convertToStrBuffer(ColumnVectorBatch& rowBatch,
+                                                                uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const BooleanVectorBatch*>(data.get());
+    std::string trueValue = "TRUE";
+    std::string falseValue = "FALSE";
+    if (readType.getKind() == CHAR) {
+      trueValue.resize(readType.getMaximumLength(), ' ');
+      falseValue.resize(readType.getMaximumLength(), ' ');
+    } else if (readType.getKind() == VARCHAR) {
+      trueValue = trueValue.substr(0, std::min(static_cast<uint64_t>(4), readType.getMaximumLength()));
+      falseValue = falseValue.substr(0, std::min(static_cast<uint64_t>(5), readType.getMaximumLength()));
+    }
+    // cast the bool value to string and truncate to the max length
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+        strBuffer[i] = (srcBatch.data[i] ? trueValue : falseValue);
+        size += strBuffer[i].size();
+      }
+    }
+    return size;
+  }
+
+  template <typename FileTypeBatch>
+  class NumericToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
+   public:
+    NumericToStringVariantColumnReader(const Type& _readType, const Type& fileType,
+                                       StripeStreams& stripe, bool _throwOnOverflow)
+        : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow) {}
+    size_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override;
+  };
+
+  template <typename FileTypeBatch>
+  size_t NumericToStringVariantColumnReader<FileTypeBatch>::convertToStrBuffer(
+      ColumnVectorBatch& rowBatch, uint64_t numValues) {
+    size_t size = 0;
+    strBuffer.resize(numValues);
+    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
+    if (readType.getKind() == STRING) {
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          size += strBuffer[i].size();
+        }
+      }
+    } else if (readType.getKind() == VARCHAR) {
+      const auto maxLength = readType.getMaximumLength();
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
+          strBuffer[i] = std::to_string(srcBatch.data[i]);
+          if (strBuffer[i].size() > maxLength) {
+            strBuffer[i].resize(maxLength);

Review Comment:
   We probably need to throw or nullify if the result needs to be truncated. Otherwise, the converted string does not preserve the original value, which makes it meaningless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@orc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org