You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2022/04/28 20:10:25 UTC
[orc] branch branch-1.7 updated: ORC-1160: [C++] Fix seekToRow can't seek within selected row group (#1102)
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 2d4cafaa2 ORC-1160: [C++] Fix seekToRow can't seek within selected row group (#1102)
2d4cafaa2 is described below
commit 2d4cafaa29295eb168428e6773f0934ea229db49
Author: Quanlong Huang <hu...@gmail.com>
AuthorDate: Fri Apr 29 04:10:20 2022 +0800
ORC-1160: [C++] Fix seekToRow can't seek within selected row group (#1102)
### What changes were proposed in this pull request?
This cherrypicks #1100 and resolves the conflicts.
### Why are the changes needed?
This fixes ORC-1160 in the 1.7 branch.
### How was this patch tested?
New tests are added in #1100.
---
c++/src/Reader.cc | 28 +++----
c++/test/TestPredicatePushdown.cc | 155 +++++++++++++++++++++++++++++++++++++-
2 files changed, 166 insertions(+), 17 deletions(-)
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 6644d1e1c..82001b9f7 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -350,26 +350,28 @@ namespace orc {
previousRow = rowNumber;
startNextStripe();
- // when predicate push down is enabled, above call to startNextStripe()
- // will move current row to 1st matching row group; here we only need
- // to deal with the case when PPD is not enabled.
- if (!sargsApplier) {
- uint64_t rowsToSkip = currentRowInStripe;
-
- if (footer->rowindexstride() > 0 &&
- currentStripeInfo.indexlength() > 0) {
+ uint64_t rowsToSkip = currentRowInStripe;
+ auto rowIndexStride = footer->rowindexstride();
+ // seek to the target row group if row indexes exist
+ if (rowIndexStride > 0 && currentStripeInfo.indexlength() > 0) {
+ // when predicate push down is enabled, above call to startNextStripe()
+ // will move current row to 1st matching row group; here we only need
+ // to deal with the case when PPD is not enabled.
+ if (!sargsApplier) {
if (rowIndexes.empty()) {
loadStripeIndex();
}
- uint32_t rowGroupId =
- static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
- rowsToSkip -= static_cast<uint64_t>(rowGroupId) * footer->rowindexstride();
-
+ auto rowGroupId = static_cast<uint32_t>(rowsToSkip / rowIndexStride);
if (rowGroupId != 0) {
seekToRowGroup(rowGroupId);
}
}
-
+ // skip leading rows in the target row group
+ rowsToSkip %= rowIndexStride;
+ }
+ // 'reader' is reset in startNextStripe(). It could be nullptr if 'rowsToSkip' is 0,
+ // e.g. when startNextStripe() skips all remaining rows of the file.
+ if (rowsToSkip > 0) {
reader->skip(rowsToSkip);
}
}
diff --git a/c++/test/TestPredicatePushdown.cc b/c++/test/TestPredicatePushdown.cc
index 41d0b532f..31ca0dffe 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -26,7 +26,7 @@ namespace orc {
static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
- void createMemTestFile(MemoryOutputStream& memStream) {
+ void createMemTestFile(MemoryOutputStream& memStream, uint64_t rowIndexStride) {
MemoryPool * pool = getDefaultPool();
auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
"struct<int1:bigint,string1:string>"));
@@ -35,7 +35,7 @@ namespace orc {
.setCompressionBlockSize(1024)
.setCompression(CompressionKind_NONE)
.setMemoryPool(pool)
- .setRowIndexStride(1000);
+ .setRowIndexStride(rowIndexStride);
auto writer = createWriter(*type, &memStream, options);
auto batch = writer->createRowBatch(3500);
@@ -223,10 +223,96 @@ namespace orc {
}
}
+ void TestSeekWithPredicates(Reader* reader, uint64_t seekRowNumber) {
+ // Build search argument (x < 300000) for column 'int1'. Only the first row group
+ // will be selected. It has 1000 rows: (0, "0"), (300, "10"), (600, "20"), ...,
+ // (299700, "9990").
+ std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+ ->lessThan("int1", PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(300000)))
+ .build();
+ RowReaderOptions rowReaderOpts;
+ rowReaderOpts.searchArgument(std::move(sarg));
+ auto rowReader = reader->createRowReader(rowReaderOpts);
+ auto readBatch = rowReader->createRowBatch(2000);
+ auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+ auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+ rowReader->seekToRow(seekRowNumber);
+ if (seekRowNumber >= 1000) {
+ // Seeking past the first row group goes to the end of the file
+ EXPECT_FALSE(rowReader->next(*readBatch));
+ EXPECT_EQ(0, readBatch->numElements);
+ EXPECT_EQ(3500, rowReader->getRowNumber());
+ return;
+ }
+ EXPECT_TRUE(rowReader->next(*readBatch));
+ EXPECT_EQ(1000 - seekRowNumber, readBatch->numElements);
+ EXPECT_EQ(seekRowNumber, rowReader->getRowNumber());
+ for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+ EXPECT_EQ(300 * (i + seekRowNumber), batch1.data[i]);
+ EXPECT_EQ(std::to_string(10 * (i + seekRowNumber)),
+ std::string(batch2.data[i], static_cast<size_t>(batch2.length[i])));
+ }
+ EXPECT_FALSE(rowReader->next(*readBatch));
+ EXPECT_EQ(3500, rowReader->getRowNumber());
+ }
+
+ void TestMultipleSeeksWithPredicates(Reader* reader) {
+ // Build search argument (x >= 300000 AND x < 600000) for column 'int1'. Only the 2nd
+ // row group will be selected.
+ std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+ ->startAnd()
+ .startNot()
+ .lessThan("int1", PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(300000L)))
+ .end()
+ .lessThan("int1", PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(600000L)))
+ .end()
+ .build();
+ RowReaderOptions rowReaderOpts;
+ rowReaderOpts.searchArgument(std::move(sarg));
+ auto rowReader = reader->createRowReader(rowReaderOpts);
+
+ // Read only one row after each seek
+ auto readBatch = rowReader->createRowBatch(1);
+ auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+ auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+ // Seek within the 1st row group will go to the start of the 2nd row group
+ rowReader->seekToRow(10);
+ EXPECT_TRUE(rowReader->next(*readBatch));
+ EXPECT_EQ(1000, rowReader->getRowNumber()) << "Should start at the 2nd row group";
+ EXPECT_EQ(1, readBatch->numElements);
+ EXPECT_EQ(300000, batch1.data[0]);
+ EXPECT_EQ("10000", std::string(batch2.data[0], static_cast<size_t>(batch2.length[0])));
+
+ // Seek within the 2nd row group (1000 rows) which is selected by the search argument
+ uint64_t seekRowNum[] = {1001, 1010, 1100, 1500, 1999};
+ for (uint64_t pos : seekRowNum) {
+ rowReader->seekToRow(pos);
+ EXPECT_TRUE(rowReader->next(*readBatch));
+ EXPECT_EQ(pos, rowReader->getRowNumber());
+ EXPECT_EQ(1, readBatch->numElements);
+ EXPECT_EQ(300 * pos, batch1.data[0]);
+ EXPECT_EQ(std::to_string(10 * pos),
+ std::string(batch2.data[0], static_cast<size_t>(batch2.length[0])));
+ }
+
+ // Seeking past the 2nd row group goes to the end of the file
+ rowReader->seekToRow(2000);
+ EXPECT_FALSE(rowReader->next(*readBatch));
+ EXPECT_EQ(3500, rowReader->getRowNumber());
+ EXPECT_EQ(0, readBatch->numElements);
+ }
+
TEST(TestPredicatePushdown, testPredicatePushdown) {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
- MemoryPool * pool = getDefaultPool();
- createMemTestFile(memStream);
+ MemoryPool* pool = getDefaultPool();
+ createMemTestFile(memStream, 1000);
std::unique_ptr<InputStream> inStream(new MemoryInputStream (
memStream.getData(), memStream.getLength()));
ReaderOptions readerOptions;
@@ -237,5 +323,66 @@ namespace orc {
TestRangePredicates(reader.get());
TestNoRowsSelected(reader.get());
TestOrPredicates(reader.get());
+
+ uint64_t seekRowNumbers[] = {0, 10, 100, 500, 999, 1000, 1001, 4000};
+ for (uint64_t seekRowNumber : seekRowNumbers) {
+ TestSeekWithPredicates(reader.get(), seekRowNumber);
+ }
+
+ TestMultipleSeeksWithPredicates(reader.get());
+ }
+
+ void TestMultipleSeeksWithoutRowIndexes(Reader* reader, bool createSarg) {
+ RowReaderOptions rowReaderOpts;
+ if (createSarg) {
+ // Build search argument x < 300000 for column 'int1'. All rows will be selected
+ // since there are no row indexes in the file.
+ std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+ ->lessThan("int1", PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(300000L)))
+ .build();
+ rowReaderOpts.searchArgument(std::move(sarg));
+ }
+ auto rowReader = reader->createRowReader(rowReaderOpts);
+
+ // Read only one row after each seek
+ auto readBatch = rowReader->createRowBatch(1);
+ auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+ auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+ // Seeks within the file
+ uint64_t seekRowNum[] = {0, 1, 100, 999, 1001, 1010, 1100, 1500, 1999, 3000, 3499};
+ for (uint64_t pos : seekRowNum) {
+ rowReader->seekToRow(pos);
+ EXPECT_TRUE(rowReader->next(*readBatch));
+ EXPECT_EQ(pos, rowReader->getRowNumber());
+ EXPECT_EQ(1, readBatch->numElements);
+ EXPECT_EQ(300 * pos, batch1.data[0]);
+ EXPECT_EQ(std::to_string(10 * pos),
+ std::string(batch2.data[0], static_cast<size_t>(batch2.length[0])));
+ }
+
+ // Seeking past the end of the file
+ rowReader->seekToRow(4000);
+ EXPECT_FALSE(rowReader->next(*readBatch));
+ EXPECT_EQ(3500, rowReader->getRowNumber());
+ EXPECT_EQ(0, readBatch->numElements);
+ }
+
+ TEST(TestPredicatePushdown, testPredicatePushdownWithoutRowIndexes) {
+ MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+ MemoryPool* pool = getDefaultPool();
+ // Create the file with rowIndexStride=0, so there are no row groups or row indexes.
+ createMemTestFile(memStream, 0);
+ std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+ memStream.getData(), memStream.getLength()));
+ ReaderOptions readerOptions;
+ readerOptions.setMemoryPool(*pool);
+ std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions);
+ EXPECT_EQ(3500, reader->getNumberOfRows());
+
+ TestMultipleSeeksWithoutRowIndexes(reader.get(), true);
+ TestMultipleSeeksWithoutRowIndexes(reader.get(), false);
}
} // namespace orc