You are viewing a plain text version of this content. The canonical (hyperlinked) version of this message is available in the Apache mailing list archive for commits@orc.apache.org.
Posted to commits@orc.apache.org by ga...@apache.org on 2019/03/14 17:18:27 UTC
[orc] branch master updated: ORC-471: ColumnWriter needs to take
incoming mask for nested types (#376)
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/master by this push:
new 249d046 ORC-471: ColumnWriter needs to take incoming mask for nested types (#376)
249d046 is described below
commit 249d0467d6db9bb1e55234a71ad9ff08232e9983
Author: Xiening Dai <xn...@users.noreply.github.com>
AuthorDate: Thu Mar 14 10:18:23 2019 -0700
ORC-471: ColumnWriter needs to take incoming mask for nested types (#376)
Add incomingMask as a parameter for column writer, so we skip recording nulls
in child type's present stream when parent type value is null.
Fix #376
---
c++/src/ColumnWriter.cc | 148 +++++++++++++--------
c++/src/ColumnWriter.hh | 6 +-
c++/src/Writer.cc | 4 +-
c++/test/TestWriter.cc | 341 +++++++++++++++++++++++++++++++++++++++++++++++-
4 files changed, 437 insertions(+), 62 deletions(-)
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 62bef3b..8c21ed3 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -125,8 +125,9 @@ namespace orc {
void ColumnWriter::add(ColumnVectorBatch& batch,
uint64_t offset,
- uint64_t numValues) {
- notNullEncoder->add(batch.notNull.data() + offset, numValues, nullptr);
+ uint64_t numValues,
+ const char* incomingMask) {
+ notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask);
}
void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
@@ -218,7 +219,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -273,24 +275,26 @@ namespace orc {
void StructColumnWriter::add(
ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const StructVectorBatch* structBatch =
dynamic_cast<const StructVectorBatch *>(&rowBatch);
if (structBatch == nullptr) {
throw InvalidArgument("Failed to cast to StructVectorBatch");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+ const char* notNull = structBatch->hasNulls ?
+ structBatch->notNull.data() + offset : nullptr;
for (uint32_t i = 0; i < children.size(); ++i) {
- children[i]->add(*structBatch->fields[i], offset, numValues);
+ children[i]->add(*structBatch->fields[i], offset, numValues, notNull);
}
// update stats
- if (!structBatch->hasNulls) {
+ if (!notNull) {
colIndexStatistics->increase(numValues);
} else {
uint64_t count = 0;
- const char* notNull = structBatch->notNull.data() + offset;
for (uint64_t i = 0; i < numValues; ++i) {
if (notNull[i]) {
++count;
@@ -402,7 +406,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -443,7 +448,8 @@ namespace orc {
void IntegerColumnWriter::add(
ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const LongVectorBatch* longBatch =
dynamic_cast<const LongVectorBatch*>(&rowBatch);
if (longBatch == nullptr) {
@@ -455,7 +461,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const int64_t* data = longBatch->data.data() + offset;
const char* notNull = longBatch->hasNulls ?
@@ -514,7 +520,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -545,7 +552,8 @@ namespace orc {
void ByteColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
if (byteBatch == nullptr) {
throw InvalidArgument("Failed to cast to LongVectorBatch");
@@ -556,7 +564,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to IntegerColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
int64_t* data = byteBatch->data.data() + offset;
const char* notNull = byteBatch->hasNulls ?
@@ -618,7 +626,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -649,7 +658,8 @@ namespace orc {
void BooleanColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
LongVectorBatch* byteBatch = dynamic_cast<LongVectorBatch*>(&rowBatch);
if (byteBatch == nullptr) {
throw InvalidArgument("Failed to cast to LongVectorBatch");
@@ -660,7 +670,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to BooleanColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
int64_t* data = byteBatch->data.data() + offset;
const char* notNull = byteBatch->hasNulls ?
@@ -723,7 +733,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -769,7 +780,8 @@ namespace orc {
void DoubleColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const DoubleVectorBatch* dblBatch =
dynamic_cast<const DoubleVectorBatch*>(&rowBatch);
if (dblBatch == nullptr) {
@@ -781,7 +793,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to DoubleColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const double* doubleData = dblBatch->data.data() + offset;
const char* notNull = dblBatch->hasNulls ?
@@ -976,7 +988,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -1064,7 +1077,8 @@ namespace orc {
void StringColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const StringVectorBatch* stringBatch =
dynamic_cast<const StringVectorBatch*>(&rowBatch);
if (stringBatch == nullptr) {
@@ -1077,7 +1091,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
char *const * data = stringBatch->data.data() + offset;
const int64_t* length = stringBatch->length.data() + offset;
@@ -1418,7 +1432,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
private:
uint64_t maxLength;
@@ -1427,7 +1442,8 @@ namespace orc {
void CharColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
if (charsBatch == nullptr) {
throw InvalidArgument("Failed to cast to StringVectorBatch");
@@ -1439,7 +1455,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
char** data = charsBatch->data.data() + offset;
int64_t* length = charsBatch->length.data() + offset;
@@ -1500,7 +1516,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
private:
uint64_t maxLength;
@@ -1508,7 +1525,8 @@ namespace orc {
void VarCharColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
StringVectorBatch* charsBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
if (charsBatch == nullptr) {
throw InvalidArgument("Failed to cast to StringVectorBatch");
@@ -1520,7 +1538,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to StringColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
char* const* data = charsBatch->data.data() + offset;
int64_t* length = charsBatch->length.data() + offset;
@@ -1567,12 +1585,14 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
};
void BinaryColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
StringVectorBatch* binBatch = dynamic_cast<StringVectorBatch*>(&rowBatch);
if (binBatch == nullptr) {
throw InvalidArgument("Failed to cast to StringVectorBatch");
@@ -1584,7 +1604,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to BinaryColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
char** data = binBatch->data.data() + offset;
int64_t* length = binBatch->length.data() + offset;
@@ -1616,7 +1636,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -1685,7 +1706,8 @@ namespace orc {
void TimestampColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
TimestampVectorBatch* tsBatch =
dynamic_cast<TimestampVectorBatch*>(&rowBatch);
if (tsBatch == nullptr) {
@@ -1698,7 +1720,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to TimestampColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const char* notNull = tsBatch->hasNulls ?
tsBatch->notNull.data() + offset : nullptr;
@@ -1775,7 +1797,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
};
DateColumnWriter::DateColumnWriter(
@@ -1788,7 +1811,8 @@ namespace orc {
void DateColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const LongVectorBatch* longBatch =
dynamic_cast<const LongVectorBatch*>(&rowBatch);
if (longBatch == nullptr) {
@@ -1801,7 +1825,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to DateColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const int64_t* data = longBatch->data.data() + offset;
const char* notNull = longBatch->hasNulls ?
@@ -1833,7 +1857,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -1880,7 +1905,8 @@ namespace orc {
void Decimal64ColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const Decimal64VectorBatch* decBatch =
dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
if (decBatch == nullptr) {
@@ -1893,7 +1919,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const char* notNull = decBatch->hasNulls ?
decBatch->notNull.data() + offset : nullptr;
@@ -1972,7 +1998,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
private:
char buffer[20];
@@ -2001,7 +2028,8 @@ namespace orc {
void Decimal128ColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
const Decimal128VectorBatch* decBatch =
dynamic_cast<const Decimal128VectorBatch*>(&rowBatch);
if (decBatch == nullptr) {
@@ -2014,7 +2042,7 @@ namespace orc {
throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const char* notNull = decBatch->hasNulls ?
decBatch->notNull.data() + offset : nullptr;
@@ -2059,7 +2087,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -2124,13 +2153,14 @@ namespace orc {
void ListColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
ListVectorBatch* listBatch = dynamic_cast<ListVectorBatch*>(&rowBatch);
if (listBatch == nullptr) {
throw InvalidArgument("Failed to cast to ListVectorBatch");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
int64_t* offsets = listBatch->offsets.data() + offset;
const char* notNull = listBatch->hasNulls ?
@@ -2146,7 +2176,7 @@ namespace orc {
// unnecessary to deal with null as elements are packed together
if (child.get()) {
- child->add(*listBatch->elements, elemOffset, totalNumValues);
+ child->add(*listBatch->elements, elemOffset, totalNumValues, nullptr);
}
lengthEncoder->add(offsets, numValues, notNull);
@@ -2273,7 +2303,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -2342,13 +2373,14 @@ namespace orc {
void MapColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
MapVectorBatch* mapBatch = dynamic_cast<MapVectorBatch*>(&rowBatch);
if (mapBatch == nullptr) {
throw InvalidArgument("Failed to cast to MapVectorBatch");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
int64_t* offsets = mapBatch->offsets.data() + offset;
const char* notNull = mapBatch->hasNulls ?
@@ -2366,10 +2398,10 @@ namespace orc {
// unnecessary to deal with null as keys and values are packed together
if (keyWriter.get()) {
- keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues);
+ keyWriter->add(*mapBatch->keys, elemOffset, totalNumValues, nullptr);
}
if (elemWriter.get()) {
- elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues);
+ elemWriter->add(*mapBatch->elements, elemOffset, totalNumValues, nullptr);
}
if (enableIndex) {
@@ -2529,7 +2561,8 @@ namespace orc {
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) override;
+ uint64_t numValues,
+ const char* incomingMask) override;
virtual void flush(std::vector<proto::Stream>& streams) override;
@@ -2592,13 +2625,14 @@ namespace orc {
void UnionColumnWriter::add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues) {
+ uint64_t numValues,
+ const char* incomingMask) {
UnionVectorBatch* unionBatch = dynamic_cast<UnionVectorBatch*>(&rowBatch);
if (unionBatch == nullptr) {
throw InvalidArgument("Failed to cast to UnionVectorBatch");
}
- ColumnWriter::add(rowBatch, offset, numValues);
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
const char* notNull = unionBatch->hasNulls ?
unionBatch->notNull.data() + offset : nullptr;
@@ -2621,7 +2655,7 @@ namespace orc {
if (childLength[i] > 0) {
children[i]->add(*unionBatch->children[i],
static_cast<uint64_t>(childOffset[i]),
- childLength[i]);
+ childLength[i], nullptr);
}
}
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index 226cd4d..2364066 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -93,10 +93,14 @@ namespace orc {
* @param rowBatch the row batch data to write
* @param offset the starting point of row batch to write
* @param numValues the number of values to write
+ * @param incomingMask if null, all values are not null. Otherwise, it is
+ * a mask (with at least numValues bytes) for which
+ * values to write.
*/
virtual void add(ColumnVectorBatch& rowBatch,
uint64_t offset,
- uint64_t numValues);
+ uint64_t numValues,
+ const char * incomingMask);
/**
* Flush column writer output streams.
* @param streams vector to store streams generated by flush()
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 035d6f1..72f7ba7 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -298,7 +298,7 @@ namespace orc {
while (pos < rowsToAdd.numElements) {
chunkSize = std::min(rowsToAdd.numElements - pos,
rowIndexStride - indexRows);
- columnWriter->add(rowsToAdd, pos, chunkSize);
+ columnWriter->add(rowsToAdd, pos, chunkSize, nullptr);
pos += chunkSize;
indexRows += chunkSize;
@@ -311,7 +311,7 @@ namespace orc {
}
} else {
stripeRows += rowsToAdd.numElements;
- columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements);
+ columnWriter->add(rowsToAdd, 0, rowsToAdd.numElements, nullptr);
}
if (columnWriter->getEstimatedSize() >= options.getStripeSize()) {
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index c86c025..12f2ed3 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -47,13 +47,14 @@ namespace orc {
const Type& type,
MemoryPool* memoryPool,
OutputStream* stream,
- FileVersion version){
+ FileVersion version,
+ uint64_t stride = 0){
WriterOptions options;
options.setStripeSize(stripeSize);
options.setCompressionBlockSize(compresionblockSize);
options.setCompression(compression);
options.setMemoryPool(memoryPool);
- options.setRowIndexStride(0);
+ options.setRowIndexStride(stride);
options.setFileVersion(version);
return createWriter(type, stream, options);
}
@@ -1263,6 +1264,342 @@ namespace orc {
EXPECT_FALSE(rowReader->next(*batch));
}
+ TEST_P(WriterTest, testWriteListColumnWithNull) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool * pool = getDefaultPool();
+ std::unique_ptr<Type> type(Type::buildTypeFromString(
+ "struct<col1:array<tinyint>>"));
+
+ uint64_t stripeSize = 1024;
+ uint64_t compressionBlockSize = 1024;
+
+ std::unique_ptr<Writer> writer = createWriter(stripeSize,
+ compressionBlockSize,
+ CompressionKind_ZLIB,
+ *type,
+ pool,
+ &memStream,
+ fileVersion);
+
+ std::unique_ptr<ColumnVectorBatch> batch =
+ writer->createRowBatch(4);
+ StructVectorBatch * structBatch =
+ dynamic_cast<StructVectorBatch *>(batch.get());
+ ListVectorBatch * listBatch =
+ dynamic_cast<ListVectorBatch *>(structBatch->fields[0]);
+ LongVectorBatch * intBatch =
+ dynamic_cast<LongVectorBatch *>(listBatch->elements.get());
+
+ // test data looks like below -
+ // {[1, 2]}
+ // null
+ // {[3, 4]}
+ // {[5, 6]}
+ int64_t * offsets = listBatch->offsets.data();
+ offsets[0] = 0;
+ offsets[1] = 2;
+ offsets[2] = 2;
+ offsets[3] = 4;
+ offsets[4] = 6;
+
+ intBatch->resize(6);
+ for (uint64_t i = 0; i < 6; ++i) {
+ intBatch->notNull[i] = 1;
+ }
+
+ int64_t * data = intBatch->data.data();
+ for (int8_t i = 1; i < 7; ++i) {
+ data[i - 1] = i;
+ }
+
+ structBatch->numElements = 4;
+
+ listBatch->notNull[1] = 0;
+ listBatch->hasNulls = true;
+ listBatch->numElements = 4;
+
+ writer->add(*batch);
+ writer->close();
+
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream (memStream.getData(), memStream.getLength()));
+ std::unique_ptr<Reader> reader = createReader(
+ pool,
+ std::move(inStream));
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+ EXPECT_EQ(4, reader->getNumberOfRows());
+
+ batch = rowReader->createRowBatch(4 * 2);
+ EXPECT_EQ(true, rowReader->next(*batch));
+
+ structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+ listBatch = dynamic_cast<ListVectorBatch *>(structBatch->fields[0]);
+ intBatch = dynamic_cast<LongVectorBatch *>(listBatch->elements.get());
+
+ data = intBatch->data.data();
+ offsets = listBatch->offsets.data();
+
+ EXPECT_EQ(4, structBatch->numElements);
+ EXPECT_EQ(4, listBatch->numElements);
+ EXPECT_EQ(1, listBatch->notNull[0]);
+ EXPECT_EQ(0, listBatch->notNull[1]);
+ EXPECT_EQ(1, listBatch->notNull[2]);
+ EXPECT_EQ(1, listBatch->notNull[3]);
+
+ EXPECT_EQ(0, offsets[0]);
+ EXPECT_EQ(2, offsets[1]);
+ EXPECT_EQ(2, offsets[2]);
+ EXPECT_EQ(4, offsets[3]);
+ EXPECT_EQ(6, offsets[4]);
+
+ for (int8_t i = 1; i < 7; ++i) {
+ EXPECT_EQ(i, data[i - 1]);
+ }
+ }
+
+ TEST_P(WriterTest, testWriteNestedStructWithNull) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool * pool = getDefaultPool();
+ std::unique_ptr<Type> type(Type::buildTypeFromString(
+ "struct<struct<bigint>>"));
+
+ uint64_t stripeSize = 1024;
+ uint64_t compressionBlockSize = 1024;
+
+ std::unique_ptr<Writer> writer = createWriter(stripeSize,
+ compressionBlockSize,
+ CompressionKind_ZLIB,
+ *type,
+ pool,
+ &memStream,
+ fileVersion);
+
+ // test data looks like below -
+ // {0}
+ // null
+ // {1}
+ // {2}
+ // {3}
+ std::unique_ptr<ColumnVectorBatch> batch =
+ writer->createRowBatch(5);
+ StructVectorBatch * structBatch =
+ dynamic_cast<StructVectorBatch *>(batch.get());
+ StructVectorBatch * structBatch2 =
+ dynamic_cast<StructVectorBatch *>(structBatch->fields[0]);
+ LongVectorBatch * intBatch =
+ dynamic_cast<LongVectorBatch *>(structBatch2->fields[0]);
+
+ structBatch->numElements = 5;
+ structBatch2->numElements = 5;
+ structBatch2->hasNulls = true;
+ structBatch2->notNull[1] = 0;
+
+ intBatch->resize(5);
+ for (int64_t i = 0; i < 5; ++i) {
+ intBatch->data.data()[i] = i;
+ }
+ intBatch->notNull[1] = 0;
+ intBatch->hasNulls = true;
+
+ writer->add(*batch);
+ writer->close();
+
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream (memStream.getData(), memStream.getLength()));
+ std::unique_ptr<Reader> reader = createReader(
+ pool,
+ std::move(inStream));
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+ EXPECT_EQ(5, reader->getNumberOfRows());
+
+ batch = rowReader->createRowBatch(5);
+ EXPECT_EQ(true, rowReader->next(*batch));
+
+ structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+ structBatch2 = dynamic_cast<StructVectorBatch *>(structBatch->fields[0]);
+ intBatch = dynamic_cast<LongVectorBatch *>(structBatch2->fields[0]);
+
+ for (uint64_t i = 0; i < 5; ++i) {
+ EXPECT_EQ(1, structBatch->notNull[i]);
+ }
+
+ for (uint64_t i = 0; i < 5; ++i) {
+ if (i == 1) {
+ EXPECT_EQ(0, structBatch2->notNull[i]);
+ } else {
+ EXPECT_EQ(1, structBatch2->notNull[i]);
+ }
+ }
+
+ for (uint64_t i = 0; i < 5; ++i) {
+ if (i == 1) {
+ EXPECT_EQ(0, intBatch->notNull[i]);
+ } else {
+ EXPECT_EQ(1, intBatch->notNull[i]);
+ }
+ }
+
+ int64_t * data = intBatch->data.data();
+ for (int8_t i = 0; i < 5; ++i) {
+ if (i != 1) {
+ EXPECT_EQ(i, data[i]);
+ }
+ }
+ }
+
+ TEST_P(WriterTest, testWriteNestedStructWithNullIndex) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool * pool = getDefaultPool();
+ std::unique_ptr<Type> type(Type::buildTypeFromString(
+ "struct<struct<bigint>>"));
+
+ uint64_t stripeSize = 1024;
+ uint64_t compressionBlockSize = 1024;
+
+ // 10000 rows with every 1000 row as an RG
+ // Each RG has 100 null rows except that the 5th RG is all null
+ std::unique_ptr<Writer> writer = createWriter(stripeSize,
+ compressionBlockSize,
+ CompressionKind_ZLIB,
+ *type,
+ pool,
+ &memStream,
+ fileVersion,
+ 1000);
+
+ std::unique_ptr<ColumnVectorBatch> batch =
+ writer->createRowBatch(10000);
+ StructVectorBatch * structBatch =
+ dynamic_cast<StructVectorBatch *>(batch.get());
+ StructVectorBatch * structBatch2 =
+ dynamic_cast<StructVectorBatch *>(structBatch->fields[0]);
+ LongVectorBatch * intBatch =
+ dynamic_cast<LongVectorBatch *>(structBatch2->fields[0]);
+
+ structBatch->numElements = 10000;
+ structBatch2->numElements = 10000;
+ structBatch2->hasNulls = true;
+
+ intBatch->resize(10000);
+ intBatch->hasNulls = true;
+ for (uint64_t i = 0; i < 10; ++i) {
+ for (uint64_t j = i * 1000 + 100 * i; j < i * 1000 + 100 * i + 100; ++j) {
+ structBatch2->notNull[j] = 0;
+ intBatch->notNull[j] = 0;
+ }
+ }
+
+ for (uint64_t i = 5000; i < 6000; ++i) {
+ structBatch2->notNull[i] = 0;
+ intBatch->notNull[i] = 0;
+ }
+
+ for (int64_t i = 0; i < 10000; ++i) {
+ intBatch->data.data()[i] = i;
+ }
+
+ writer->add(*batch);
+ writer->close();
+
+ std::unique_ptr<InputStream> inStream(
+ new MemoryInputStream (memStream.getData(), memStream.getLength()));
+ std::unique_ptr<Reader> reader = createReader(
+ pool,
+ std::move(inStream));
+ std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+ EXPECT_EQ(10000, reader->getNumberOfRows());
+
+ batch = rowReader->createRowBatch(1000);
+ structBatch = dynamic_cast<StructVectorBatch *>(batch.get());
+ structBatch2 = dynamic_cast<StructVectorBatch *>(structBatch->fields[0]);
+ intBatch = dynamic_cast<LongVectorBatch *>(structBatch2->fields[0]);
+
+ // Read rows 0 - 1000
+ EXPECT_EQ(true, rowReader->next(*batch));
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(1, structBatch->notNull[i]);
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ if (i < 100) {
+ EXPECT_EQ(0, structBatch2->notNull[i]);
+ } else {
+ EXPECT_EQ(1, structBatch2->notNull[i]);
+ }
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ if (i < 100) {
+ EXPECT_EQ(0, intBatch->notNull[i]);
+ } else {
+ EXPECT_EQ(1, intBatch->notNull[i]);
+ EXPECT_EQ(i, intBatch->data.data()[i]);
+ }
+ }
+
+ // Read rows 1800 - 2800, in which 2200 - 2300 are nulls
+ rowReader->seekToRow(1800);
+ EXPECT_EQ(true, rowReader->next(*batch));
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(1, structBatch->notNull[i]);
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ if (i >= 400 && i < 500) {
+ EXPECT_EQ(0, structBatch2->notNull[i]);
+ } else {
+ EXPECT_EQ(1, structBatch2->notNull[i]);
+ }
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ if (i >= 400 && i < 500) {
+ EXPECT_EQ(0, intBatch->notNull[i]);
+ } else {
+ EXPECT_EQ(1, intBatch->notNull[i]);
+ EXPECT_EQ(i + 1800, intBatch->data.data()[i]);
+ }
+ }
+
+ // Read rows 5000 - 6000, all nulls
+ rowReader->seekToRow(5000);
+ EXPECT_EQ(true, rowReader->next(*batch));
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(1, structBatch->notNull[i]);
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(0, structBatch2->notNull[i]);
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(0, intBatch->notNull[i]);
+ }
+
+ // Read rows 7200 - 8200, in which 7700 - 7800 are null
+ rowReader->seekToRow(7200);
+ EXPECT_EQ(true, rowReader->next(*batch));
+ for (uint64_t i = 0; i < 1000; ++i) {
+ EXPECT_EQ(1, structBatch->notNull[i]);
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ if (i >= 500 && i < 600) {
+ EXPECT_EQ(0, structBatch2->notNull[i]);
+ } else {
+ EXPECT_EQ(1, structBatch2->notNull[i]);
+ }
+ }
+
+ for (uint64_t i = 0; i < 1000; ++i) {
+ if (i >= 500 && i < 600) {
+ EXPECT_EQ(0, intBatch->notNull[i]);
+ } else {
+ EXPECT_EQ(1, intBatch->notNull[i]);
+ EXPECT_EQ(i + 7200, intBatch->data.data()[i]);
+ }
+ }
+ }
INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
}