You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/11/11 09:13:40 UTC
[orc] branch main updated: ORC-1298: [C++] Support dedicated ColumnVectorBatch of numeric types
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 597477e2c ORC-1298: [C++] Support dedicated ColumnVectorBatch of numeric types
597477e2c is described below
commit 597477e2cd594d2283173924ad502c6640c3b143
Author: zou Hao <34...@users.noreply.github.com>
AuthorDate: Fri Nov 11 17:13:34 2022 +0800
ORC-1298: [C++] Support dedicated ColumnVectorBatch of numeric types
This closes #1307
---
c++/include/orc/MemoryPool.hh | 36 ++++++
c++/include/orc/Reader.hh | 12 ++
c++/include/orc/Type.hh | 4 +
c++/include/orc/Vector.hh | 122 +++++++++++++++---
c++/include/orc/Writer.hh | 11 ++
c++/src/ColumnReader.cc | 285 +++++++++++++++++++++++-------------------
c++/src/ColumnReader.hh | 3 +-
c++/src/ColumnWriter.cc | 148 ++++++++++++++--------
c++/src/MemoryPool.cc | 76 +++++++++++
c++/src/Options.hh | 11 ++
c++/src/RLE.cc | 17 ++-
c++/src/RLE.hh | 10 ++
c++/src/RLEv1.cc | 25 ++--
c++/src/RLEv1.hh | 7 ++
c++/src/RLEv2.hh | 25 ++--
c++/src/Reader.cc | 6 +-
c++/src/Reader.hh | 1 +
c++/src/RleDecoderV2.cc | 42 +++++--
c++/src/Timezone.cc | 1 +
c++/src/TypeImpl.cc | 44 +++++--
c++/src/TypeImpl.hh | 4 +
c++/src/Vector.cc | 62 ---------
c++/src/Writer.cc | 16 ++-
c++/test/TestWriter.cc | 101 ++++++++++++++-
24 files changed, 767 insertions(+), 302 deletions(-)
diff --git a/c++/include/orc/MemoryPool.hh b/c++/include/orc/MemoryPool.hh
index 347505412..f0dbeb16d 100644
--- a/c++/include/orc/MemoryPool.hh
+++ b/c++/include/orc/MemoryPool.hh
@@ -108,6 +108,14 @@ namespace orc {
template <>
void DataBuffer<double>::resize(uint64_t newSize);
+ // Specializations for float
+
+ template <>
+ DataBuffer<float>::~DataBuffer();
+
+ template <>
+ void DataBuffer<float>::resize(uint64_t newSize);
+
// Specializations for int64_t
template <>
@@ -116,6 +124,30 @@ namespace orc {
template <>
void DataBuffer<int64_t>::resize(uint64_t newSize);
+ // Specializations for int32_t
+
+ template <>
+ DataBuffer<int32_t>::~DataBuffer();
+
+ template <>
+ void DataBuffer<int32_t>::resize(uint64_t newSize);
+
+ // Specializations for int16_t
+
+ template <>
+ DataBuffer<int16_t>::~DataBuffer();
+
+ template <>
+ void DataBuffer<int16_t>::resize(uint64_t newSize);
+
+ // Specializations for int8_t
+
+ template <>
+ DataBuffer<int8_t>::~DataBuffer();
+
+ template <>
+ void DataBuffer<int8_t>::resize(uint64_t newSize);
+
// Specializations for uint64_t
template <>
@@ -140,8 +172,12 @@ namespace orc {
extern template class DataBuffer<char>;
extern template class DataBuffer<char*>;
extern template class DataBuffer<double>;
+ extern template class DataBuffer<float>;
extern template class DataBuffer<Int128>;
extern template class DataBuffer<int64_t>;
+ extern template class DataBuffer<int32_t>;
+ extern template class DataBuffer<int16_t>;
+ extern template class DataBuffer<int8_t>;
extern template class DataBuffer<uint64_t>;
extern template class DataBuffer<unsigned char>;
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 98e4e8b3a..a1ac53af8 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -324,6 +324,18 @@ namespace orc {
* Get the IdReadIntentMap map that was supplied by client.
*/
const IdReadIntentMap getIdReadIntentMap() const;
+
+ /**
+ * Set whether to use fixed-width numeric vector batches, such as int32_t / int16_t / int8_t /
+ * float vector batches, instead of the default int64_t / double ones.
+ */
+ RowReaderOptions& setUseTightNumericVector(bool useTightNumericVector);
+
+ /**
+ * Get whether or not to use fixed width numeric columnVectorBatch.
+ * @return if not set, the default is false
+ */
+ bool getUseTightNumericVector() const;
};
class RowReader;
diff --git a/c++/include/orc/Type.hh b/c++/include/orc/Type.hh
index e3c9768c1..e21852b61 100644
--- a/c++/include/orc/Type.hh
+++ b/c++/include/orc/Type.hh
@@ -72,6 +72,10 @@ namespace orc {
virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(uint64_t size, MemoryPool& pool,
bool encoded = false) const = 0;
+ virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(
+ uint64_t size, MemoryPool& pool, bool encoded = false,
+ bool useTightNumericVector = false) const = 0;
+
/**
* Add a new field to a struct type.
* @param fieldName the name of the new field
diff --git a/c++/include/orc/Vector.hh b/c++/include/orc/Vector.hh
index 27afaa71f..4ed9a7e78 100644
--- a/c++/include/orc/Vector.hh
+++ b/c++/include/orc/Vector.hh
@@ -25,9 +25,9 @@
#include <cstdlib>
#include <cstring>
-#include <iostream>
#include <list>
#include <memory>
+#include <sstream>
#include <stdexcept>
#include <vector>
@@ -88,28 +88,116 @@ namespace orc {
ColumnVectorBatch& operator=(const ColumnVectorBatch&);
};
- struct LongVectorBatch : public ColumnVectorBatch {
- LongVectorBatch(uint64_t capacity, MemoryPool& pool);
- virtual ~LongVectorBatch();
+ template <typename ValueType>
+ struct IntegerVectorBatch : public ColumnVectorBatch {
+ IntegerVectorBatch(uint64_t cap, MemoryPool& pool)
+ : ColumnVectorBatch(cap, pool), data(pool, cap) {
+ // PASS
+ }
- DataBuffer<int64_t> data;
- std::string toString() const;
- void resize(uint64_t capacity);
- void clear();
- uint64_t getMemoryUsage();
+ virtual ~IntegerVectorBatch() = default;
+
+ inline std::string toString() const;
+
+ void resize(uint64_t cap) {
+ if (capacity < cap) {
+ ColumnVectorBatch::resize(cap);
+ data.resize(cap);
+ }
+ }
+
+ void clear() {
+ numElements = 0;
+ }
+
+ uint64_t getMemoryUsage() {
+ return ColumnVectorBatch::getMemoryUsage() +
+ static_cast<uint64_t>(data.capacity() * sizeof(ValueType));
+ }
+
+ DataBuffer<ValueType> data;
};
- struct DoubleVectorBatch : public ColumnVectorBatch {
- DoubleVectorBatch(uint64_t capacity, MemoryPool& pool);
- virtual ~DoubleVectorBatch();
- std::string toString() const;
- void resize(uint64_t capacity);
- void clear();
- uint64_t getMemoryUsage();
+ using LongVectorBatch = IntegerVectorBatch<int64_t>;
+ using IntVectorBatch = IntegerVectorBatch<int32_t>;
+ using ShortVectorBatch = IntegerVectorBatch<int16_t>;
+ using ByteVectorBatch = IntegerVectorBatch<int8_t>;
+
+ template <>
+ inline std::string LongVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Long vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
+ template <>
+ inline std::string IntVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Int vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
+ template <>
+ inline std::string ShortVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Short vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
+ template <>
+ inline std::string ByteVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Byte vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
+ template <typename FloatType>
+ struct FloatingVectorBatch : public ColumnVectorBatch {
+ FloatingVectorBatch(uint64_t cap, MemoryPool& pool)
+ : ColumnVectorBatch(cap, pool), data(pool, cap) {
+ // PASS
+ }
+
+ virtual ~FloatingVectorBatch() = default;
- DataBuffer<double> data;
+ inline std::string toString() const;
+
+ void resize(uint64_t cap) {
+ if (capacity < cap) {
+ ColumnVectorBatch::resize(cap);
+ data.resize(cap);
+ }
+ }
+
+ void clear() {
+ numElements = 0;
+ }
+
+ uint64_t getMemoryUsage() {
+ return ColumnVectorBatch::getMemoryUsage() +
+ static_cast<uint64_t>(data.capacity() * sizeof(FloatType));
+ }
+
+ DataBuffer<FloatType> data;
};
+ using DoubleVectorBatch = FloatingVectorBatch<double>;
+ using FloatVectorBatch = FloatingVectorBatch<float>;
+
+ template <>
+ inline std::string DoubleVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Double vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
+ template <>
+ inline std::string FloatVectorBatch::toString() const {
+ std::ostringstream buffer;
+ buffer << "Float vector <" << numElements << " of " << capacity << ">";
+ return buffer.str();
+ }
+
struct StringVectorBatch : public ColumnVectorBatch {
StringVectorBatch(uint64_t capacity, MemoryPool& pool);
virtual ~StringVectorBatch();
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 854dc1dd7..032cad536 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -251,6 +251,17 @@ namespace orc {
* @return if not set, return nullptr.
*/
WriterMetrics* getWriterMetrics() const;
+
+ /**
+ * Set whether to use tight (fixed-width) numeric vector batches.
+ */
+ WriterOptions& setUseTightNumericVector(bool useTightNumericVector);
+
+ /**
+ * Get whether to use tight (fixed-width) numeric vector batches.
+ * @return if not set, the default is false
+ */
+ bool getUseTightNumericVector() const;
};
class Writer {
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index 5106162c6..eea978b0e 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -116,14 +116,18 @@ namespace orc {
}
/**
- * Expand an array of bytes in place to the corresponding array of longs.
+ * Expand an array of bytes in place to the corresponding array of integers.
* Has to work backwards so that they data isn't clobbered during the
* expansion.
* @param buffer the array of chars and array of longs that need to be
* expanded
* @param numValues the number of bytes to convert to longs
*/
- void expandBytesToLongs(int64_t* buffer, uint64_t numValues) {
+ template <typename T>
+ void expandBytesToIntegers(T* buffer, uint64_t numValues) {
+ if (sizeof(T) == sizeof(char)) {
+ return;
+ }
for (size_t i = numValues - 1; i < numValues; --i) {
buffer[i] = reinterpret_cast<char*>(buffer)[i];
}
@@ -169,7 +173,7 @@ namespace orc {
int64_t* ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
rle->next(reinterpret_cast<char*>(ptr), numValues,
rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
- expandBytesToLongs(ptr, numValues);
+ expandBytesToIntegers(ptr, numValues);
}
void BooleanColumnReader::seekToRowGroup(
@@ -178,99 +182,78 @@ namespace orc {
rle->seek(positions.at(columnId));
}
+ template <typename BatchType>
class ByteColumnReader : public ColumnReader {
private:
std::unique_ptr<orc::ByteRleDecoder> rle;
public:
- ByteColumnReader(const Type& type, StripeStreams& stipe);
- ~ByteColumnReader() override;
-
- uint64_t skip(uint64_t numValues) override;
-
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
-
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
- };
-
- ByteColumnReader::ByteColumnReader(const Type& type, StripeStreams& stripe)
- : ColumnReader(type, stripe) {
- std::unique_ptr<SeekableInputStream> stream =
- stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
- if (stream == nullptr) throw ParseError("DATA stream not found in Byte column");
- rle = createByteRleDecoder(std::move(stream), metrics);
- }
+ ByteColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
+ std::unique_ptr<SeekableInputStream> stream =
+ stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
+ if (stream == nullptr) throw ParseError("DATA stream not found in Byte column");
+ rle = createByteRleDecoder(std::move(stream), metrics);
+ }
- ByteColumnReader::~ByteColumnReader() {
- // PASS
- }
+ ~ByteColumnReader() override = default;
- uint64_t ByteColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
- rle->skip(numValues);
- return numValues;
- }
+ uint64_t skip(uint64_t numValues) override {
+ numValues = ColumnReader::skip(numValues);
+ rle->skip(numValues);
+ return numValues;
+ }
- void ByteColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
- // Since the byte rle places the output in a char* instead of long*,
- // we cheat here and use the long* and then expand it in a second pass.
- int64_t* ptr = dynamic_cast<LongVectorBatch&>(rowBatch).data.data();
- rle->next(reinterpret_cast<char*>(ptr), numValues,
- rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
- expandBytesToLongs(ptr, numValues);
- }
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+ ColumnReader::next(rowBatch, numValues, notNull);
+ // Since the byte rle places the output in a char* instead of the batch's value type,
+ // we cheat here and reuse the batch's buffer, then widen it in place (no-op for 8-bit batches).
+ auto* ptr = dynamic_cast<BatchType&>(rowBatch).data.data();
+ rle->next(reinterpret_cast<char*>(ptr), numValues,
+ rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
+ expandBytesToIntegers(ptr, numValues);
+ }
- void ByteColumnReader::seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
- rle->seek(positions.at(columnId));
- }
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(positions.at(columnId));
+ }
+ };
+ template <typename BatchType>
class IntegerColumnReader : public ColumnReader {
protected:
std::unique_ptr<orc::RleDecoder> rle;
public:
- IntegerColumnReader(const Type& type, StripeStreams& stripe);
- ~IntegerColumnReader() override;
-
- uint64_t skip(uint64_t numValues) override;
-
- void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override;
-
- void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override;
- };
-
- IntegerColumnReader::IntegerColumnReader(const Type& type, StripeStreams& stripe)
- : ColumnReader(type, stripe) {
- RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
- std::unique_ptr<SeekableInputStream> stream =
- stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
- if (stream == nullptr) throw ParseError("DATA stream not found in Integer column");
- rle = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
- }
+ IntegerColumnReader(const Type& type, StripeStreams& stripe) : ColumnReader(type, stripe) {
+ RleVersion vers = convertRleVersion(stripe.getEncoding(columnId).kind());
+ std::unique_ptr<SeekableInputStream> stream =
+ stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
+ if (stream == nullptr) throw ParseError("DATA stream not found in Integer column");
+ rle = createRleDecoder(std::move(stream), true, vers, memoryPool, metrics);
+ }
- IntegerColumnReader::~IntegerColumnReader() {
- // PASS
- }
+ ~IntegerColumnReader() override {
+ // PASS
+ }
- uint64_t IntegerColumnReader::skip(uint64_t numValues) {
- numValues = ColumnReader::skip(numValues);
- rle->skip(numValues);
- return numValues;
- }
+ uint64_t skip(uint64_t numValues) override {
+ numValues = ColumnReader::skip(numValues);
+ rle->skip(numValues);
+ return numValues;
+ }
- void IntegerColumnReader::next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
- ColumnReader::next(rowBatch, numValues, notNull);
- rle->next(dynamic_cast<LongVectorBatch&>(rowBatch).data.data(), numValues,
- rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
- }
+ void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
+ ColumnReader::next(rowBatch, numValues, notNull);
+ rle->next(dynamic_cast<BatchType&>(rowBatch).data.data(), numValues,
+ rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr);
+ }
- void IntegerColumnReader::seekToRowGroup(
- std::unordered_map<uint64_t, PositionProvider>& positions) {
- ColumnReader::seekToRowGroup(positions);
- rle->seek(positions.at(columnId));
- }
+ void seekToRowGroup(std::unordered_map<uint64_t, PositionProvider>& positions) override {
+ ColumnReader::seekToRowGroup(positions);
+ rle->seek(positions.at(columnId));
+ }
+ };
class TimestampColumnReader : public ColumnReader {
private:
@@ -368,7 +351,7 @@ namespace orc {
nanoRle->seek(positions.at(columnId));
}
- template <TypeKind columnKind, bool isLittleEndian>
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
class DoubleColumnReader : public ColumnReader {
public:
DoubleColumnReader(const Type& type, StripeStreams& stripe);
@@ -397,7 +380,8 @@ namespace orc {
return static_cast<unsigned char>(*(bufferPointer++));
}
- double readDouble() {
+ template <typename FloatType>
+ FloatType readDouble() {
int64_t bits = 0;
if (bufferEnd - bufferPointer >= 8) {
if (isLittleEndian) {
@@ -418,11 +402,12 @@ namespace orc {
bits |= static_cast<int64_t>(readByte()) << (i * 8);
}
}
- double* result = reinterpret_cast<double*>(&bits);
+ FloatType* result = reinterpret_cast<FloatType*>(&bits);
return *result;
}
- double readFloat() {
+ template <typename FloatType>
+ FloatType readFloat() {
int32_t bits = 0;
if (bufferEnd - bufferPointer >= 4) {
if (isLittleEndian) {
@@ -440,20 +425,24 @@ namespace orc {
}
}
float* result = reinterpret_cast<float*>(&bits);
- return static_cast<double>(*result);
+ if (!result) {
+ std::cerr << "read float empty." << std::endl;
+ }
+ return static_cast<FloatType>(*result);
}
};
- template <TypeKind columnKind, bool isLittleEndian>
- DoubleColumnReader<columnKind, isLittleEndian>::DoubleColumnReader(const Type& type,
- StripeStreams& stripe)
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+ DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::DoubleColumnReader(
+ const Type& type, StripeStreams& stripe)
: ColumnReader(type, stripe), bufferPointer(nullptr), bufferEnd(nullptr) {
inputStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (inputStream == nullptr) throw ParseError("DATA stream not found in Double column");
}
- template <TypeKind columnKind, bool isLittleEndian>
- uint64_t DoubleColumnReader<columnKind, isLittleEndian>::skip(uint64_t numValues) {
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+ uint64_t DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::skip(
+ uint64_t numValues) {
numValues = ColumnReader::skip(numValues);
if (static_cast<size_t>(bufferEnd - bufferPointer) >= bytesPerValue * numValues) {
@@ -474,31 +463,32 @@ namespace orc {
return numValues;
}
- template <TypeKind columnKind, bool isLittleEndian>
- void DoubleColumnReader<columnKind, isLittleEndian>::next(ColumnVectorBatch& rowBatch,
- uint64_t numValues, char* notNull) {
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+ void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::next(
+ ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) {
ColumnReader::next(rowBatch, numValues, notNull);
// update the notNull from the parent class
notNull = rowBatch.hasNulls ? rowBatch.notNull.data() : nullptr;
- double* outArray = dynamic_cast<DoubleVectorBatch&>(rowBatch).data.data();
+ ValueType* outArray =
+ reinterpret_cast<ValueType*>(dynamic_cast<BatchType&>(rowBatch).data.data());
- if (columnKind == FLOAT) {
+ if constexpr (columnKind == FLOAT) {
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
- outArray[i] = readFloat();
+ outArray[i] = readFloat<ValueType>();
}
}
} else {
for (size_t i = 0; i < numValues; ++i) {
- outArray[i] = readFloat();
+ outArray[i] = readFloat<ValueType>();
}
}
} else {
if (notNull) {
for (size_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
- outArray[i] = readDouble();
+ outArray[i] = readDouble<ValueType>();
}
}
} else {
@@ -513,14 +503,14 @@ namespace orc {
bufferPointer += bufferBytes;
}
for (size_t i = bufferNum; i < numValues; ++i) {
- outArray[i] = readDouble();
+ outArray[i] = readDouble<ValueType>();
}
}
}
}
- template <TypeKind columnKind, bool isLittleEndian>
- void DoubleColumnReader<columnKind, isLittleEndian>::seekToRowGroup(
+ template <TypeKind columnKind, bool isLittleEndian, typename ValueType, typename BatchType>
+ void DoubleColumnReader<columnKind, isLittleEndian, ValueType, BatchType>::seekToRowGroup(
std::unordered_map<uint64_t, PositionProvider>& positions) {
ColumnReader::seekToRowGroup(positions);
inputStream->seek(positions.at(columnId));
@@ -830,7 +820,7 @@ namespace orc {
std::vector<std::unique_ptr<ColumnReader>> children;
public:
- StructColumnReader(const Type& type, StripeStreams& stipe);
+ StructColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
uint64_t skip(uint64_t numValues) override;
@@ -845,7 +835,8 @@ namespace orc {
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
- StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe)
+ StructColumnReader::StructColumnReader(const Type& type, StripeStreams& stripe,
+ bool useTightNumericVector)
: ColumnReader(type, stripe) {
// count the number of selected sub-columns
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
@@ -854,7 +845,7 @@ namespace orc {
for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
const Type& child = *type.getSubtype(i);
if (selectedColumns[static_cast<uint64_t>(child.getColumnId())]) {
- children.push_back(buildReader(child, stripe));
+ children.push_back(buildReader(child, stripe, useTightNumericVector));
}
}
break;
@@ -914,7 +905,7 @@ namespace orc {
std::unique_ptr<RleDecoder> rle;
public:
- ListColumnReader(const Type& type, StripeStreams& stipe);
+ ListColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
~ListColumnReader() override;
uint64_t skip(uint64_t numValues) override;
@@ -930,7 +921,8 @@ namespace orc {
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
- ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe)
+ ListColumnReader::ListColumnReader(const Type& type, StripeStreams& stripe,
+ bool useTightNumericVector)
: ColumnReader(type, stripe) {
// count the number of selected sub-columns
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
@@ -941,7 +933,7 @@ namespace orc {
rle = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
const Type& childType = *type.getSubtype(0);
if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
- child = buildReader(childType, stripe);
+ child = buildReader(childType, stripe, useTightNumericVector);
}
}
@@ -1033,7 +1025,7 @@ namespace orc {
std::unique_ptr<RleDecoder> rle;
public:
- MapColumnReader(const Type& type, StripeStreams& stipe);
+ MapColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
~MapColumnReader() override;
uint64_t skip(uint64_t numValues) override;
@@ -1049,7 +1041,8 @@ namespace orc {
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
- MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe)
+ MapColumnReader::MapColumnReader(const Type& type, StripeStreams& stripe,
+ bool useTightNumericVector)
: ColumnReader(type, stripe) {
// Determine if the key and/or value columns are selected
const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
@@ -1060,11 +1053,11 @@ namespace orc {
rle = createRleDecoder(std::move(stream), false, vers, memoryPool, metrics);
const Type& keyType = *type.getSubtype(0);
if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
- keyReader = buildReader(keyType, stripe);
+ keyReader = buildReader(keyType, stripe, useTightNumericVector);
}
const Type& elementType = *type.getSubtype(1);
if (selectedColumns[static_cast<uint64_t>(elementType.getColumnId())]) {
- elementReader = buildReader(elementType, stripe);
+ elementReader = buildReader(elementType, stripe, useTightNumericVector);
}
}
@@ -1174,7 +1167,7 @@ namespace orc {
uint64_t numChildren;
public:
- UnionColumnReader(const Type& type, StripeStreams& stipe);
+ UnionColumnReader(const Type& type, StripeStreams& stipe, bool useTightNumericVector = false);
uint64_t skip(uint64_t numValues) override;
@@ -1189,7 +1182,8 @@ namespace orc {
void nextInternal(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull);
};
- UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe)
+ UnionColumnReader::UnionColumnReader(const Type& type, StripeStreams& stripe,
+ bool useTightNumericVector)
: ColumnReader(type, stripe) {
numChildren = type.getSubtypeCount();
childrenReader.resize(numChildren);
@@ -1204,7 +1198,7 @@ namespace orc {
for (unsigned int i = 0; i < numChildren; ++i) {
const Type& child = *type.getSubtype(i);
if (selectedColumns[static_cast<size_t>(child.getColumnId())]) {
- childrenReader[i] = buildReader(child, stripe);
+ childrenReader[i] = buildReader(child, stripe, useTightNumericVector);
}
}
}
@@ -1696,13 +1690,25 @@ namespace orc {
/**
* Create a reader for the given stripe.
*/
- std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe) {
+ std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe,
+ bool useTightNumericVector) {
switch (static_cast<int64_t>(type.getKind())) {
- case DATE:
- case INT:
+ case SHORT: {
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnReader>(
+ new IntegerColumnReader<ShortVectorBatch>(type, stripe));
+ }
+ }
+ case INT: {
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnReader>(
+ new IntegerColumnReader<IntVectorBatch>(type, stripe));
+ }
+ }
case LONG:
- case SHORT:
- return std::unique_ptr<ColumnReader>(new IntegerColumnReader(type, stripe));
+ case DATE:
+ return std::unique_ptr<ColumnReader>(
+ new IntegerColumnReader<LongVectorBatch>(type, stripe));
case BINARY:
case CHAR:
case STRING:
@@ -1722,32 +1728,51 @@ namespace orc {
return std::unique_ptr<ColumnReader>(new BooleanColumnReader(type, stripe));
case BYTE:
- return std::unique_ptr<ColumnReader>(new ByteColumnReader(type, stripe));
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnReader>(new ByteColumnReader<ByteVectorBatch>(type, stripe));
+ }
+ return std::unique_ptr<ColumnReader>(new ByteColumnReader<LongVectorBatch>(type, stripe));
case LIST:
- return std::unique_ptr<ColumnReader>(new ListColumnReader(type, stripe));
+ return std::unique_ptr<ColumnReader>(
+ new ListColumnReader(type, stripe, useTightNumericVector));
case MAP:
- return std::unique_ptr<ColumnReader>(new MapColumnReader(type, stripe));
+ return std::unique_ptr<ColumnReader>(
+ new MapColumnReader(type, stripe, useTightNumericVector));
case UNION:
- return std::unique_ptr<ColumnReader>(new UnionColumnReader(type, stripe));
+ return std::unique_ptr<ColumnReader>(
+ new UnionColumnReader(type, stripe, useTightNumericVector));
case STRUCT:
- return std::unique_ptr<ColumnReader>(new StructColumnReader(type, stripe));
-
- case FLOAT:
+ return std::unique_ptr<ColumnReader>(
+ new StructColumnReader(type, stripe, useTightNumericVector));
+
+ case FLOAT: {
+ if (useTightNumericVector) {
+ if (isLittleEndian()) {
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<FLOAT, true, float, FloatVectorBatch>(type, stripe));
+ }
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<FLOAT, false, float, FloatVectorBatch>(type, stripe));
+ }
if (isLittleEndian()) {
- return std::unique_ptr<ColumnReader>(new DoubleColumnReader<FLOAT, true>(type, stripe));
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<FLOAT, true, double, DoubleVectorBatch>(type, stripe));
}
- return std::unique_ptr<ColumnReader>(new DoubleColumnReader<FLOAT, false>(type, stripe));
-
- case DOUBLE:
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<FLOAT, false, double, DoubleVectorBatch>(type, stripe));
+ }
+ case DOUBLE: {
if (isLittleEndian()) {
- return std::unique_ptr<ColumnReader>(new DoubleColumnReader<DOUBLE, true>(type, stripe));
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<DOUBLE, true, double, DoubleVectorBatch>(type, stripe));
}
- return std::unique_ptr<ColumnReader>(new DoubleColumnReader<DOUBLE, false>(type, stripe));
-
+ return std::unique_ptr<ColumnReader>(
+ new DoubleColumnReader<DOUBLE, false, double, DoubleVectorBatch>(type, stripe));
+ }
case TIMESTAMP:
return std::unique_ptr<ColumnReader>(new TimestampColumnReader(type, stripe, false));
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 10b0bb6dc..3b765cbe5 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -158,7 +158,8 @@ namespace orc {
/**
* Create a reader for the given stripe.
*/
- std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe);
+ std::unique_ptr<ColumnReader> buildReader(const Type& type, StripeStreams& stripe,
+ bool useTightNumericVector = false);
} // namespace orc
#endif
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index a70d0bf79..b622bad2d 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -415,6 +415,7 @@ namespace orc {
}
}
+ template <typename BatchType>
class IntegerColumnWriter : public ColumnWriter {
public:
IntegerColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -438,8 +439,10 @@ namespace orc {
RleVersion rleVersion;
};
- IntegerColumnWriter::IntegerColumnWriter(const Type& type, const StreamsFactory& factory,
- const WriterOptions& options)
+ template <typename BatchType>
+ IntegerColumnWriter<BatchType>::IntegerColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options)
: ColumnWriter(type, factory, options), rleVersion(options.getRleVersion()) {
std::unique_ptr<BufferedOutputStream> dataStream =
factory.createStream(proto::Stream_Kind_DATA);
@@ -451,11 +454,12 @@ namespace orc {
}
}
- void IntegerColumnWriter::add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
- const char* incomingMask) {
- const LongVectorBatch* longBatch = dynamic_cast<const LongVectorBatch*>(&rowBatch);
- if (longBatch == nullptr) {
- throw InvalidArgument("Failed to cast to LongVectorBatch");
+ template <typename BatchType>
+ void IntegerColumnWriter<BatchType>::add(ColumnVectorBatch& rowBatch, uint64_t offset,
+ uint64_t numValues, const char* incomingMask) {
+ const BatchType* intBatch = dynamic_cast<const BatchType*>(&rowBatch);
+ if (intBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to IntegerVectorBatch");
}
IntegerColumnStatisticsImpl* intStats =
dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
@@ -465,8 +469,8 @@ namespace orc {
ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
- const int64_t* data = longBatch->data.data() + offset;
- const char* notNull = longBatch->hasNulls ? longBatch->notNull.data() + offset : nullptr;
+ const auto* data = intBatch->data.data() + offset;
+ const char* notNull = intBatch->hasNulls ? intBatch->notNull.data() + offset : nullptr;
rleEncoder->add(data, numValues, notNull);
@@ -476,9 +480,9 @@ namespace orc {
if (notNull == nullptr || notNull[i]) {
++count;
if (enableBloomFilter) {
- bloomFilter->addLong(data[i]);
+ bloomFilter->addLong(static_cast<int64_t>(data[i]));
}
- intStats->update(data[i], 1);
+ intStats->update(static_cast<int64_t>(data[i]), 1);
}
}
intStats->increase(count);
@@ -487,7 +491,8 @@ namespace orc {
}
}
- void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ template <typename BatchType>
+ void IntegerColumnWriter<BatchType>::flush(std::vector<proto::Stream>& streams) {
ColumnWriter::flush(streams);
proto::Stream stream;
@@ -497,13 +502,16 @@ namespace orc {
streams.push_back(stream);
}
- uint64_t IntegerColumnWriter::getEstimatedSize() const {
+ // Estimated size: the base ColumnWriter estimate plus the bytes currently
+ // buffered by this writer's RLE encoder.
+ template <typename BatchType>
+ uint64_t IntegerColumnWriter<BatchType>::getEstimatedSize() const {
uint64_t size = ColumnWriter::getEstimatedSize();
size += rleEncoder->getBufferSize();
return size;
}
- void IntegerColumnWriter::getColumnEncoding(std::vector<proto::ColumnEncoding>& encodings) const {
+ template <typename BatchType>
+ void IntegerColumnWriter<BatchType>::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
proto::ColumnEncoding encoding;
encoding.set_kind(RleVersionMapper(rleVersion));
encoding.set_dictionarysize(0);
@@ -513,11 +521,13 @@ namespace orc {
encodings.push_back(encoding);
}
- void IntegerColumnWriter::recordPosition() const {
+ // Records the base writer's positions, then the RLE encoder's current
+ // position, into rowIndexPosition.
+ template <typename BatchType>
+ void IntegerColumnWriter<BatchType>::recordPosition() const {
ColumnWriter::recordPosition();
rleEncoder->recordPosition(rowIndexPosition.get());
}
+ template <typename BatchType>
class ByteColumnWriter : public ColumnWriter {
public:
ByteColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
@@ -537,8 +547,9 @@ namespace orc {
std::unique_ptr<ByteRleEncoder> byteRleEncoder;
};
- ByteColumnWriter::ByteColumnWriter(const Type& type, const StreamsFactory& factory,
- const WriterOptions& options)
+ template <typename BatchType>
+ ByteColumnWriter<BatchType>::ByteColumnWriter(const Type& type, const StreamsFactory& factory,
+ const WriterOptions& options)
: ColumnWriter(type, factory, options) {
std::unique_ptr<BufferedOutputStream> dataStream =
factory.createStream(proto::Stream_Kind_DATA);
@@ -549,11 +560,12 @@ namespace orc {
}
}
- void ByteColumnWriter::add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
- const char* incomingMask) {
- LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
+ template <typename BatchType>
+ void ByteColumnWriter<BatchType>::add(ColumnVectorBatch& rowBatch, uint64_t offset,
+ uint64_t numValues, const char* incomingMask) {
+ BatchType* byteBatch = dynamic_cast<BatchType*>(&rowBatch);
if (byteBatch == nullptr) {
- throw InvalidArgument("Failed to cast to LongVectorBatch");
+ throw InvalidArgument("Failed to cast to IntegerVectorBatch");
}
IntegerColumnStatisticsImpl* intStats =
dynamic_cast<IntegerColumnStatisticsImpl*>(colIndexStatistics.get());
@@ -563,7 +575,7 @@ namespace orc {
ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
- int64_t* data = byteBatch->data.data() + offset;
+ auto* data = byteBatch->data.data() + offset;
const char* notNull = byteBatch->hasNulls ? byteBatch->notNull.data() + offset : nullptr;
char* byteData = reinterpret_cast<char*>(data);
@@ -588,7 +600,8 @@ namespace orc {
}
}
- void ByteColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ template <typename BatchType>
+ void ByteColumnWriter<BatchType>::flush(std::vector<proto::Stream>& streams) {
ColumnWriter::flush(streams);
proto::Stream stream;
@@ -598,13 +611,16 @@ namespace orc {
streams.push_back(stream);
}
- uint64_t ByteColumnWriter::getEstimatedSize() const {
+ // Estimated size: the base ColumnWriter estimate plus the bytes currently
+ // buffered by the byte RLE encoder.
+ template <typename BatchType>
+ uint64_t ByteColumnWriter<BatchType>::getEstimatedSize() const {
uint64_t size = ColumnWriter::getEstimatedSize();
size += byteRleEncoder->getBufferSize();
return size;
}
- void ByteColumnWriter::getColumnEncoding(std::vector<proto::ColumnEncoding>& encodings) const {
+ template <typename BatchType>
+ void ByteColumnWriter<BatchType>::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
proto::ColumnEncoding encoding;
encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
encoding.set_dictionarysize(0);
@@ -614,7 +630,8 @@ namespace orc {
encodings.push_back(encoding);
}
- void ByteColumnWriter::recordPosition() const {
+ // Records the base writer's positions, then the byte RLE encoder's current
+ // position, into rowIndexPosition.
+ template <typename BatchType>
+ void ByteColumnWriter<BatchType>::recordPosition() const {
ColumnWriter::recordPosition();
byteRleEncoder->recordPosition(rowIndexPosition.get());
}
@@ -721,10 +738,11 @@ namespace orc {
rleEncoder->recordPosition(rowIndexPosition.get());
}
- class DoubleColumnWriter : public ColumnWriter {
+ template <typename ValueType, typename BatchType>
+ class FloatingColumnWriter : public ColumnWriter {
public:
- DoubleColumnWriter(const Type& type, const StreamsFactory& factory,
- const WriterOptions& options, bool isFloat);
+ FloatingColumnWriter(const Type& type, const StreamsFactory& factory,
+ const WriterOptions& options, bool isFloat);
virtual void add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
const char* incomingMask) override;
@@ -743,8 +761,11 @@ namespace orc {
DataBuffer<char> buffer;
};
- DoubleColumnWriter::DoubleColumnWriter(const Type& type, const StreamsFactory& factory,
- const WriterOptions& options, bool isFloatType)
+ template <typename ValueType, typename BatchType>
+ FloatingColumnWriter<ValueType, BatchType>::FloatingColumnWriter(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options,
+ bool isFloatType)
: ColumnWriter(type, factory, options),
isFloat(isFloatType),
buffer(*options.getMemoryPool()) {
@@ -766,11 +787,13 @@ namespace orc {
}
}
- void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch, uint64_t offset, uint64_t numValues,
- const char* incomingMask) {
- const DoubleVectorBatch* dblBatch = dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
+ template <typename ValueType, typename BatchType>
+ void FloatingColumnWriter<ValueType, BatchType>::add(ColumnVectorBatch& rowBatch, uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const BatchType* dblBatch = dynamic_cast<const BatchType*>(&rowBatch);
if (dblBatch == nullptr) {
- throw InvalidArgument("Failed to cast to DoubleVectorBatch");
+ throw InvalidArgument("Failed to cast to FloatingVectorBatch");
}
DoubleColumnStatisticsImpl* doubleStats =
dynamic_cast<DoubleColumnStatisticsImpl*>(colIndexStatistics.get());
@@ -780,7 +803,7 @@ namespace orc {
ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
- const double* doubleData = dblBatch->data.data() + offset;
+ const ValueType* doubleData = dblBatch->data.data() + offset;
const char* notNull = dblBatch->hasNulls ? dblBatch->notNull.data() + offset : nullptr;
size_t bytes = isFloat ? 4 : 8;
@@ -791,14 +814,14 @@ namespace orc {
if (isFloat) {
encodeFloatNum<float, int32_t>(static_cast<float>(doubleData[i]), data);
} else {
- encodeFloatNum<double, int64_t>(doubleData[i], data);
+ encodeFloatNum<double, int64_t>(static_cast<double>(doubleData[i]), data);
}
dataStream->write(data, bytes);
++count;
if (enableBloomFilter) {
- bloomFilter->addDouble(doubleData[i]);
+ bloomFilter->addDouble(static_cast<double>(doubleData[i]));
}
- doubleStats->update(doubleData[i]);
+ doubleStats->update(static_cast<double>(doubleData[i]));
}
}
doubleStats->increase(count);
@@ -807,7 +830,8 @@ namespace orc {
}
}
- void DoubleColumnWriter::flush(std::vector<proto::Stream>& streams) {
+ template <typename ValueType, typename BatchType>
+ void FloatingColumnWriter<ValueType, BatchType>::flush(std::vector<proto::Stream>& streams) {
ColumnWriter::flush(streams);
proto::Stream stream;
@@ -817,13 +841,16 @@ namespace orc {
streams.push_back(stream);
}
- uint64_t DoubleColumnWriter::getEstimatedSize() const {
+ // Estimated size: the base ColumnWriter estimate plus the bytes already in
+ // dataStream (add() writes each encoded float/double directly to it).
+ template <typename ValueType, typename BatchType>
+ uint64_t FloatingColumnWriter<ValueType, BatchType>::getEstimatedSize() const {
uint64_t size = ColumnWriter::getEstimatedSize();
size += dataStream->getSize();
return size;
}
- void DoubleColumnWriter::getColumnEncoding(std::vector<proto::ColumnEncoding>& encodings) const {
+ template <typename ValueType, typename BatchType>
+ void FloatingColumnWriter<ValueType, BatchType>::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
proto::ColumnEncoding encoding;
encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
encoding.set_dictionarysize(0);
@@ -833,7 +860,8 @@ namespace orc {
encodings.push_back(encoding);
}
- void DoubleColumnWriter::recordPosition() const {
+ // Records the base writer's positions, then dataStream's current position,
+ // into rowIndexPosition.
+ template <typename ValueType, typename BatchType>
+ void FloatingColumnWriter<ValueType, BatchType>::recordPosition() const {
ColumnWriter::recordPosition();
dataStream->recordPosition(rowIndexPosition.get());
}
@@ -1740,7 +1768,7 @@ namespace orc {
nanoRleEncoder->recordPosition(rowIndexPosition.get());
}
- class DateColumnWriter : public IntegerColumnWriter {
+ class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
public:
DateColumnWriter(const Type& type, const StreamsFactory& factory, const WriterOptions& options);
@@ -1750,7 +1778,7 @@ namespace orc {
DateColumnWriter::DateColumnWriter(const Type& type, const StreamsFactory& factory,
const WriterOptions& options)
- : IntegerColumnWriter(type, factory, options) {
+ // DATE always uses the LongVectorBatch instantiation: TypeImpl::createRowBatch
+ // returns a LongVectorBatch for DATE regardless of useTightNumericVector.
+ : IntegerColumnWriter<LongVectorBatch>(type, factory, options) {
// PASS
}
@@ -2782,18 +2810,38 @@ namespace orc {
switch (static_cast<int64_t>(type.getKind())) {
case STRUCT:
return std::unique_ptr<ColumnWriter>(new StructColumnWriter(type, factory, options));
+ case SHORT:
+ if (options.getUseTightNumericVector()) {
+ return std::unique_ptr<ColumnWriter>(
+ new IntegerColumnWriter<ShortVectorBatch>(type, factory, options));
+ }
case INT:
+ if (options.getUseTightNumericVector()) {
+ return std::unique_ptr<ColumnWriter>(
+ new IntegerColumnWriter<IntVectorBatch>(type, factory, options));
+ }
case LONG:
- case SHORT:
- return std::unique_ptr<ColumnWriter>(new IntegerColumnWriter(type, factory, options));
+ return std::unique_ptr<ColumnWriter>(
+ new IntegerColumnWriter<LongVectorBatch>(type, factory, options));
case BYTE:
- return std::unique_ptr<ColumnWriter>(new ByteColumnWriter(type, factory, options));
+ if (options.getUseTightNumericVector()) {
+ return std::unique_ptr<ColumnWriter>(
+ new ByteColumnWriter<ByteVectorBatch>(type, factory, options));
+ }
+ return std::unique_ptr<ColumnWriter>(
+ new ByteColumnWriter<LongVectorBatch>(type, factory, options));
case BOOLEAN:
return std::unique_ptr<ColumnWriter>(new BooleanColumnWriter(type, factory, options));
case DOUBLE:
- return std::unique_ptr<ColumnWriter>(new DoubleColumnWriter(type, factory, options, false));
+ return std::unique_ptr<ColumnWriter>(
+ new FloatingColumnWriter<double, DoubleVectorBatch>(type, factory, options, false));
case FLOAT:
- return std::unique_ptr<ColumnWriter>(new DoubleColumnWriter(type, factory, options, true));
+ if (options.getUseTightNumericVector()) {
+ return std::unique_ptr<ColumnWriter>(
+ new FloatingColumnWriter<float, FloatVectorBatch>(type, factory, options, true));
+ }
+ return std::unique_ptr<ColumnWriter>(
+ new FloatingColumnWriter<double, DoubleVectorBatch>(type, factory, options, true));
case BINARY:
return std::unique_ptr<ColumnWriter>(new BinaryColumnWriter(type, factory, options));
case STRING:
diff --git a/c++/src/MemoryPool.cc b/c++/src/MemoryPool.cc
index 89a9dcc98..bd2a30892 100644
--- a/c++/src/MemoryPool.cc
+++ b/c++/src/MemoryPool.cc
@@ -162,6 +162,24 @@ namespace orc {
currentSize = newSize;
}
+ // Specializations for float
+
+ // Releases buf through memoryPool.free(); skipped when no buffer was allocated.
+ template <>
+ DataBuffer<float>::~DataBuffer() {
+ if (buf) {
+ memoryPool.free(reinterpret_cast<char*>(buf));
+ }
+ }
+
+ // Sets the logical size to newSize after reserve(newSize). When growing, the
+ // newly exposed elements are zero-filled with memset (on IEEE-754 platforms
+ // all-zero bits is +0.0f).
+ template <>
+ void DataBuffer<float>::resize(uint64_t newSize) {
+ reserve(newSize);
+ if (newSize > currentSize) {
+ memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(float));
+ }
+ currentSize = newSize;
+ }
+
// Specializations for int64_t
template <>
@@ -180,6 +198,60 @@ namespace orc {
currentSize = newSize;
}
+ // Specializations for int32_t
+
+ // Releases buf through memoryPool.free(); skipped when no buffer was allocated.
+ template <>
+ DataBuffer<int32_t>::~DataBuffer() {
+ if (buf) {
+ memoryPool.free(reinterpret_cast<char*>(buf));
+ }
+ }
+
+ // Sets the logical size to newSize after reserve(newSize); newly exposed
+ // elements are zeroed when growing.
+ template <>
+ void DataBuffer<int32_t>::resize(uint64_t newSize) {
+ reserve(newSize);
+ if (newSize > currentSize) {
+ memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(int32_t));
+ }
+ currentSize = newSize;
+ }
+
+ // Specializations for int16_t
+
+ // Releases buf through memoryPool.free(); skipped when no buffer was allocated.
+ template <>
+ DataBuffer<int16_t>::~DataBuffer() {
+ if (buf) {
+ memoryPool.free(reinterpret_cast<char*>(buf));
+ }
+ }
+
+ // Sets the logical size to newSize after reserve(newSize); newly exposed
+ // elements are zeroed when growing.
+ template <>
+ void DataBuffer<int16_t>::resize(uint64_t newSize) {
+ reserve(newSize);
+ if (newSize > currentSize) {
+ memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(int16_t));
+ }
+ currentSize = newSize;
+ }
+
+ // Specializations for int8_t
+
+ // Releases buf through memoryPool.free(); skipped when no buffer was allocated.
+ template <>
+ DataBuffer<int8_t>::~DataBuffer() {
+ if (buf) {
+ memoryPool.free(reinterpret_cast<char*>(buf));
+ }
+ }
+
+ // Sets the logical size to newSize after reserve(newSize); newly exposed
+ // elements are zeroed when growing.
+ template <>
+ void DataBuffer<int8_t>::resize(uint64_t newSize) {
+ reserve(newSize);
+ if (newSize > currentSize) {
+ memset(buf + currentSize, 0, (newSize - currentSize) * sizeof(int8_t));
+ }
+ currentSize = newSize;
+ }
+
// Specializations for uint64_t
template <>
@@ -223,8 +295,12 @@ namespace orc {
template class DataBuffer<char>;
template class DataBuffer<char*>;
template class DataBuffer<double>;
+ template class DataBuffer<float>;
template class DataBuffer<Int128>;
template class DataBuffer<int64_t>;
+ template class DataBuffer<int32_t>;
+ template class DataBuffer<int16_t>;
+ template class DataBuffer<int8_t>;
template class DataBuffer<uint64_t>;
template class DataBuffer<unsigned char>;
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index 6897c2663..2d410d4fb 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -140,6 +140,7 @@ namespace orc {
std::shared_ptr<SearchArgument> sargs;
std::string readerTimezone;
RowReaderOptions::IdReadIntentMap idReadIntentMap;
+ bool useTightNumericVector;
RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
@@ -149,6 +150,7 @@ namespace orc {
forcedScaleOnHive11Decimal = 6;
enableLazyDecoding = false;
readerTimezone = "GMT";
+ useTightNumericVector = false;
}
};
@@ -298,6 +300,15 @@ namespace orc {
const RowReaderOptions::IdReadIntentMap RowReaderOptions::getIdReadIntentMap() const {
return privateBits->idReadIntentMap;
}
+
+ // Opt-in (default false) for width-specific numeric batches on the read path:
+ // RowReaderImpl forwards this flag to buildReader() and to createRowBatch().
+ // Returns *this for chaining.
+ RowReaderOptions& RowReaderOptions::setUseTightNumericVector(bool useTightNumericVector) {
+ privateBits->useTightNumericVector = useTightNumericVector;
+ return *this;
+ }
+
+ // Whether width-specific numeric batches were requested for reading.
+ bool RowReaderOptions::getUseTightNumericVector() const {
+ return privateBits->useTightNumericVector;
+ }
} // namespace orc
#endif
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 506f6eef9..535281d6f 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -60,14 +60,27 @@ namespace orc {
}
}
- void RleEncoder::add(const int64_t* data, uint64_t numValues, const char* notNull) {
+ // Encodes numValues entries of data, widening each to int64_t before write().
+ // Entries whose notNull flag is 0 are skipped; when notNull is null every
+ // entry is written.
+ template <typename T>
+ void RleEncoder::add(const T* data, uint64_t numValues, const char* notNull) {
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
- write(data[i]);
+ write(static_cast<int64_t>(data[i]));
}
}
}
+ // Virtual entry points, one per supported element width; each forwards to the
+ // add<T> template defined just above.
+ void RleEncoder::add(const int64_t* data, uint64_t numValues, const char* notNull) {
+ add<int64_t>(data, numValues, notNull);
+ }
+
+ void RleEncoder::add(const int32_t* data, uint64_t numValues, const char* notNull) {
+ add<int32_t>(data, numValues, notNull);
+ }
+
+ void RleEncoder::add(const int16_t* data, uint64_t numValues, const char* notNull) {
+ add<int16_t>(data, numValues, notNull);
+ }
+
void RleEncoder::writeVslong(int64_t val) {
writeVulong((val << 1) ^ (val >> 63));
}
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index 68a954c27..51f9b6f58 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -56,8 +56,14 @@ namespace orc {
* @param notNull If the pointer is null, all values are read. If the
* pointer is not null, positions that are false are skipped.
*/
+ template <typename T>
+ void add(const T* data, uint64_t numValues, const char* notNull);
+
virtual void add(const int64_t* data, uint64_t numValues, const char* notNull);
+ virtual void add(const int32_t* data, uint64_t numValues, const char* notNull);
+
+ virtual void add(const int16_t* data, uint64_t numValues, const char* notNull);
/**
* Get size of buffer used so far.
*/
@@ -122,6 +128,10 @@ namespace orc {
*/
virtual void next(int64_t* data, uint64_t numValues, const char* notNull) = 0;
+ virtual void next(int32_t* data, uint64_t numValues, const char* notNull) = 0;
+
+ virtual void next(int16_t* data, uint64_t numValues, const char* notNull) = 0;
+
protected:
ReaderMetrics* metrics;
};
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index d45bc4da5..f8431566c 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -227,8 +227,8 @@ namespace orc {
}
}
- void RleDecoderV1::next(int64_t* const data, const uint64_t numValues,
- const char* const notNull) {
+ template <typename T>
+ void RleDecoderV1::next(T* const data, const uint64_t numValues, const char* const notNull) {
SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
uint64_t position = 0;
// skipNulls()
@@ -250,13 +250,13 @@ namespace orc {
if (notNull) {
for (uint64_t i = 0; i < count; ++i) {
if (notNull[position + i]) {
- data[position + i] = value + static_cast<int64_t>(consumed) * delta;
+ data[position + i] = static_cast<T>(value + static_cast<int64_t>(consumed) * delta);
consumed += 1;
}
}
} else {
for (uint64_t i = 0; i < count; ++i) {
- data[position + i] = value + static_cast<int64_t>(i) * delta;
+ data[position + i] = static_cast<T>(value + static_cast<int64_t>(i) * delta);
}
consumed = count;
}
@@ -266,18 +266,18 @@ namespace orc {
for (uint64_t i = 0; i < count; ++i) {
if (notNull[position + i]) {
data[position + i] =
- isSigned ? unZigZag(readLong()) : static_cast<int64_t>(readLong());
+ isSigned ? static_cast<T>(unZigZag(readLong())) : static_cast<T>(readLong());
++consumed;
}
}
} else {
if (isSigned) {
for (uint64_t i = 0; i < count; ++i) {
- data[position + i] = unZigZag(readLong());
+ data[position + i] = static_cast<T>(unZigZag(readLong()));
}
} else {
for (uint64_t i = 0; i < count; ++i) {
- data[position + i] = static_cast<int64_t>(readLong());
+ data[position + i] = static_cast<T>(readLong());
}
}
consumed = count;
@@ -296,4 +296,15 @@ namespace orc {
}
}
+ // Virtual overloads, one per supported destination width. Each forwards to the
+ // next<T> template, which narrows every decoded value with static_cast<T>.
+ void RleDecoderV1::next(int64_t* data, uint64_t numValues, const char* notNull) {
+ next<int64_t>(data, numValues, notNull);
+ }
+
+ void RleDecoderV1::next(int32_t* data, uint64_t numValues, const char* notNull) {
+ next<int32_t>(data, numValues, notNull);
+ }
+
+ void RleDecoderV1::next(int16_t* data, uint64_t numValues, const char* notNull) {
+ next<int16_t>(data, numValues, notNull);
+ }
} // namespace orc
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index d631556db..fbe6b0f9c 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -63,8 +63,15 @@ namespace orc {
/**
* Read a number of values into the batch.
*/
+ template <typename T>
+ void next(T* data, uint64_t numValues, const char* notNull);
+
void next(int64_t* data, uint64_t numValues, const char* notNull) override;
+ void next(int32_t* data, uint64_t numValues, const char* notNull) override;
+
+ void next(int16_t* data, uint64_t numValues, const char* notNull) override;
+
private:
inline signed char readByte();
diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh
index 4180dba89..f48ce8391 100644
--- a/c++/src/RLEv2.hh
+++ b/c++/src/RLEv2.hh
@@ -157,8 +157,15 @@ namespace orc {
/**
* Read a number of values into the batch.
*/
+ template <typename T>
+ void next(T* data, uint64_t numValues, const char* notNull);
+
void next(int64_t* data, uint64_t numValues, const char* notNull) override;
+ void next(int32_t* data, uint64_t numValues, const char* notNull) override;
+
+ void next(int16_t* data, uint64_t numValues, const char* notNull) override;
+
private:
/**
* Decode the next gap and patch from 'unpackedPatch' and update the index on it.
@@ -200,14 +207,16 @@ namespace orc {
void unrolledUnpack56(int64_t* data, uint64_t offset, uint64_t len);
void unrolledUnpack64(int64_t* data, uint64_t offset, uint64_t len);
- uint64_t nextShortRepeats(int64_t* data, uint64_t offset, uint64_t numValues,
- const char* notNull);
- uint64_t nextDirect(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull);
- uint64_t nextPatched(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull);
- uint64_t nextDelta(int64_t* data, uint64_t offset, uint64_t numValues, const char* notNull);
-
- uint64_t copyDataFromBuffer(int64_t* data, uint64_t offset, uint64_t numValues,
- const char* notNull);
+ template <typename T>
+ uint64_t nextShortRepeats(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+ template <typename T>
+ uint64_t nextDirect(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+ template <typename T>
+ uint64_t nextPatched(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+ template <typename T>
+ uint64_t nextDelta(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
+ template <typename T>
+ uint64_t copyDataFromBuffer(T* data, uint64_t offset, uint64_t numValues, const char* notNull);
const std::unique_ptr<SeekableInputStream> inputStream;
const bool isSigned;
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index dab3ceec6..466cfd433 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -263,6 +263,7 @@ namespace orc {
currentRowInStripe = 0;
rowsInCurrentStripe = 0;
numRowGroupsInStripeRange = 0;
+ useTightNumericVector = opts.getUseTightNumericVector();
uint64_t rowTotal = 0;
firstRowOfStripe.resize(numberOfStripes);
@@ -1090,7 +1091,7 @@ namespace orc {
StripeStreamsImpl stripeStreams(*this, currentStripe, currentStripeInfo, currentStripeFooter,
currentStripeInfo.offset(), *contents->stream, writerTimezone,
readerTimezone);
- reader = buildReader(*contents->schema, stripeStreams);
+ reader = buildReader(*contents->schema, stripeStreams, useTightNumericVector);
if (sargsApplier) {
// move to the 1st selected row group when PPD is enabled.
@@ -1204,7 +1205,8 @@ namespace orc {
}
std::unique_ptr<ColumnVectorBatch> RowReaderImpl::createRowBatch(uint64_t capacity) const {
- return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock);
+ return getSelectedType().createRowBatch(capacity, *contents->pool, enableEncodedBlock,
+ useTightNumericVector);
}
void ensureOrcFooter(InputStream* stream, DataBuffer<char>* buffer, uint64_t postscriptLength) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index c7e616bea..ef04edd6c 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -157,6 +157,7 @@ namespace orc {
std::unique_ptr<ColumnReader> reader;
bool enableEncodedBlock;
+ bool useTightNumericVector;
// internal methods
void startNextStripe();
inline void markEndOfFile();
diff --git a/c++/src/RleDecoderV2.cc b/c++/src/RleDecoderV2.cc
index ca5528513..2742aef6f 100644
--- a/c++/src/RleDecoderV2.cc
+++ b/c++/src/RleDecoderV2.cc
@@ -438,8 +438,8 @@ namespace orc {
}
}
- void RleDecoderV2::next(int64_t* const data, const uint64_t numValues,
- const char* const notNull) {
+ template <typename T>
+ void RleDecoderV2::next(T* const data, const uint64_t numValues, const char* const notNull) {
SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
uint64_t nRead = 0;
@@ -478,7 +478,20 @@ namespace orc {
}
}
- uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data, uint64_t offset, uint64_t numValues,
+ // Virtual overloads, one per supported destination width. Each forwards to the
+ // next<T> template; the run decoders narrow values with static_cast<T>.
+ void RleDecoderV2::next(int64_t* data, uint64_t numValues, const char* notNull) {
+ next<int64_t>(data, numValues, notNull);
+ }
+
+ void RleDecoderV2::next(int32_t* data, uint64_t numValues, const char* notNull) {
+ next<int32_t>(data, numValues, notNull);
+ }
+
+ void RleDecoderV2::next(int16_t* data, uint64_t numValues, const char* notNull) {
+ next<int16_t>(data, numValues, notNull);
+ }
+
+ template <typename T>
+ uint64_t RleDecoderV2::nextShortRepeats(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead == runLength) {
// extract the number of fixed bytes
@@ -503,13 +516,13 @@ namespace orc {
if (notNull) {
for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
if (notNull[pos]) {
- data[pos] = literals[0];
+ data[pos] = static_cast<T>(literals[0]);
++runRead;
}
}
} else {
for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
- data[pos] = literals[0];
+ data[pos] = static_cast<T>(literals[0]);
++runRead;
}
}
@@ -517,7 +530,8 @@ namespace orc {
return nRead;
}
- uint64_t RleDecoderV2::nextDirect(int64_t* const data, uint64_t offset, uint64_t numValues,
+ template <typename T>
+ uint64_t RleDecoderV2::nextDirect(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead == runLength) {
// extract the number of fixed bits
@@ -565,7 +579,8 @@ namespace orc {
*patchIdx = idx;
}
- uint64_t RleDecoderV2::nextPatched(int64_t* const data, uint64_t offset, uint64_t numValues,
+ template <typename T>
+ uint64_t RleDecoderV2::nextPatched(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead == runLength) {
// extract the number of fixed bits
@@ -663,7 +678,8 @@ namespace orc {
return copyDataFromBuffer(data, offset, numValues, notNull);
}
- uint64_t RleDecoderV2::nextDelta(int64_t* const data, uint64_t offset, uint64_t numValues,
+ template <typename T>
+ uint64_t RleDecoderV2::nextDelta(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead == runLength) {
// extract the number of fixed bits
@@ -727,18 +743,20 @@ namespace orc {
return copyDataFromBuffer(data, offset, numValues, notNull);
}
- uint64_t RleDecoderV2::copyDataFromBuffer(int64_t* data, uint64_t offset, uint64_t numValues,
+ // Copies buffered literals of the current run into data[offset, offset + nRead),
+ // narrowing each value to T, where nRead = min(remaining run, numValues).
+ // When notNull is given, only slots flagged non-null consume a literal.
+ // Returns nRead.
+ template <typename T>
+ uint64_t RleDecoderV2::copyDataFromBuffer(T* data, uint64_t offset, uint64_t numValues,
const char* notNull) {
uint64_t nRead = std::min(runLength - runRead, numValues);
if (notNull) {
for (uint64_t i = offset; i < (offset + nRead); ++i) {
if (notNull[i]) {
- data[i] = literals[runRead++];
+ data[i] = static_cast<T>(literals[runRead++]);
}
}
} else {
- memcpy(data + offset, literals.data() + runRead, nRead * sizeof(int64_t));
- runRead += nRead;
+ // Read through a local source pointer and advance runRead once afterwards.
+ // Bumping the member inside the loop forces a reload per element, because
+ // stores through an int64_t* may alias the unsigned 64-bit member. That
+ // blocks vectorization on the default int64_t path, which used to be one memcpy.
+ const int64_t* src = literals.data() + runRead;
+ T* dst = data + offset;
+ for (uint64_t i = 0; i < nRead; ++i) {
+ dst[i] = static_cast<T>(src[i]);
+ }
+ runRead += nRead;
}
return nRead;
}
diff --git a/c++/src/Timezone.cc b/c++/src/Timezone.cc
index c330978d4..0b007d1f8 100644
--- a/c++/src/Timezone.cc
+++ b/c++/src/Timezone.cc
@@ -24,6 +24,7 @@
#include <stdlib.h>
#include <string.h>
#include <time.h>
+#include <iostream>
#include <map>
#include <sstream>
diff --git a/c++/src/TypeImpl.cc b/c++/src/TypeImpl.cc
index e586eef5c..dd7a0f0be 100644
--- a/c++/src/TypeImpl.cc
+++ b/c++/src/TypeImpl.cc
@@ -279,16 +279,37 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> TypeImpl::createRowBatch(uint64_t capacity,
MemoryPool& memoryPool,
bool encoded) const {
+ // Original 3-argument entry point: identical to the 4-argument overload with
+ // useTightNumericVector disabled.
+ return createRowBatch(capacity, memoryPool, encoded, false);
+ }
+
+ std::unique_ptr<ColumnVectorBatch> TypeImpl::createRowBatch(uint64_t capacity,
+ MemoryPool& memoryPool, bool encoded,
+ bool useTightNumericVector) const {
switch (static_cast<int64_t>(kind)) {
case BOOLEAN:
- case BYTE:
- case SHORT:
- case INT:
+ case BYTE: {
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnVectorBatch>(new ByteVectorBatch(capacity, memoryPool));
+ }
+ }
+ case SHORT: {
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnVectorBatch>(new ShortVectorBatch(capacity, memoryPool));
+ }
+ }
+ case INT: {
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnVectorBatch>(new IntVectorBatch(capacity, memoryPool));
+ }
+ }
case LONG:
case DATE:
return std::unique_ptr<ColumnVectorBatch>(new LongVectorBatch(capacity, memoryPool));
case FLOAT:
+ if (useTightNumericVector) {
+ return std::unique_ptr<ColumnVectorBatch>(new FloatVectorBatch(capacity, memoryPool));
+ }
case DOUBLE:
return std::unique_ptr<ColumnVectorBatch>(new DoubleVectorBatch(capacity, memoryPool));
@@ -311,7 +332,9 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch>(result);
for (uint64_t i = 0; i < getSubtypeCount(); ++i) {
result->fields.push_back(
- getSubtype(i)->createRowBatch(capacity, memoryPool, encoded).release());
+ getSubtype(i)
+ ->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector)
+ .release());
}
return return_value;
}
@@ -321,7 +344,8 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> return_value =
std::unique_ptr<ColumnVectorBatch>(result);
if (getSubtype(0) != nullptr) {
- result->elements = getSubtype(0)->createRowBatch(capacity, memoryPool, encoded);
+ result->elements =
+ getSubtype(0)->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector);
}
return return_value;
}
@@ -331,10 +355,12 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> return_value =
std::unique_ptr<ColumnVectorBatch>(result);
if (getSubtype(0) != nullptr) {
- result->keys = getSubtype(0)->createRowBatch(capacity, memoryPool, encoded);
+ result->keys =
+ getSubtype(0)->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector);
}
if (getSubtype(1) != nullptr) {
- result->elements = getSubtype(1)->createRowBatch(capacity, memoryPool, encoded);
+ result->elements =
+ getSubtype(1)->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector);
}
return return_value;
}
@@ -354,7 +380,9 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch>(result);
for (uint64_t i = 0; i < getSubtypeCount(); ++i) {
result->children.push_back(
- getSubtype(i)->createRowBatch(capacity, memoryPool, encoded).release());
+ getSubtype(i)
+ ->createRowBatch(capacity, memoryPool, encoded, useTightNumericVector)
+ .release());
}
return return_value;
}
diff --git a/c++/src/TypeImpl.hh b/c++/src/TypeImpl.hh
index 21b69b433..7165e102b 100644
--- a/c++/src/TypeImpl.hh
+++ b/c++/src/TypeImpl.hh
@@ -94,6 +94,10 @@ namespace orc {
std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size, MemoryPool& memoryPool,
bool encoded = false) const override;
+ // The 4-argument overload deliberately has no default arguments. If it also
+ // defaulted `encoded`, any 2- or 3-argument call on a TypeImpl would match both
+ // overloads equally well and fail to compile as ambiguous. Callers that want the
+ // old behaviour use the overload above; callers opting into tight numeric
+ // vectors pass all four arguments explicitly.
+ // NOTE(review): confirm include/orc/Type.hh does not declare the same pair with
+ // defaults on both overloads, which would make 2/3-argument calls via Type& ambiguous.
+ std::unique_ptr<ColumnVectorBatch> createRowBatch(uint64_t size, MemoryPool& memoryPool,
+ bool encoded,
+ bool useTightNumericVector) const override;
+
/**
* Explicitly set the column ids. Only for internal usage.
*/
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index 4ececb070..ef16b3180 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -60,68 +60,6 @@ namespace orc {
return false;
}
- LongVectorBatch::LongVectorBatch(uint64_t _capacity, MemoryPool& pool)
- : ColumnVectorBatch(_capacity, pool), data(pool, _capacity) {
- // PASS
- }
-
- LongVectorBatch::~LongVectorBatch() {
- // PASS
- }
-
- std::string LongVectorBatch::toString() const {
- std::ostringstream buffer;
- buffer << "Long vector <" << numElements << " of " << capacity << ">";
- return buffer.str();
- }
-
- void LongVectorBatch::resize(uint64_t cap) {
- if (capacity < cap) {
- ColumnVectorBatch::resize(cap);
- data.resize(cap);
- }
- }
-
- void LongVectorBatch::clear() {
- numElements = 0;
- }
-
- uint64_t LongVectorBatch::getMemoryUsage() {
- return ColumnVectorBatch::getMemoryUsage() +
- static_cast<uint64_t>(data.capacity() * sizeof(int64_t));
- }
-
- DoubleVectorBatch::DoubleVectorBatch(uint64_t _capacity, MemoryPool& pool)
- : ColumnVectorBatch(_capacity, pool), data(pool, _capacity) {
- // PASS
- }
-
- DoubleVectorBatch::~DoubleVectorBatch() {
- // PASS
- }
-
- std::string DoubleVectorBatch::toString() const {
- std::ostringstream buffer;
- buffer << "Double vector <" << numElements << " of " << capacity << ">";
- return buffer.str();
- }
-
- void DoubleVectorBatch::resize(uint64_t cap) {
- if (capacity < cap) {
- ColumnVectorBatch::resize(cap);
- data.resize(cap);
- }
- }
-
- void DoubleVectorBatch::clear() {
- numElements = 0;
- }
-
- uint64_t DoubleVectorBatch::getMemoryUsage() {
- return ColumnVectorBatch::getMemoryUsage() +
- static_cast<uint64_t>(data.capacity() * sizeof(double));
- }
-
StringDictionary::StringDictionary(MemoryPool& pool)
: dictionaryBlob(pool), dictionaryOffset(pool) {
// PASS
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index c975ecde7..36c9efb36 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -44,6 +44,7 @@ namespace orc {
BloomFilterVersion bloomFilterVersion;
std::string timezone;
WriterMetrics* metrics;
+ bool useTightNumericVector;
WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
stripeSize = 64 * 1024 * 1024; // 64M
@@ -63,6 +64,7 @@ namespace orc {
// Explictly set the writer timezone if the use case depends on it.
timezone = "GMT";
metrics = nullptr;
+ useTightNumericVector = false;
}
};
@@ -262,6 +264,15 @@ namespace orc {
return *this;
}
+ WriterOptions& WriterOptions::setUseTightNumericVector(bool useTightNumericVector) {
+ privateBits->useTightNumericVector = useTightNumericVector;
+ return *this;
+ }
+
+ bool WriterOptions::getUseTightNumericVector() const {
+ return privateBits->useTightNumericVector;
+ }
+
Writer::~Writer() {
// PASS
}
@@ -284,6 +295,7 @@ namespace orc {
static const char* magicId;
static const WriterId writerId;
+ bool useTightNumericVector;
public:
WriterImpl(const Type& type, OutputStream* stream, const WriterOptions& options);
@@ -318,6 +330,8 @@ namespace orc {
stripeRows = totalRows = indexRows = 0;
currentOffset = 0;
+ useTightNumericVector = opts.getUseTightNumericVector();
+
// compression stream for stripe footer, file footer and metadata
compressionStream = createCompressor(
options.getCompression(), outStream, options.getCompressionStrategy(),
@@ -334,7 +348,7 @@ namespace orc {
}
std::unique_ptr<ColumnVectorBatch> WriterImpl::createRowBatch(uint64_t size) const {
- return type.createRowBatch(size, *options.getMemoryPool());
+ return type.createRowBatch(size, *options.getMemoryPool(), false, useTightNumericVector);
}
void WriterImpl::add(ColumnVectorBatch& rowsToAdd) {
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index e71c60afb..be35a95eb 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -45,7 +45,8 @@ namespace orc {
CompressionKind compression, const Type& type,
MemoryPool* memoryPool, OutputStream* stream,
FileVersion version, uint64_t stride = 0,
- const std::string& timezone = "GMT") {
+ const std::string& timezone = "GMT",
+ bool useTightNumericVector = false) {
WriterOptions options;
options.setStripeSize(stripeSize);
options.setCompressionBlockSize(compresionblockSize);
@@ -54,6 +55,7 @@ namespace orc {
options.setRowIndexStride(stride);
options.setFileVersion(version);
options.setTimezoneName(timezone);
+ options.setUseTightNumericVector(useTightNumericVector);
return createWriter(type, stream, options);
}
@@ -64,9 +66,11 @@ namespace orc {
return createReader(std::move(stream), options);
}
- std::unique_ptr<RowReader> createRowReader(Reader* reader, const std::string& timezone = "GMT") {
+ std::unique_ptr<RowReader> createRowReader(Reader* reader, const std::string& timezone = "GMT",
+ bool useTightNumericVector = false) {
RowReaderOptions rowReaderOpts;
rowReaderOpts.setTimezoneName(timezone);
+ rowReaderOpts.setUseTightNumericVector(useTightNumericVector);
return reader->createRowReader(rowReaderOpts);
}
@@ -1855,6 +1859,99 @@ namespace orc {
testSuppressPresentStream(CompressionKind_SNAPPY);
}
+ TEST_P(WriterTest, testWriteFixedWidthNumericVectorBatch) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ std::unique_ptr<Type> type(Type::buildTypeFromString(
+ "struct<col1:double,col2:float,col3:int,col4:smallint,col5:tinyint,col6:bigint>"));
+
+ uint64_t stripeSize = 16 * 1024;
+ uint64_t compressionBlockSize = 1024;
+ uint64_t rowCount = 65530;
+
+ std::vector<double> data(rowCount);
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ data[i] = 100000 * (std::rand() * 1.0 / RAND_MAX);
+ }
+
+ std::unique_ptr<Writer> writer =
+ createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool,
+ &memStream, fileVersion, 0, "GMT", true);
+ // start from here/
+ std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount / 2);
+ StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+ DoubleVectorBatch* doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
+ FloatVectorBatch* floatBatch = dynamic_cast<FloatVectorBatch*>(structBatch->fields[1]);
+ IntVectorBatch* intBatch = dynamic_cast<IntVectorBatch*>(structBatch->fields[2]);
+ ShortVectorBatch* shortBatch = dynamic_cast<ShortVectorBatch*>(structBatch->fields[3]);
+ ByteVectorBatch* byteBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[4]);
+ LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[5]);
+ structBatch->resize(rowCount);
+ doubleBatch->resize(rowCount);
+ floatBatch->resize(rowCount);
+ intBatch->resize(rowCount);
+ shortBatch->resize(rowCount);
+ byteBatch->resize(rowCount);
+ longBatch->resize(rowCount);
+
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ structBatch->notNull[i] = 1;
+ doubleBatch->notNull[i] = 1;
+ floatBatch->notNull[i] = 1;
+ intBatch->notNull[i] = 1;
+ shortBatch->notNull[i] = 1;
+ byteBatch->notNull[i] = 1;
+ longBatch->notNull[i] = 1;
+
+ doubleBatch->data[i] = data[i];
+ floatBatch->data[i] = static_cast<float>(data[i]);
+ intBatch->data[i] = static_cast<int32_t>(i);
+ shortBatch->data[i] = static_cast<int16_t>(i);
+ byteBatch->data[i] = static_cast<int8_t>(i);
+ longBatch->data[i] = static_cast<int64_t>(i);
+ }
+
+ structBatch->numElements = rowCount;
+ doubleBatch->numElements = rowCount;
+ floatBatch->numElements = rowCount;
+ intBatch->numElements = rowCount;
+ shortBatch->numElements = rowCount;
+ byteBatch->numElements = rowCount;
+ longBatch->numElements = rowCount;
+
+ writer->add(*batch);
+ writer->close();
+
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream(memStream.getData(), memStream.getLength()));
+ std::unique_ptr<Reader> reader = createReader(pool, std::move(inStream));
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get(), "GMT", true);
+
+ EXPECT_EQ(rowCount, reader->getNumberOfRows());
+
+ batch = rowReader->createRowBatch(rowCount);
+ EXPECT_EQ(true, rowReader->next(*batch));
+ EXPECT_EQ(rowCount, batch->numElements);
+
+ structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+ doubleBatch = dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
+ floatBatch = dynamic_cast<FloatVectorBatch*>(structBatch->fields[1]);
+ intBatch = dynamic_cast<IntVectorBatch*>(structBatch->fields[2]);
+ shortBatch = dynamic_cast<ShortVectorBatch*>(structBatch->fields[3]);
+ byteBatch = dynamic_cast<ByteVectorBatch*>(structBatch->fields[4]);
+ longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[5]);
+ for (uint64_t i = 0; i < rowCount; ++i) {
+ EXPECT_TRUE(std::abs(data[i] - doubleBatch->data[i]) < 0.000001);
+ EXPECT_TRUE(std::abs(static_cast<float>(data[i]) - static_cast<float>(floatBatch->data[i])) <
+ 0.000001f);
+ EXPECT_EQ(intBatch->data[i], static_cast<int32_t>(i));
+ EXPECT_EQ(shortBatch->data[i], static_cast<int16_t>(i));
+ EXPECT_EQ(byteBatch->data[i], static_cast<int8_t>(i));
+ EXPECT_EQ(longBatch->data[i], static_cast<int64_t>(i));
+ }
+ EXPECT_FALSE(rowReader->next(*batch));
+ }
+
INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
FileVersion::UNSTABLE_PRE_2_0()));