You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/11/11 09:13:40 UTC

[orc] branch main updated: ORC-1298: [C++] Support dedicated ColumnVectorBatch of numeric types

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 597477e2c ORC-1298: [C++] Support dedicated ColumnVectorBatch of numeric types
597477e2c is described below

commit 597477e2cd594d2283173924ad502c6640c3b143
Author: zou Hao <34...@users.noreply.github.com>
AuthorDate: Fri Nov 11 17:13:34 2022 +0800

    ORC-1298: [C++] Support dedicated ColumnVectorBatch of numeric types
    
    This closes #1307
---
 c++/include/orc/MemoryPool.hh |  36 ++++++
 c++/include/orc/Reader.hh     |  12 ++
 c++/include/orc/Type.hh       |   4 +
 c++/include/orc/Vector.hh     | 122 +++++++++++++++---
 c++/include/orc/Writer.hh     |  11 ++
 c++/src/ColumnReader.cc       | 285 +++++++++++++++++++++++-------------------
 c++/src/ColumnReader.hh       |   3 +-
 c++/src/ColumnWriter.cc       | 148 ++++++++++++++--------
 c++/src/MemoryPool.cc         |  76 +++++++++++
 c++/src/Options.hh            |  11 ++
 c++/src/RLE.cc                |  17 ++-
 c++/src/RLE.hh                |  10 ++
 c++/src/RLEv1.cc              |  25 ++--
 c++/src/RLEv1.hh              |   7 ++
 c++/src/RLEv2.hh              |  25 ++--
 c++/src/Reader.cc             |   6 +-
 c++/src/Reader.hh             |   1 +
 c++/src/RleDecoderV2.cc       |  42 +++++--
 c++/src/Timezone.cc           |   1 +
 c++/src/TypeImpl.cc           |  44 +++++--
 c++/src/TypeImpl.hh           |   4 +
 c++/src/Vector.cc             |  62 ---------
 c++/src/Writer.cc             |  16 ++-
 c++/test/TestWriter.cc        | 101 ++++++++++++++-
 24 files changed, 767 insertions(+), 302 deletions(-)

diff --git a/c++/include/orc/MemoryPool.hh b/c++/include/orc/MemoryPool.hh
index 347505412..f0dbeb16d 100644
--- a/c++/include/orc/MemoryPool.hh
+++ b/c++/include/orc/MemoryPool.hh
@@ -108,6 +108,14 @@ namespace orc {
   template <>
   void DataBuffer<double>::resize(uint64_t newSize);
 
+  // Specializations for float
+
+  template <>
+  DataBuffer<float>::~DataBuffer();
+
+  template <>
+  void DataBuffer<float>::resize(uint64_t newSize);
+
   // Specializations for int64_t
 
   template <>
@@ -116,6 +124,30 @@ namespace orc {
   template <>
   void DataBuffer<int64_t>::resize(uint64_t newSize);
 
+  // Specializations for int32_t
+
+  template <>
+  DataBuffer<int32_t>::~DataBuffer();
+
+  template <>
+  void DataBuffer<int32_t>::resize(uint64_t newSize);
+
+  // Specializations for int16_t
+
+  template <>
+  DataBuffer<int16_t>::~DataBuffer();
+
+  template <>
+  void DataBuffer<int16_t>::resize(uint64_t newSize);
+
+  // Specializations for int8_t
+
+  template <>
+  DataBuffer<int8_t>::~DataBuffer();
+
+  template <>
+  void DataBuffer<int8_t>::resize(uint64_t newSize);
+
   // Specializations for uint64_t
 
   template <>
@@ -140,8 +172,12 @@ namespace orc {
   extern template class DataBuffer<char>;
   extern template class DataBuffer<char*>;
   extern template class DataBuffer<double>;
+  extern template class DataBuffer<float>;
   extern template class DataBuffer<Int128>;
   extern template class DataBuffer<int64_t>;
+  extern template class DataBuffer<int32_t>;
+  extern template class DataBuffer<int16_t>;
+  extern template class DataBuffer<int8_t>;
   extern template class DataBuffer<uint64_t>;
   extern template class DataBuffer<unsigned char>;
 
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 98e4e8b3a..a1ac53af8 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -324,6 +324,18 @@ namespace orc {
      * Get the IdReadIntentMap map that was supplied by client.
      */
     const IdReadIntentMap getIdReadIntentMap() const;
+
+    /**
+     * Set whether or not to use tight numeric vector batches (int32_t / int16_t / int8_t /
+     * float) instead of the default int64_t / double vector batches.
+     */
+    RowReaderOptions& setUseTightNumericVector(bool useTightNumericVector);
+
+    /**
+     * Get whether or not to use fixed width numeric columnVectorBatch.
+     * @return if not set, the default is false
+     */
+    bool getUseTightNumericVector() const;
   };
 
   class RowReader;
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index e3c9768c1..e21852b61 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -72,6 +72,10 @@ namespace orc {
     virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(uint64_t size, MemoryPool& pool,
                                                              bool encoded = false) const = 0;
 
+    virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(
+        uint64_t size, MemoryPool& pool, bool encoded = false,
+        bool useTightNumericVector = false) const = 0;
+
     /**
      * Add a new field to a struct type.
      * @param fieldName the name of the new field
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index 27afaa71f..4ed9a7e78 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -25,9 +25,9 @@
 
 #include <cstdlib>
 #include <cstring>
-#include <iostream>
 #include <list>
 #include <memory>
+#include <sstream>
 #include <stdexcept>
 #include <vector>
 
@@ -88,28 +88,116 @@ namespace orc {
     ColumnVectorBatch& operator=(const ColumnVectorBatch&);
   };
 
-  struct LongVectorBatch : public ColumnVectorBatch {
-    LongVectorBatch(uint64_t capacity, MemoryPool& pool);
-    virtual ~LongVectorBatch();
+  template <typename ValueType>
+  struct IntegerVectorBatch : public ColumnVectorBatch {
+    IntegerVectorBatch(uint64_t cap, MemoryPool& pool)
+        : ColumnVectorBatch(cap, pool), data(pool, cap) {
+      // PASS
+    }
 
-    DataBuffer<int64_t> data;
-    std::string toString() const;
-    void resize(uint64_t capacity);
-    void clear();
-    uint64_t getMemoryUsage();
+    virtual ~IntegerVectorBatch() = default;
+
+    inline std::string toString() const;
+
+    void resize(uint64_t cap) {
+      if (capacity < cap) {
+        ColumnVectorBatch::resize(cap);
+        data.resize(cap);
+      }
+    }
+
+    void clear() {
+      numElements = 0;
+    }
+
+    uint64_t getMemoryUsage() {
+      return ColumnVectorBatch::getMemoryUsage() +
+             static_cast<uint64_t>(data.capacity() * sizeof(ValueType));
+    }
+
+    DataBuffer<ValueType> data;
   };
 
-  struct DoubleVectorBatch : public ColumnVectorBatch {
-    DoubleVectorBatch(uint64_t capacity, MemoryPool& pool);
-    virtual ~DoubleVectorBatch();
-    std::string toString() const;
-    void resize(uint64_t capacity);
-    void clear();
-    uint64_t getMemoryUsage();
+  using LongVectorBatch = IntegerVectorBatch<int64_t>;
+  using IntVectorBatch = IntegerVectorBatch<int32_t>;
+  using ShortVectorBatch = IntegerVectorBatch<int16_t>;
+  using ByteVectorBatch = IntegerVectorBatch<int8_t>;
+
+  template <>
+  inline std::string LongVectorBatch::toString() const {
+    std::ostringstream buffer;
+    buffer << "Long vector <" << numElements << " of " << capacity << ">";
+    return buffer.str();
+  }
+
+  template <>
+  inline std::string IntVectorBatch::toString() const {
+    std::ostringstream buffer;
+    buffer << "Int vector <" << numElements << " of " << capacity << ">";
+    return buffer.str();
+  }
+
+  template <>
+  inline std::string ShortVectorBatch::toString() const {
+    std::ostringstream buffer;
+    buffer << "Short vector <" << numElements << " of " << capacity << ">";
+    return buffer.str();
+  }
+
+  template <>
+  inline std::string ByteVectorBatch::toString() const {
+    std::ostringstream buffer;
+    buffer << "Byte vector <" << numElements << " of " << capacity << ">";
+    return buffer.str();
+  }
+
+  template <typename FloatType>
+  struct FloatingVectorBatch : public ColumnVectorBatch {
+    FloatingVectorBatch(uint64_t cap, MemoryPool& pool)
+        : ColumnVectorBatch(cap, pool), data(pool, cap) {
+      // PASS
+    }
+
+    virtual ~FloatingVectorBatch() = default;
 
-    DataBuffer<double> data;
+    inline std::string toString() const;
+
+    void resize(uint64_t cap) {
+      if (capacity < cap) {
+        ColumnVectorBatch::resize(cap);
+        data.resize(cap);
+      }
+    }
+
+    void clear() {
+      numElements = 0;
+    }
+
+    uint64_t getMemoryUsage() {
+      return ColumnVectorBatch::getMemoryUsage() +
+             static_cast<uint64_t>(data.capacity() * sizeof(FloatType));
+    }
+
+    DataBuffer<FloatType> data;
   };
 
+  using DoubleVectorBatch = FloatingVectorBatch<double>;
+  using FloatVectorBatch = FloatingVectorBatch<float>;
+
+  template <>
+  inline std::string DoubleVectorBatch::toString() const {
+    std::ostringstream buffer;
+    buffer << "Double vector <" << numElements << " of " << capacity << ">";
+    return buffer.str();
+  }
+
+  template <>
+  inline std::string FloatVectorBatch::toString() const {
+    std::ostringstream buffer;
+    buffer << "Float vector <" << numElements << " of " << capacity << ">";
+    return buffer.str();
+  }
+
   struct StringVectorBatch : public ColumnVectorBatch {
     StringVectorBatch(uint64_t capacity, MemoryPool& pool);
     virtual ~StringVectorBatch();
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 854dc1dd7..032cad536 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -251,6 +251,17 @@ namespace orc {
      * @return if not set, return nullptr.
      */
     WriterMetrics* getWriterMetrics() const;
+
+    /**
+     * Set whether or not to use tight numeric vectorBatch.
+     */
+    WriterOptions& setUseTightNumericVector(bool useTightNumericVector);
+
+    /**
+     * Get whether or not to use dedicated columnVectorBatch
+     * @return if not set, the default is false
+     */
+    bool getUseTightNumericVector() const;
   };
 
   class Writer {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 5106162c6..eea978b0e 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -116,14 +116,18 @@ namespace orc {
   }
 
   /**
-   * Expand an array of bytes in place to the corresponding array of longs.
+   * Expand an array of bytes in place to the corresponding array of integers.
    * Has to work backwards so that they data isn't clobbered during the
    * expansion.
    * @param buffer the array of chars and array of longs that need to be
    *        expanded
    * @param numValues the number of bytes to convert to longs
    */
-  void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
+  template <typename T>
+  void expandBytesToIntegers(T* buffer, uint64_t numValues) {
+    if (sizeof(T) == sizeof(char)) {
+      return;
+    }
     for (size_t i = numValues - 1; i < numValues; --i) {
       buffer[i] = reinterpret_cast<char*>(buffer)[i];
     }
@@ -169,7 +173,7 @@ namespace orc {
     int64_t* ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
     rle->next(reinterpret_cast<char*>(ptr), numValues,
               rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
-    expandBytesToLongs(ptr, numValues);
+    expandBytesToIntegers(ptr, numValues);
   }
 
   void BooleanColumnReader::seekToRowGroup(
@@ -178,99 +182,78 @@ namespace orc {
     rle->seek(positions.at(columnId));
   }
 
+  template <typename BatchType>
   class ByteColumnReader : public ColumnReader {
    private:
     std::unique_ptr<orc::ByteRleDecoder> rle;
 
    public:
-    ByteColumnReader(const Type& type, StripeStreams& stipe);
-    ~ByteColumnReader() override;
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
-
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
-  };
-
-  ByteColumnReader::ByteColumnReader(const Type& type, StripeStreams& stripe)
-      : ColumnReader(type, stripe) {
-    std::unique_ptr<SeekableInputStream> stream =
-        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-    if (stream == nullptr) throw ParseError("DATA stream not found in Byte column");
-    rle = createByteRleDecoder(std::move(stream), metrics);
-  }
+    ByteColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
+      std::unique_ptr<SeekableInputStream> stream =
+          stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
+      if (stream == nullptr) throw ParseError("DATA stream not found in Byte column");
+      rle = createByteRleDecoder(std::move(stream), metrics);
+    }
 
-  ByteColumnReader::~ByteColumnReader() {
-    // PASS
-  }
+    ~ByteColumnReader() override = default;
 
-  uint64_t ByteColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    rle->skip(numValues);
-    return numValues;
-  }
+    uint64_t skip(uint64_t numValues) override {
+      numValues = ColumnReader::skip(numValues);
+      rle->skip(numValues);
+      return numValues;
+    }
 
-  void ByteColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    // Since the byte rle places the output in a char* instead of long*,
-    // we cheat here and use the long* and then expand it in a second pass.
-    int64_t* ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
-    rle->next(reinterpret_cast<char*>(ptr), numValues,
-              rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
-    expandBytesToLongs(ptr, numValues);
-  }
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ColumnReader::next(rowBatch, numValues, notNull);
+      // Since the byte rle writes its output as char* rather than the batch's value type,
+      // we cheat here: decode into the batch buffer, then expand it in place in a second pass.
+      auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
+      rle->next(reinterpret_cast<char*>(ptr), numValues,
+                rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
+      expandBytesToIntegers(ptr, numValues);
+    }
 
-  void ByteColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
-    rle->seek(positions.at(columnId));
-  }
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
+      ColumnReader::seekToRowGroup(positions);
+      rle->seek(positions.at(columnId));
+    }
+  };
 
+  template <typename BatchType>
   class IntegerColumnReader : public ColumnReader {
    protected:
     std::unique_ptr<orc::RleDecoder> rle;
 
    public:
-    IntegerColumnReader(const Type& type, StripeStreams& stripe);
-    ~IntegerColumnReader() override;
-
-    uint64_t skip(uint64_t numValues) override;
-
-    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
-
-    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
-  };
-
-  IntegerColumnReader::IntegerColumnReader(const Type& type, StripeStreams& stripe)
-      : ColumnReader(type, stripe) {
-    RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
-    std::unique_ptr<SeekableInputStream> stream =
-        stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
-    if (stream == nullptr) throw ParseError("DATA stream not found in Integer column");
-    rle = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
-  }
+    IntegerColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
+      RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
+      std::unique_ptr<SeekableInputStream> stream =
+          stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
+      if (stream == nullptr) throw ParseError("DATA stream not found in Integer column");
+      rle = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
+    }
 
-  IntegerColumnReader::~IntegerColumnReader() {
-    // PASS
-  }
+    ~IntegerColumnReader() override {
+      // PASS
+    }
 
-  uint64_t IntegerColumnReader::skip(uint64_t numValues) {
-    numValues = ColumnReader::skip(numValues);
-    rle->skip(numValues);
-    return numValues;
-  }
+    uint64_t skip(uint64_t numValues) override {
+      numValues = ColumnReader::skip(numValues);
+      rle->skip(numValues);
+      return numValues;
+    }
 
-  void IntegerColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
-    ColumnReader::next(rowBatch, numValues, notNull);
-    rle->next(dynamic_cast<LongVectorBatch&>(rowBatch).data.data(), numValues,
-              rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
-  }
+    void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+      ColumnReader::next(rowBatch, numValues, notNull);
+      rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
+                rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
+    }
 
-  void IntegerColumnReader::seekToRowGroup(
-      std::unordered_map<uint64_t, PositionProvider>& positions) {
-    ColumnReader::seekToRowGroup(positions);
-    rle->seek(positions.at(columnId));
-  }
+    void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
+      ColumnReader::seekToRowGroup(positions);
+      rle->seek(positions.at(columnId));
+    }
+  };
 
   class TimestampColumnReader : public ColumnReader {
    private:
@@ -368,7 +351,7 @@ namespace orc {
     nanoRle->seek(positions.at(columnId));
   }
 
-  template <TypeKind columnKind, bool isLittleEndian>
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
   class DoubleColumnReader : public ColumnReader {
    public:
     DoubleColumnReader(const Type& type, StripeStreams& stripe);
@@ -397,7 +380,8 @@ namespace orc {
       return static_cast<unsigned char>(*(bufferPointer++));
     }
 
-    double readDouble() {
+    template <typename FloatType>
+    FloatType readDouble() {
       int64_t bits = 0;
       if (bufferEnd - bufferPointer >= 8) {
         if (isLittleEndian) {
@@ -418,11 +402,12 @@ namespace orc {
           bits |= static_cast<int64_t>(readByte()) << (i * 8);
         }
       }
-      double* result = reinterpret_cast<double*>(&bits);
+      FloatType* result = reinterpret_cast<FloatType*>(&bits);
       return *result;
     }
 
-    double readFloat() {
+    template <typename FloatType>
+    FloatType readFloat() {
       int32_t bits = 0;
       if (bufferEnd - bufferPointer >= 4) {
         if (isLittleEndian) {
@@ -440,20 +425,24 @@ namespace orc {
         }
       }
       float* result = reinterpret_cast<float*>(&bits);
-      return static_cast<double>(*result);
+      if (!result) {
+        std::cerr << "read float empty." << std::endl;
+      }
+      return static_cast<FloatType>(*result);
     }
   };
 
-  template <TypeKind columnKind, bool isLittleEndian>
-  DoubleColumnReader<columnKind, isLittleEndian>::DoubleColumnReader(const Type& type,
-                                                                     StripeStreams& stripe)
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+  DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::DoubleColumnReader(
+      const Type& type, StripeStreams& stripe)
       : ColumnReader(type, stripe), bufferPointer(nullptr), bufferEnd(nullptr) {
     inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (inputStream == nullptr) throw ParseError("DATA stream not found in Double column");
   }
 
-  template <TypeKind columnKind, bool isLittleEndian>
-  uint64_t DoubleColumnReader<columnKind, isLittleEndian>::skip(uint64_t numValues) {
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+  uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::skip(
+      uint64_t numValues) {
     numValues = ColumnReader::skip(numValues);
 
     if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * numValues) {
@@ -474,31 +463,32 @@ namespace orc {
     return numValues;
   }
 
-  template <TypeKind columnKind, bool isLittleEndian>
-  void DoubleColumnReader<columnKind, isLittleEndian>::next(ColumnVectorBatch& rowBatch,
-                                                            uint64_t numValues, char* notNull) {
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::next(
+      ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
     ColumnReader::next(rowBatch, numValues, notNull);
     // update the notNull from the parent class
     notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
-    double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();
+    ValueType* outArray =
+        reinterpret_cast<ValueType*>(dynamic_cast<BatchType&>(rowBatch).data.data());
 
-    if (columnKind == FLOAT) {
+    if constexpr (columnKind == FLOAT) {
       if (notNull) {
         for (size_t i = 0; i < numValues; ++i) {
           if (notNull[i]) {
-            outArray[i] = readFloat();
+            outArray[i] = readFloat<ValueType>();
           }
         }
       } else {
         for (size_t i = 0; i < numValues; ++i) {
-          outArray[i] = readFloat();
+          outArray[i] = readFloat<ValueType>();
         }
       }
     } else {
       if (notNull) {
         for (size_t i = 0; i < numValues; ++i) {
           if (notNull[i]) {
-            outArray[i] = readDouble();
+            outArray[i] = readDouble<ValueType>();
           }
         }
       } else {
@@ -513,14 +503,14 @@ namespace orc {
           bufferPointer += bufferBytes;
         }
         for (size_t i = bufferNum; i < numValues; ++i) {
-          outArray[i] = readDouble();
+          outArray[i] = readDouble<ValueType>();
         }
       }
     }
   }
 
-  template <TypeKind columnKind, bool isLittleEndian>
-  void DoubleColumnReader<columnKind, isLittleEndian>::seekToRowGroup(
+  template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+  void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::seekToRowGroup(
       std::unordered_map<uint64_t, PositionProvider>& positions) {
     ColumnReader::seekToRowGroup(positions);
     inputStream->seek(positions.at(columnId));
@@ -830,7 +820,7 @@ namespace orc {
     std::vector<std::unique_ptr<ColumnReader>> children;
 
    public:
-    StructColumnReader(const Type& type, StripeStreams& stipe);
+    StructColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
 
     uint64_t skip(uint64_t numValues) override;
 
@@ -845,7 +835,8 @@ namespace orc {
     void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
   };
 
-  StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe)
+  StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe,
+                                         bool useTightNumericVector)
       : ColumnReader(type, stripe) {
     // count the number of selected sub-columns
     const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
@@ -854,7 +845,7 @@ namespace orc {
         for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
           const Type& child = *type.getSubtype(i);
           if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
-            children.push_back(buildReader(child, stripe));
+            children.push_back(buildReader(child, stripe, useTightNumericVector));
           }
         }
         break;
@@ -914,7 +905,7 @@ namespace orc {
     std::unique_ptr<RleDecoder> rle;
 
    public:
-    ListColumnReader(const Type& type, StripeStreams& stipe);
+    ListColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
     ~ListColumnReader() override;
 
     uint64_t skip(uint64_t numValues) override;
@@ -930,7 +921,8 @@ namespace orc {
     void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
   };
 
-  ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe)
+  ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
+                                     bool useTightNumericVector)
       : ColumnReader(type, stripe) {
     // count the number of selected sub-columns
     const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
@@ -941,7 +933,7 @@ namespace orc {
     rle = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
     const Type& childType = *type.getSubtype(0);
     if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
-      child = buildReader(childType, stripe);
+      child = buildReader(childType, stripe, useTightNumericVector);
     }
   }
 
@@ -1033,7 +1025,7 @@ namespace orc {
     std::unique_ptr<RleDecoder> rle;
 
    public:
-    MapColumnReader(const Type& type, StripeStreams& stipe);
+    MapColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
     ~MapColumnReader() override;
 
     uint64_t skip(uint64_t numValues) override;
@@ -1049,7 +1041,8 @@ namespace orc {
     void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
   };
 
-  MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe)
+  MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
+                                   bool useTightNumericVector)
       : ColumnReader(type, stripe) {
     // Determine if the key and/or value columns are selected
     const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
@@ -1060,11 +1053,11 @@ namespace orc {
     rle = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
     const Type& keyType = *type.getSubtype(0);
     if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
-      keyReader = buildReader(keyType, stripe);
+      keyReader = buildReader(keyType, stripe, useTightNumericVector);
     }
     const Type& elementType = *type.getSubtype(1);
     if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
-      elementReader = buildReader(elementType, stripe);
+      elementReader = buildReader(elementType, stripe, useTightNumericVector);
     }
   }
 
@@ -1174,7 +1167,7 @@ namespace orc {
     uint64_t numChildren;
 
    public:
-    UnionColumnReader(const Type& type, StripeStreams& stipe);
+    UnionColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
 
     uint64_t skip(uint64_t numValues) override;
 
@@ -1189,7 +1182,8 @@ namespace orc {
     void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
   };
 
-  UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe)
+  UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
+                                       bool useTightNumericVector)
       : ColumnReader(type, stripe) {
     numChildren = type.getSubtypeCount();
     childrenReader.resize(numChildren);
@@ -1204,7 +1198,7 @@ namespace orc {
     for (unsigned int i = 0; i < numChildren; ++i) {
       const Type& child = *type.getSubtype(i);
       if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
-        childrenReader[i] = buildReader(child, stripe);
+        childrenReader[i] = buildReader(child, stripe, useTightNumericVector);
       }
     }
   }
@@ -1696,13 +1690,25 @@ namespace orc {
   /**
    * Create a reader for the given stripe.
    */
-  std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe) {
+  std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe,
+                                            bool useTightNumericVector) {
     switch (static_cast<int64_t>(type.getKind())) {
-      case DATE:
-      case INT:
+      case SHORT: {
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnReader>(
+              new IntegerColumnReader<ShortVectorBatch>(type, stripe));
+        }
+      }
+      case INT: {
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnReader>(
+              new IntegerColumnReader<IntVectorBatch>(type, stripe));
+        }
+      }
       case LONG:
-      case SHORT:
-        return std::unique_ptr<ColumnReader>(new IntegerColumnReader(type, stripe));
+      case DATE:
+        return std::unique_ptr<ColumnReader>(
+            new IntegerColumnReader<LongVectorBatch>(type, stripe));
       case BINARY:
       case CHAR:
       case STRING:
@@ -1722,32 +1728,51 @@ namespace orc {
         return std::unique_ptr<ColumnReader>(new BooleanColumnReader(type, stripe));
 
       case BYTE:
-        return std::unique_ptr<ColumnReader>(new ByteColumnReader(type, stripe));
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnReader>(new ByteColumnReader<ByteVectorBatch>(type, stripe));
+        }
+        return std::unique_ptr<ColumnReader>(new ByteColumnReader<LongVectorBatch>(type, stripe));
 
       case LIST:
-        return std::unique_ptr<ColumnReader>(new ListColumnReader(type, stripe));
+        return std::unique_ptr<ColumnReader>(
+            new ListColumnReader(type, stripe, useTightNumericVector));
 
       case MAP:
-        return std::unique_ptr<ColumnReader>(new MapColumnReader(type, stripe));
+        return std::unique_ptr<ColumnReader>(
+            new MapColumnReader(type, stripe, useTightNumericVector));
 
       case UNION:
-        return std::unique_ptr<ColumnReader>(new UnionColumnReader(type, stripe));
+        return std::unique_ptr<ColumnReader>(
+            new UnionColumnReader(type, stripe, useTightNumericVector));
 
       case STRUCT:
-        return std::unique_ptr<ColumnReader>(new StructColumnReader(type, stripe));
-
-      case FLOAT:
+        return std::unique_ptr<ColumnReader>(
+            new StructColumnReader(type, stripe, useTightNumericVector));
+
+      case FLOAT: {
+        if (useTightNumericVector) {
+          if (isLittleEndian()) {
+            return std::unique_ptr<ColumnReader>(
+                new DoubleColumnReader<FLOAT, true, float, FloatVectorBatch>(type, stripe));
+          }
+          return std::unique_ptr<ColumnReader>(
+              new DoubleColumnReader<FLOAT, false, float, FloatVectorBatch>(type, stripe));
+        }
         if (isLittleEndian()) {
-          return std::unique_ptr<ColumnReader>(new DoubleColumnReader<FLOAT, true>(type, stripe));
+          return std::unique_ptr<ColumnReader>(
+              new DoubleColumnReader<FLOAT, true, double, DoubleVectorBatch>(type, stripe));
         }
-        return std::unique_ptr<ColumnReader>(new DoubleColumnReader<FLOAT, false>(type, stripe));
-
-      case DOUBLE:
+        return std::unique_ptr<ColumnReader>(
+            new DoubleColumnReader<FLOAT, false, double, DoubleVectorBatch>(type, stripe));
+      }
+      case DOUBLE: {
         if (isLittleEndian()) {
-          return std::unique_ptr<ColumnReader>(new DoubleColumnReader<DOUBLE, true>(type, stripe));
+          return std::unique_ptr<ColumnReader>(
+              new DoubleColumnReader<DOUBLE, true, double, DoubleVectorBatch>(type, stripe));
         }
-        return std::unique_ptr<ColumnReader>(new DoubleColumnReader<DOUBLE, false>(type, stripe));
-
+        return std::unique_ptr<ColumnReader>(
+            new DoubleColumnReader<DOUBLE, false, double, DoubleVectorBatch>(type, stripe));
+      }
       case TIMESTAMP:
         return std::unique_ptr<ColumnReader>(new TimestampColumnReader(type, stripe, false));
 
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 10b0bb6dc..3b765cbe5 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -158,7 +158,8 @@ namespace orc {
   /**
    * Create a reader for the given stripe.
    */
-  std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe);
+  std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe,
+                                            bool useTightNumericVector = false);
 }  // namespace orc
 
 #endif
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index a70d0bf79..b622bad2d 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -415,6 +415,7 @@ namespace orc {
     }
   }
 
+  template <typename BatchType>
   class IntegerColumnWriter : public ColumnWriter {
    public:
     IntegerColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -438,8 +439,10 @@ namespace orc {
     RleVersion rleVersion;
   };
 
-  IntegerColumnWriter::IntegerColumnWriter(const Type& type, const StreamsFactory& factory,
-                                           const WriterOptions& options)
+  template <typename BatchType>
+  IntegerColumnWriter<BatchType>::IntegerColumnWriter(const Type& type,
+                                                      const StreamsFactory& factory,
+                                                      const WriterOptions& options)
       : ColumnWriter(type, factory, options), rleVersion(options.getRleVersion()) {
     std::unique_ptr<BufferedOutputStream> dataStream =
         factory.createStream(proto::Stream_Kind_DATA);
@@ -451,11 +454,12 @@ namespace orc {
     }
   }
 
-  void IntegerColumnWriter::add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
-                                const char* incomingMask) {
-    const LongVectorBatch* longBatch = dynamic_cast<const LongVectorBatch*>(&rowBatch);
-    if (longBatch == nullptr) {
-      throw InvalidArgument("Failed to cast to LongVectorBatch");
+  template <typename BatchType>
+  void IntegerColumnWriter<BatchType>::add(ColumnVectorBatch& rowBatch, uint64_t offset,
+                                           uint64_t numValues, const char* incomingMask) {
+    const BatchType* intBatch = dynamic_cast<const BatchType*>(&rowBatch);
+    if (intBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to IntegerVectorBatch");
     }
     IntegerColumnStatisticsImpl* intStats =
         dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
@@ -465,8 +469,8 @@ namespace orc {
 
     ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
 
-    const int64_t* data = longBatch->data.data() + offset;
-    const char* notNull = longBatch->hasNulls ? longBatch->notNull.data() + offset : nullptr;
+    const auto* data = intBatch->data.data() + offset;
+    const char* notNull = intBatch->hasNulls ? intBatch->notNull.data() + offset : nullptr;
 
     rleEncoder->add(data, numValues, notNull);
 
@@ -476,9 +480,9 @@ namespace orc {
       if (notNull == nullptr || notNull[i]) {
         ++count;
         if (enableBloomFilter) {
-          bloomFilter->addLong(data[i]);
+          bloomFilter->addLong(static_cast<int64_t>(data[i]));
         }
-        intStats->update(data[i], 1);
+        intStats->update(static_cast<int64_t>(data[i]), 1);
       }
     }
     intStats->increase(count);
@@ -487,7 +491,8 @@ namespace orc {
     }
   }
 
-  void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
+  template <typename BatchType>
+  void IntegerColumnWriter<BatchType>::flush(std::vector<proto::Stream>& streams) {
     ColumnWriter::flush(streams);
 
     proto::Stream stream;
@@ -497,13 +502,16 @@ namespace orc {
     streams.push_back(stream);
   }
 
-  uint64_t IntegerColumnWriter::getEstimatedSize() const {
+  template <typename BatchType>
+  uint64_t IntegerColumnWriter<BatchType>::getEstimatedSize() const {
     uint64_t size = ColumnWriter::getEstimatedSize();
     size += rleEncoder->getBufferSize();
     return size;
   }
 
-  void IntegerColumnWriter::getColumnEncoding(std::vector<proto::ColumnEncoding>& encodings) const {
+  template <typename BatchType>
+  void IntegerColumnWriter<BatchType>::getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
     encoding.set_kind(RleVersionMapper(rleVersion));
     encoding.set_dictionarysize(0);
@@ -513,11 +521,13 @@ namespace orc {
     encodings.push_back(encoding);
   }
 
-  void IntegerColumnWriter::recordPosition() const {
+  template <typename BatchType>
+  void IntegerColumnWriter<BatchType>::recordPosition() const {
     ColumnWriter::recordPosition();
     rleEncoder->recordPosition(rowIndexPosition.get());
   }
 
+  template <typename BatchType>
   class ByteColumnWriter : public ColumnWriter {
    public:
     ByteColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
@@ -537,8 +547,9 @@ namespace orc {
     std::unique_ptr<ByteRleEncoder> byteRleEncoder;
   };
 
-  ByteColumnWriter::ByteColumnWriter(const Type& type, const StreamsFactory& factory,
-                                     const WriterOptions& options)
+  template <typename BatchType>
+  ByteColumnWriter<BatchType>::ByteColumnWriter(const Type& type, const StreamsFactory& factory,
+                                                const WriterOptions& options)
       : ColumnWriter(type, factory, options) {
     std::unique_ptr<BufferedOutputStream> dataStream =
         factory.createStream(proto::Stream_Kind_DATA);
@@ -549,11 +560,12 @@ namespace orc {
     }
   }
 
-  void ByteColumnWriter::add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
-                             const char* incomingMask) {
-    LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
+  template <typename BatchType>
+  void ByteColumnWriter<BatchType>::add(ColumnVectorBatch& rowBatch, uint64_t offset,
+                                        uint64_t numValues, const char* incomingMask) {
+    BatchType* byteBatch = dynamic_cast<BatchType*>(&rowBatch);
     if (byteBatch == nullptr) {
-      throw InvalidArgument("Failed to cast to LongVectorBatch");
+      throw InvalidArgument("Failed to cast to IntegerVectorBatch");
     }
     IntegerColumnStatisticsImpl* intStats =
         dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
@@ -563,7 +575,7 @@ namespace orc {
 
     ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
 
-    int64_t* data = byteBatch->data.data() + offset;
+    auto* data = byteBatch->data.data() + offset;
     const char* notNull = byteBatch->hasNulls ? byteBatch->notNull.data() + offset : nullptr;
 
     char* byteData = reinterpret_cast<char*>(data);
@@ -588,7 +600,8 @@ namespace orc {
     }
   }
 
-  void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+  template <typename BatchType>
+  void ByteColumnWriter<BatchType>::flush(std::vector<proto::Stream>& streams) {
     ColumnWriter::flush(streams);
 
     proto::Stream stream;
@@ -598,13 +611,16 @@ namespace orc {
     streams.push_back(stream);
   }
 
-  uint64_t ByteColumnWriter::getEstimatedSize() const {
+  template <typename BatchType>
+  uint64_t ByteColumnWriter<BatchType>::getEstimatedSize() const {
     uint64_t size = ColumnWriter::getEstimatedSize();
     size += byteRleEncoder->getBufferSize();
     return size;
   }
 
-  void ByteColumnWriter::getColumnEncoding(std::vector<proto::ColumnEncoding>& encodings) const {
+  template <typename BatchType>
+  void ByteColumnWriter<BatchType>::getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
     encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
     encoding.set_dictionarysize(0);
@@ -614,7 +630,8 @@ namespace orc {
     encodings.push_back(encoding);
   }
 
-  void ByteColumnWriter::recordPosition() const {
+  template <typename BatchType>
+  void ByteColumnWriter<BatchType>::recordPosition() const {
     ColumnWriter::recordPosition();
     byteRleEncoder->recordPosition(rowIndexPosition.get());
   }
@@ -721,10 +738,11 @@ namespace orc {
     rleEncoder->recordPosition(rowIndexPosition.get());
   }
 
-  class DoubleColumnWriter : public ColumnWriter {
+  template <typename ValueType, typename BatchType>
+  class FloatingColumnWriter : public ColumnWriter {
    public:
-    DoubleColumnWriter(const Type& type, const StreamsFactory& factory,
-                       const WriterOptions& options, bool isFloat);
+    FloatingColumnWriter(const Type& type, const StreamsFactory& factory,
+                         const WriterOptions& options, bool isFloat);
 
     virtual void add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
                      const char* incomingMask) override;
@@ -743,8 +761,11 @@ namespace orc {
     DataBuffer<char> buffer;
   };
 
-  DoubleColumnWriter::DoubleColumnWriter(const Type& type, const StreamsFactory& factory,
-                                         const WriterOptions& options, bool isFloatType)
+  template <typename ValueType, typename BatchType>
+  FloatingColumnWriter<ValueType, BatchType>::FloatingColumnWriter(const Type& type,
+                                                                   const StreamsFactory& factory,
+                                                                   const WriterOptions& options,
+                                                                   bool isFloatType)
       : ColumnWriter(type, factory, options),
         isFloat(isFloatType),
         buffer(*options.getMemoryPool()) {
@@ -766,11 +787,13 @@ namespace orc {
     }
   }
 
-  void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
-                               const char* incomingMask) {
-    const DoubleVectorBatch* dblBatch = dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
+  template <typename ValueType, typename BatchType>
+  void FloatingColumnWriter<ValueType, BatchType>::add(ColumnVectorBatch& rowBatch, uint64_t offset,
+                                                       uint64_t numValues,
+                                                       const char* incomingMask) {
+    const BatchType* dblBatch = dynamic_cast<const BatchType*>(&rowBatch);
     if (dblBatch == nullptr) {
-      throw InvalidArgument("Failed to cast to DoubleVectorBatch");
+      throw InvalidArgument("Failed to cast to FloatingVectorBatch");
     }
     DoubleColumnStatisticsImpl* doubleStats =
         dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
@@ -780,7 +803,7 @@ namespace orc {
 
     ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
 
-    const double* doubleData = dblBatch->data.data() + offset;
+    const ValueType* doubleData = dblBatch->data.data() + offset;
     const char* notNull = dblBatch->hasNulls ? dblBatch->notNull.data() + offset : nullptr;
 
     size_t bytes = isFloat ? 4 : 8;
@@ -791,14 +814,14 @@ namespace orc {
         if (isFloat) {
           encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data);
         } else {
-          encodeFloatNum<double, int64_t>(doubleData[i], data);
+          encodeFloatNum<double, int64_t>(static_cast<double>(doubleData[i]), data);
         }
         dataStream->write(data, bytes);
         ++count;
         if (enableBloomFilter) {
-          bloomFilter->addDouble(doubleData[i]);
+          bloomFilter->addDouble(static_cast<double>(doubleData[i]));
         }
-        doubleStats->update(doubleData[i]);
+        doubleStats->update(static_cast<double>(doubleData[i]));
       }
     }
     doubleStats->increase(count);
@@ -807,7 +830,8 @@ namespace orc {
     }
   }
 
-  void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
+  template <typename ValueType, typename BatchType>
+  void FloatingColumnWriter<ValueType, BatchType>::flush(std::vector<proto::Stream>& streams) {
     ColumnWriter::flush(streams);
 
     proto::Stream stream;
@@ -817,13 +841,16 @@ namespace orc {
     streams.push_back(stream);
   }
 
-  uint64_t DoubleColumnWriter::getEstimatedSize() const {
+  template <typename ValueType, typename BatchType>
+  uint64_t FloatingColumnWriter<ValueType, BatchType>::getEstimatedSize() const {
     uint64_t size = ColumnWriter::getEstimatedSize();
     size += dataStream->getSize();
     return size;
   }
 
-  void DoubleColumnWriter::getColumnEncoding(std::vector<proto::ColumnEncoding>& encodings) const {
+  template <typename ValueType, typename BatchType>
+  void FloatingColumnWriter<ValueType, BatchType>::getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const {
     proto::ColumnEncoding encoding;
     encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
     encoding.set_dictionarysize(0);
@@ -833,7 +860,8 @@ namespace orc {
     encodings.push_back(encoding);
   }
 
-  void DoubleColumnWriter::recordPosition() const {
+  template <typename ValueType, typename BatchType>
+  void FloatingColumnWriter<ValueType, BatchType>::recordPosition() const {
     ColumnWriter::recordPosition();
     dataStream->recordPosition(rowIndexPosition.get());
   }
@@ -1740,7 +1768,7 @@ namespace orc {
     nanoRleEncoder->recordPosition(rowIndexPosition.get());
   }
 
-  class DateColumnWriter : public IntegerColumnWriter {
+  class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
    public:
     DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
 
@@ -1750,7 +1778,7 @@ namespace orc {
 
   DateColumnWriter::DateColumnWriter(const Type& type, const StreamsFactory& factory,
                                      const WriterOptions& options)
-      : IntegerColumnWriter(type, factory, options) {
+      : IntegerColumnWriter<LongVectorBatch>(type, factory, options) {
     // PASS
   }
 
@@ -2782,18 +2810,38 @@ namespace orc {
     switch (static_cast<int64_t>(type.getKind())) {
       case STRUCT:
         return std::unique_ptr<ColumnWriter>(new StructColumnWriter(type, factory, options));
+      case SHORT:
+        if (options.getUseTightNumericVector()) {
+          return std::unique_ptr<ColumnWriter>(
+              new IntegerColumnWriter<ShortVectorBatch>(type, factory, options));
+        }
       case INT:
+        if (options.getUseTightNumericVector()) {
+          return std::unique_ptr<ColumnWriter>(
+              new IntegerColumnWriter<IntVectorBatch>(type, factory, options));
+        }
       case LONG:
-      case SHORT:
-        return std::unique_ptr<ColumnWriter>(new IntegerColumnWriter(type, factory, options));
+        return std::unique_ptr<ColumnWriter>(
+            new IntegerColumnWriter<LongVectorBatch>(type, factory, options));
       case BYTE:
-        return std::unique_ptr<ColumnWriter>(new ByteColumnWriter(type, factory, options));
+        if (options.getUseTightNumericVector()) {
+          return std::unique_ptr<ColumnWriter>(
+              new ByteColumnWriter<ByteVectorBatch>(type, factory, options));
+        }
+        return std::unique_ptr<ColumnWriter>(
+            new ByteColumnWriter<LongVectorBatch>(type, factory, options));
       case BOOLEAN:
         return std::unique_ptr<ColumnWriter>(new BooleanColumnWriter(type, factory, options));
       case DOUBLE:
-        return std::unique_ptr<ColumnWriter>(new DoubleColumnWriter(type, factory, options, false));
+        return std::unique_ptr<ColumnWriter>(
+            new FloatingColumnWriter<double, DoubleVectorBatch>(type, factory, options, false));
       case FLOAT:
-        return std::unique_ptr<ColumnWriter>(new DoubleColumnWriter(type, factory, options, true));
+        if (options.getUseTightNumericVector()) {
+          return std::unique_ptr<ColumnWriter>(
+              new FloatingColumnWriter<float, FloatVectorBatch>(type, factory, options, true));
+        }
+        return std::unique_ptr<ColumnWriter>(
+            new FloatingColumnWriter<double, DoubleVectorBatch>(type, factory, options, true));
       case BINARY:
         return std::unique_ptr<ColumnWriter>(new BinaryColumnWriter(type, factory, options));
       case STRING:
diff --git a/c++/src/MemoryPool.cc b/c++/src/MemoryPool.cc
index 89a9dcc98..bd2a30892 100644
--- a/c++/src/MemoryPool.cc
+++ b/c++/src/MemoryPool.cc
@@ -162,6 +162,24 @@ namespace orc {
     currentSize = newSize;
   }
 
+  // Specializations for float
+
+  template <>
+  DataBuffer<float>::~DataBuffer() {
+    if (buf) {
+      memoryPool.free(reinterpret_cast<char*>(buf));
+    }
+  }
+
+  template <>
+  void DataBuffer<float>::resize(uint64_t newSize) {
+    reserve(newSize);
+    if (newSize > currentSize) {
+      memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(float));
+    }
+    currentSize = newSize;
+  }
+
   // Specializations for int64_t
 
   template <>
@@ -180,6 +198,60 @@ namespace orc {
     currentSize = newSize;
   }
 
+  // Specializations for int32_t
+
+  template <>
+  DataBuffer<int32_t>::~DataBuffer() {
+    if (buf) {
+      memoryPool.free(reinterpret_cast<char*>(buf));
+    }
+  }
+
+  template <>
+  void DataBuffer<int32_t>::resize(uint64_t newSize) {
+    reserve(newSize);
+    if (newSize > currentSize) {
+      memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(int32_t));
+    }
+    currentSize = newSize;
+  }
+
+  // Specializations for int16_t
+
+  template <>
+  DataBuffer<int16_t>::~DataBuffer() {
+    if (buf) {
+      memoryPool.free(reinterpret_cast<char*>(buf));
+    }
+  }
+
+  template <>
+  void DataBuffer<int16_t>::resize(uint64_t newSize) {
+    reserve(newSize);
+    if (newSize > currentSize) {
+      memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(int16_t));
+    }
+    currentSize = newSize;
+  }
+
+  // Specializations for int8_t
+
+  template <>
+  DataBuffer<int8_t>::~DataBuffer() {
+    if (buf) {
+      memoryPool.free(reinterpret_cast<char*>(buf));
+    }
+  }
+
+  template <>
+  void DataBuffer<int8_t>::resize(uint64_t newSize) {
+    reserve(newSize);
+    if (newSize > currentSize) {
+      memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(int8_t));
+    }
+    currentSize = newSize;
+  }
+
   // Specializations for uint64_t
 
   template <>
@@ -223,8 +295,12 @@ namespace orc {
   template class DataBuffer<char>;
   template class DataBuffer<char*>;
   template class DataBuffer<double>;
+  template class DataBuffer<float>;
   template class DataBuffer<Int128>;
   template class DataBuffer<int64_t>;
+  template class DataBuffer<int32_t>;
+  template class DataBuffer<int16_t>;
+  template class DataBuffer<int8_t>;
   template class DataBuffer<uint64_t>;
   template class DataBuffer<unsigned char>;
 
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 6897c2663..2d410d4fb 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -140,6 +140,7 @@ namespace orc {
     std::shared_ptr<SearchArgument> sargs;
     std::string readerTimezone;
     RowReaderOptions::IdReadIntentMap idReadIntentMap;
+    bool useTightNumericVector;
 
     RowReaderOptionsPrivate() {
       selection = ColumnSelection_NONE;
@@ -149,6 +150,7 @@ namespace orc {
       forcedScaleOnHive11Decimal = 6;
       enableLazyDecoding = false;
       readerTimezone = "GMT";
+      useTightNumericVector = false;
     }
   };
 
@@ -298,6 +300,15 @@ namespace orc {
   const RowReaderOptions::IdReadIntentMap RowReaderOptions::getIdReadIntentMap() const {
     return privateBits->idReadIntentMap;
   }
+
+  RowReaderOptions& RowReaderOptions::setUseTightNumericVector(bool useTightNumericVector) {
+    privateBits->useTightNumericVector = useTightNumericVector;
+    return *this;
+  }
+
+  bool RowReaderOptions::getUseTightNumericVector() const {
+    return privateBits->useTightNumericVector;
+  }
 }  // namespace orc
 
 #endif
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 506f6eef9..535281d6f 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -60,14 +60,27 @@ namespace orc {
     }
   }
 
-  void RleEncoder::add(const int64_t* data, uint64_t numValues, const char* notNull) {
+  template <typename T>
+  void RleEncoder::add(const T* data, uint64_t numValues, const char* notNull) {
     for (uint64_t i = 0; i < numValues; ++i) {
       if (!notNull || notNull[i]) {
-        write(data[i]);
+        write(static_cast<int64_t>(data[i]));
       }
     }
   }
 
+  void RleEncoder::add(const int64_t* data, uint64_t numValues, const char* notNull) {
+    add<int64_t>(data, numValues, notNull);
+  }
+
+  void RleEncoder::add(const int32_t* data, uint64_t numValues, const char* notNull) {
+    add<int32_t>(data, numValues, notNull);
+  }
+
+  void RleEncoder::add(const int16_t* data, uint64_t numValues, const char* notNull) {
+    add<int16_t>(data, numValues, notNull);
+  }
+
   void RleEncoder::writeVslong(int64_t val) {
     writeVulong((val << 1) ^ (val >> 63));
   }
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index 68a954c27..51f9b6f58 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -56,8 +56,14 @@ namespace orc {
      * @param notNull If the pointer is null, all values are read. If the
      *    pointer is not null, positions that are false are skipped.
      */
+    template <typename T>
+    void add(const T* data, uint64_t numValues, const char* notNull);
+
     virtual void add(const int64_t* data, uint64_t numValues, const char* notNull);
 
+    virtual void add(const int32_t* data, uint64_t numValues, const char* notNull);
+
+    virtual void add(const int16_t* data, uint64_t numValues, const char* notNull);
     /**
      * Get size of buffer used so far.
      */
@@ -122,6 +128,10 @@ namespace orc {
      */
     virtual void next(int64_t* data, uint64_t numValues, const char* notNull) = 0;
 
+    virtual void next(int32_t* data, uint64_t numValues, const char* notNull) = 0;
+
+    virtual void next(int16_t* data, uint64_t numValues, const char* notNull) = 0;
+
    protected:
     ReaderMetrics* metrics;
   };
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index d45bc4da5..f8431566c 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -227,8 +227,8 @@ namespace orc {
     }
   }
 
-  void RleDecoderV1::next(int64_t* const data, const uint64_t numValues,
-                          const char* const notNull) {
+  template <typename T>
+  void RleDecoderV1::next(T* const data, const uint64_t numValues, const char* const notNull) {
     SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
     uint64_t position = 0;
     // skipNulls()
@@ -250,13 +250,13 @@ namespace orc {
         if (notNull) {
           for (uint64_t i = 0; i < count; ++i) {
             if (notNull[position + i]) {
-              data[position + i] = value + static_cast<int64_t>(consumed) * delta;
+              data[position + i] = static_cast<T>(value + static_cast<int64_t>(consumed) * delta);
               consumed += 1;
             }
           }
         } else {
           for (uint64_t i = 0; i < count; ++i) {
-            data[position + i] = value + static_cast<int64_t>(i) * delta;
+            data[position + i] = static_cast<T>(value + static_cast<int64_t>(i) * delta);
           }
           consumed = count;
         }
@@ -266,18 +266,18 @@ namespace orc {
           for (uint64_t i = 0; i < count; ++i) {
             if (notNull[position + i]) {
               data[position + i] =
-                  isSigned ? unZigZag(readLong()) : static_cast<int64_t>(readLong());
+                  isSigned ? static_cast<T>(unZigZag(readLong())) : static_cast<T>(readLong());
               ++consumed;
             }
           }
         } else {
           if (isSigned) {
             for (uint64_t i = 0; i < count; ++i) {
-              data[position + i] = unZigZag(readLong());
+              data[position + i] = static_cast<T>(unZigZag(readLong()));
             }
           } else {
             for (uint64_t i = 0; i < count; ++i) {
-              data[position + i] = static_cast<int64_t>(readLong());
+              data[position + i] = static_cast<T>(readLong());
             }
           }
           consumed = count;
@@ -296,4 +296,15 @@ namespace orc {
     }
   }
 
+  void RleDecoderV1::next(int64_t* data, uint64_t numValues, const char* notNull) {
+    next<int64_t>(data, numValues, notNull);
+  }
+
+  void RleDecoderV1::next(int32_t* data, uint64_t numValues, const char* notNull) {
+    next<int32_t>(data, numValues, notNull);
+  }
+
+  void RleDecoderV1::next(int16_t* data, uint64_t numValues, const char* notNull) {
+    next<int16_t>(data, numValues, notNull);
+  }
 }  // namespace orc
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index d631556db..fbe6b0f9c 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -63,8 +63,15 @@ namespace orc {
     /**
      * Read a number of values into the batch.
      */
+    template <typename T>
+    void next(T* data, uint64_t numValues, const char* notNull);
+
     void next(int64_t* data, uint64_t numValues, const char* notNull) override;
 
+    void next(int32_t* data, uint64_t numValues, const char* notNull) override;
+
+    void next(int16_t* data, uint64_t numValues, const char* notNull) override;
+
    private:
     inline signed char readByte();
 
diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh
index 4180dba89..f48ce8391 100644
--- a/c++/src/RLEv2.hh
+++ b/c++/src/RLEv2.hh
@@ -157,8 +157,15 @@ namespace orc {
     /**
      * Read a number of values into the batch.
      */
+    template <typename T>
+    void next(T* data, uint64_t numValues, const char* notNull);
+
     void next(int64_t* data, uint64_t numValues, const char* notNull) override;
 
+    void next(int32_t* data, uint64_t numValues, const char* notNull) override;
+
+    void next(int16_t* data, uint64_t numValues, const char* notNull) override;
+
    private:
     /**
      * Decode the next gap and patch from 'unpackedPatch' and update the index on it.
@@ -200,14 +207,16 @@ namespace orc {
     void unrolledUnpack56(int64_t* data, uint64_t offset, uint64_t len);
     void unrolledUnpack64(int64_t* data, uint64_t offset, uint64_t len);
 
-    uint64_t nextShortRepeats(int64_t* data, uint64_t offset, uint64_t numValues,
-                              const char* notNull);
-    uint64_t nextDirect(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull);
-    uint64_t nextPatched(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull);
-    uint64_t nextDelta(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull);
-
-    uint64_t copyDataFromBuffer(int64_t* data, uint64_t offset, uint64_t numValues,
-                                const char* notNull);
+    template <typename T>
+    uint64_t nextShortRepeats(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+    template <typename T>
+    uint64_t nextDirect(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+    template <typename T>
+    uint64_t nextPatched(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+    template <typename T>
+    uint64_t nextDelta(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+    template <typename T>
+    uint64_t copyDataFromBuffer(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
 
     const std::unique_ptr<SeekableInputStream> inputStream;
     const bool isSigned;
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index dab3ceec6..466cfd433 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -263,6 +263,7 @@ namespace orc {
     currentRowInStripe = 0;
     rowsInCurrentStripe = 0;
     numRowGroupsInStripeRange = 0;
+    useTightNumericVector = opts.getUseTightNumericVector();
     uint64_t rowTotal = 0;
 
     firstRowOfStripe.resize(numberOfStripes);
@@ -1090,7 +1091,7 @@ namespace orc {
       StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter,
                                       currentStripeInfo.offset(), *contents->stream, writerTimezone,
                                       readerTimezone);
-      reader = buildReader(*contents->schema, stripeStreams);
+      reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector);
 
       if (sargsApplier) {
         // move to the 1st selected row group when PPD is enabled.
@@ -1204,7 +1205,8 @@ namespace orc {
   }
 
   std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch(uint64_t capacity) const {
-    return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock);
+    return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock,
+                                            useTightNumericVector);
   }
 
   void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index c7e616bea..ef04edd6c 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -157,6 +157,7 @@ namespace orc {
     std::unique_ptr<ColumnReader> reader;
 
     bool enableEncodedBlock;
+    bool useTightNumericVector;
     // internal methods
     void startNextStripe();
     inline void markEndOfFile();
diff --git a/c++/src/RleDecoderV2.cc b/c++/src/RleDecoderV2.cc
index ca5528513..2742aef6f 100644
--- a/c++/src/RleDecoderV2.cc
+++ b/c++/src/RleDecoderV2.cc
@@ -438,8 +438,8 @@ namespace orc {
     }
   }
 
-  void RleDecoderV2::next(int64_t* const data, const uint64_t numValues,
-                          const char* const notNull) {
+  template <typename T>
+  void RleDecoderV2::next(T* const data, const uint64_t numValues, const char* const notNull) {
     SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
     uint64_t nRead = 0;
 
@@ -478,7 +478,20 @@ namespace orc {
     }
   }
 
-  uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data, uint64_t offset, uint64_t numValues,
+  void RleDecoderV2::next(int64_t* data, uint64_t numValues, const char* notNull) {
+    next<int64_t>(data, numValues, notNull);
+  }
+
+  void RleDecoderV2::next(int32_t* data, uint64_t numValues, const char* notNull) {
+    next<int32_t>(data, numValues, notNull);
+  }
+
+  void RleDecoderV2::next(int16_t* data, uint64_t numValues, const char* notNull) {
+    next<int16_t>(data, numValues, notNull);
+  }
+
+  template <typename T>
+  uint64_t RleDecoderV2::nextShortRepeats(T* const data, uint64_t offset, uint64_t numValues,
                                           const char* const notNull) {
     if (runRead == runLength) {
       // extract the number of fixed bytes
@@ -503,13 +516,13 @@ namespace orc {
     if (notNull) {
       for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
         if (notNull[pos]) {
-          data[pos] = literals[0];
+          data[pos] = static_cast<T>(literals[0]);
           ++runRead;
         }
       }
     } else {
       for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
-        data[pos] = literals[0];
+        data[pos] = static_cast<T>(literals[0]);
         ++runRead;
       }
     }
@@ -517,7 +530,8 @@ namespace orc {
     return nRead;
   }
 
-  uint64_t RleDecoderV2::nextDirect(int64_t* const data, uint64_t offset, uint64_t numValues,
+  template <typename T>
+  uint64_t RleDecoderV2::nextDirect(T* const data, uint64_t offset, uint64_t numValues,
                                     const char* const notNull) {
     if (runRead == runLength) {
       // extract the number of fixed bits
@@ -565,7 +579,8 @@ namespace orc {
     *patchIdx = idx;
   }
 
-  uint64_t RleDecoderV2::nextPatched(int64_t* const data, uint64_t offset, uint64_t numValues,
+  template <typename T>
+  uint64_t RleDecoderV2::nextPatched(T* const data, uint64_t offset, uint64_t numValues,
                                      const char* const notNull) {
     if (runRead == runLength) {
       // extract the number of fixed bits
@@ -663,7 +678,8 @@ namespace orc {
     return copyDataFromBuffer(data, offset, numValues, notNull);
   }
 
-  uint64_t RleDecoderV2::nextDelta(int64_t* const data, uint64_t offset, uint64_t numValues,
+  template <typename T>
+  uint64_t RleDecoderV2::nextDelta(T* const data, uint64_t offset, uint64_t numValues,
                                    const char* const notNull) {
     if (runRead == runLength) {
       // extract the number of fixed bits
@@ -727,18 +743,20 @@ namespace orc {
     return copyDataFromBuffer(data, offset, numValues, notNull);
   }
 
-  uint64_t RleDecoderV2::copyDataFromBuffer(int64_t* data, uint64_t offset, uint64_t numValues,
+  template <typename T>  // T: element type of the output buffer
+  uint64_t RleDecoderV2::copyDataFromBuffer(T* data, uint64_t offset, uint64_t numValues,
                                             const char* notNull) {
     uint64_t nRead = std::min(runLength - runRead, numValues);
     if (notNull) {
       for (uint64_t i = offset; i < (offset + nRead); ++i) {
         if (notNull[i]) {
-          data[i] = literals[runRead++];
+          data[i] = static_cast<T>(literals[runRead++]);  // null slots consume no literal
         }
       }
     } else {
-      memcpy(data + offset, literals.data() + runRead, nRead * sizeof(int64_t));
-      runRead += nRead;
+      for (uint64_t i = offset; i < (offset + nRead); ++i) {
+        data[i] = static_cast<T>(literals[runRead++]);  // per-element cast replaces memcpy
+      }
     }
     return nRead;
   }
diff --git a/c++/src/Timezone.cc b/c++/src/Timezone.cc
index c330978d4..0b007d1f8 100644
--- a/c++/src/Timezone.cc
+++ b/c++/src/Timezone.cc
@@ -24,6 +24,7 @@
 #include <stdlib.h>
 #include <string.h>
 #include <time.h>
+#include <iostream>
 #include <map>
 #include <sstream>
 
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index e586eef5c..dd7a0f0be 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -279,16 +279,37 @@ namespace orc {
   std::unique_ptr<ColumnVectorBatch> TypeImpl::createRowBatch(uint64_t capacity,
                                                               MemoryPool& memoryPool,
                                                               bool encoded) const {
+    return createRowBatch(capacity, memoryPool, encoded, false);  // useTightNumericVector = false
+  }
+
+  std::unique_ptr<ColumnVectorBatch> TypeImpl::createRowBatch(uint64_t capacity,
+                                                              MemoryPool& memoryPool, bool encoded,
+                                                              bool useTightNumericVector) const {
     switch (static_cast<int64_t>(kind)) {
       case BOOLEAN:
-      case BYTE:
-      case SHORT:
-      case INT:
+      case BYTE: {
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnVectorBatch>(new ByteVectorBatch(capacity, memoryPool));
+        }
+      }
+      case SHORT: {
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnVectorBatch>(new ShortVectorBatch(capacity, memoryPool));
+        }
+      }
+      case INT: {
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnVectorBatch>(new IntVectorBatch(capacity, memoryPool));
+        }
+      }
       case LONG:
       case DATE:
         return std::unique_ptr<ColumnVectorBatch>(new LongVectorBatch(capacity, memoryPool));
 
       case FLOAT:
+        if (useTightNumericVector) {
+          return std::unique_ptr<ColumnVectorBatch>(new FloatVectorBatch(capacity, memoryPool));
+        }
       case DOUBLE:
         return std::unique_ptr<ColumnVectorBatch>(new DoubleVectorBatch(capacity, memoryPool));
 
@@ -311,7 +332,9 @@ namespace orc {
             std::unique_ptr<ColumnVectorBatch>(result);
         for (uint64_t i = 0; i < getSubtypeCount(); ++i) {
           result->fields.push_back(
-              getSubtype(i)->createRowBatch(capacity, memoryPool, encoded).release());
+              getSubtype(i)
+                  ->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector)
+                  .release());
         }
         return return_value;
       }
@@ -321,7 +344,8 @@ namespace orc {
         std::unique_ptr<ColumnVectorBatch> return_value =
             std::unique_ptr<ColumnVectorBatch>(result);
         if (getSubtype(0) != nullptr) {
-          result->elements = getSubtype(0)->createRowBatch(capacity, memoryPool, encoded);
+          result->elements =
+              getSubtype(0)->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector);
         }
         return return_value;
       }
@@ -331,10 +355,12 @@ namespace orc {
         std::unique_ptr<ColumnVectorBatch> return_value =
             std::unique_ptr<ColumnVectorBatch>(result);
         if (getSubtype(0) != nullptr) {
-          result->keys = getSubtype(0)->createRowBatch(capacity, memoryPool, encoded);
+          result->keys =
+              getSubtype(0)->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector);
         }
         if (getSubtype(1) != nullptr) {
-          result->elements = getSubtype(1)->createRowBatch(capacity, memoryPool, encoded);
+          result->elements =
+              getSubtype(1)->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector);
         }
         return return_value;
       }
@@ -354,7 +380,9 @@ namespace orc {
             std::unique_ptr<ColumnVectorBatch>(result);
         for (uint64_t i = 0; i < getSubtypeCount(); ++i) {
           result->children.push_back(
-              getSubtype(i)->createRowBatch(capacity, memoryPool, encoded).release());
+              getSubtype(i)
+                  ->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector)
+                  .release());
         }
         return return_value;
       }
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index 21b69b433..7165e102b 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -94,6 +94,10 @@ namespace orc {
     std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size, MemoryPool& memoryPool,
                                                       bool encoded = false) const override;
 
+    std::unique_ptr<ColumnVectorBatch> createRowBatch(
+        uint64_t size, MemoryPool& memoryPool, bool encoded = false,
+        bool useTightNumericVector = false) const override;
+
     /**
      * Explicitly set the column ids. Only for internal usage.
      */
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index 4ececb070..ef16b3180 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -60,68 +60,6 @@ namespace orc {
     return false;
   }
 
-  LongVectorBatch::LongVectorBatch(uint64_t _capacity, MemoryPool& pool)
-      : ColumnVectorBatch(_capacity, pool), data(pool, _capacity) {
-    // PASS
-  }
-
-  LongVectorBatch::~LongVectorBatch() {
-    // PASS
-  }
-
-  std::string LongVectorBatch::toString() const {
-    std::ostringstream buffer;
-    buffer << "Long vector <" << numElements << " of " << capacity << ">";
-    return buffer.str();
-  }
-
-  void LongVectorBatch::resize(uint64_t cap) {
-    if (capacity < cap) {
-      ColumnVectorBatch::resize(cap);
-      data.resize(cap);
-    }
-  }
-
-  void LongVectorBatch::clear() {
-    numElements = 0;
-  }
-
-  uint64_t LongVectorBatch::getMemoryUsage() {
-    return ColumnVectorBatch::getMemoryUsage() +
-           static_cast<uint64_t>(data.capacity() * sizeof(int64_t));
-  }
-
-  DoubleVectorBatch::DoubleVectorBatch(uint64_t _capacity, MemoryPool& pool)
-      : ColumnVectorBatch(_capacity, pool), data(pool, _capacity) {
-    // PASS
-  }
-
-  DoubleVectorBatch::~DoubleVectorBatch() {
-    // PASS
-  }
-
-  std::string DoubleVectorBatch::toString() const {
-    std::ostringstream buffer;
-    buffer << "Double vector <" << numElements << " of " << capacity << ">";
-    return buffer.str();
-  }
-
-  void DoubleVectorBatch::resize(uint64_t cap) {
-    if (capacity < cap) {
-      ColumnVectorBatch::resize(cap);
-      data.resize(cap);
-    }
-  }
-
-  void DoubleVectorBatch::clear() {
-    numElements = 0;
-  }
-
-  uint64_t DoubleVectorBatch::getMemoryUsage() {
-    return ColumnVectorBatch::getMemoryUsage() +
-           static_cast<uint64_t>(data.capacity() * sizeof(double));
-  }
-
   StringDictionary::StringDictionary(MemoryPool& pool)
       : dictionaryBlob(pool), dictionaryOffset(pool) {
     // PASS
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index c975ecde7..36c9efb36 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -44,6 +44,7 @@ namespace orc {
     BloomFilterVersion bloomFilterVersion;
     std::string timezone;
     WriterMetrics* metrics;
+    bool useTightNumericVector;
 
     WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) {  // default to Hive_0_12
       stripeSize = 64 * 1024 * 1024;                               // 64M
@@ -63,6 +64,7 @@ namespace orc {
       // Explictly set the writer timezone if the use case depends on it.
       timezone = "GMT";
       metrics = nullptr;
+      useTightNumericVector = false;
     }
   };
 
@@ -262,6 +264,15 @@ namespace orc {
     return *this;
   }
 
+  WriterOptions& WriterOptions::setUseTightNumericVector(bool useTightNumericVector) {
+    privateBits->useTightNumericVector = useTightNumericVector;
+    return *this;  // returning *this lets setter calls be chained
+  }
+
+  bool WriterOptions::getUseTightNumericVector() const {
+    return privateBits->useTightNumericVector;  // initialized to false in WriterOptionsPrivate
+  }
+
   Writer::~Writer() {
     // PASS
   }
@@ -284,6 +295,7 @@ namespace orc {
 
     static const char* magicId;
     static const WriterId writerId;
+    bool useTightNumericVector;
 
    public:
     WriterImpl(const Type& type, OutputStream* stream, const WriterOptions& options);
@@ -318,6 +330,8 @@ namespace orc {
     stripeRows = totalRows = indexRows = 0;
     currentOffset = 0;
 
+    useTightNumericVector = opts.getUseTightNumericVector();
+
     // compression stream for stripe footer, file footer and metadata
     compressionStream = createCompressor(
         options.getCompression(), outStream, options.getCompressionStrategy(),
@@ -334,7 +348,7 @@ namespace orc {
   }
 
   std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size) const {
-    return type.createRowBatch(size, *options.getMemoryPool());
+    return type.createRowBatch(size, *options.getMemoryPool(), false, useTightNumericVector);
   }
 
   void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index e71c60afb..be35a95eb 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -45,7 +45,8 @@ namespace orc {
                                        CompressionKind compression, const Type& type,
                                        MemoryPool* memoryPool, OutputStream* stream,
                                        FileVersion version, uint64_t stride = 0,
-                                       const std::string& timezone = "GMT") {
+                                       const std::string& timezone = "GMT",
+                                       bool useTightNumericVector = false) {
     WriterOptions options;
     options.setStripeSize(stripeSize);
     options.setCompressionBlockSize(compresionblockSize);
@@ -54,6 +55,7 @@ namespace orc {
     options.setRowIndexStride(stride);
     options.setFileVersion(version);
     options.setTimezoneName(timezone);
+    options.setUseTightNumericVector(useTightNumericVector);
     return createWriter(type, stream, options);
   }
 
@@ -64,9 +66,11 @@ namespace orc {
     return createReader(std::move(stream), options);
   }
 
-  std::unique_ptr<RowReader> createRowReader(Reader* reader, const std::string& timezone = "GMT") {
+  std::unique_ptr<RowReader> createRowReader(Reader* reader, const std::string& timezone = "GMT",
+                                             bool useTightNumericVector = false) {
     RowReaderOptions rowReaderOpts;
     rowReaderOpts.setTimezoneName(timezone);
+    rowReaderOpts.setUseTightNumericVector(useTightNumericVector);  // copied into the options
     return reader->createRowReader(rowReaderOpts);
   }
 
@@ -1855,6 +1859,99 @@ namespace orc {
     testSuppressPresentStream(CompressionKind_SNAPPY);
   }
 
+  TEST_P(WriterTest, testWriteFixedWidthNumericVectorBatch) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString(
+        "struct<col1:double,col2:float,col3:int,col4:smallint,col5:tinyint,col6:bigint>"));
+
+    uint64_t stripeSize = 16 * 1024;
+    uint64_t compressionBlockSize = 1024;
+    uint64_t rowCount = 65530;
+
+    std::vector<double> data(rowCount);  // reference values for the double and float columns
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      data[i] = 100000 * (std::rand() * 1.0 / RAND_MAX);  // pseudo-random value in [0, 100000]
+    }
+
+    std::unique_ptr<Writer> writer =
+        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool,
+                     &memStream, fileVersion, 0, "GMT", true);  // true = useTightNumericVector
+    // Typed column views; NOTE(review): the dynamic_cast results below are not null-checked.
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount / 2);
+    StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+    DoubleVectorBatch* doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
+    FloatVectorBatch* floatBatch = dynamic_cast<FloatVectorBatch*>(structBatch->fields[1]);
+    IntVectorBatch* intBatch = dynamic_cast<IntVectorBatch*>(structBatch->fields[2]);
+    ShortVectorBatch* shortBatch = dynamic_cast<ShortVectorBatch*>(structBatch->fields[3]);
+    ByteVectorBatch* byteBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[4]);
+    LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[5]);
+    structBatch->resize(rowCount);  // batch was created with capacity rowCount / 2
+    doubleBatch->resize(rowCount);
+    floatBatch->resize(rowCount);
+    intBatch->resize(rowCount);
+    shortBatch->resize(rowCount);
+    byteBatch->resize(rowCount);
+    longBatch->resize(rowCount);
+
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      structBatch->notNull[i] = 1;  // mark every row as present in all columns
+      doubleBatch->notNull[i] = 1;
+      floatBatch->notNull[i] = 1;
+      intBatch->notNull[i] = 1;
+      shortBatch->notNull[i] = 1;
+      byteBatch->notNull[i] = 1;
+      longBatch->notNull[i] = 1;
+
+      doubleBatch->data[i] = data[i];
+      floatBatch->data[i] = static_cast<float>(data[i]);
+      intBatch->data[i] = static_cast<int32_t>(i);
+      shortBatch->data[i] = static_cast<int16_t>(i);  // narrowing: i outgrows int16_t/int8_t
+      byteBatch->data[i] = static_cast<int8_t>(i);
+      longBatch->data[i] = static_cast<int64_t>(i);
+    }
+
+    structBatch->numElements = rowCount;
+    doubleBatch->numElements = rowCount;
+    floatBatch->numElements = rowCount;
+    intBatch->numElements = rowCount;
+    shortBatch->numElements = rowCount;
+    byteBatch->numElements = rowCount;
+    longBatch->numElements = rowCount;
+
+    writer->add(*batch);  // all rows are written in a single add() call
+    writer->close();
+
+    std::unique_ptr<InputStream> inStream(
+        new MemoryInputStream(memStream.getData(), memStream.getLength()));
+    std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get(), "GMT", true);
+
+    EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(rowCount);  // sized to hold every row in one next()
+    EXPECT_EQ(true, rowReader->next(*batch));
+    EXPECT_EQ(rowCount, batch->numElements);
+
+    structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+    doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
+    floatBatch = dynamic_cast<FloatVectorBatch*>(structBatch->fields[1]);
+    intBatch = dynamic_cast<IntVectorBatch*>(structBatch->fields[2]);
+    shortBatch = dynamic_cast<ShortVectorBatch*>(structBatch->fields[3]);
+    byteBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[4]);
+    longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[5]);
+    for (uint64_t i = 0; i < rowCount; ++i) {
+      EXPECT_TRUE(std::abs(data[i] - doubleBatch->data[i]) < 0.000001);  // absolute tolerance
+      EXPECT_TRUE(std::abs(static_cast<float>(data[i]) - static_cast<float>(floatBatch->data[i])) <
+                  0.000001f);
+      EXPECT_EQ(intBatch->data[i], static_cast<int32_t>(i));
+      EXPECT_EQ(shortBatch->data[i], static_cast<int16_t>(i));
+      EXPECT_EQ(byteBatch->data[i], static_cast<int8_t>(i));
+      EXPECT_EQ(longBatch->data[i], static_cast<int64_t>(i));
+    }
+    EXPECT_FALSE(rowReader->next(*batch));  // no rows remain after the first batch
+  }
+
   INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
                            Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
                                   FileVersion::UNSTABLE_PRE_2_0()));