You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/04/22 02:05:37 UTC
[orc] branch main updated: ORC-1152: [C++] Support writing short decimals in RLEv2
This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new dc142899e ORC-1152: [C++] Support writing short decimals in RLEv2
dc142899e is described below
commit dc142899e95bfda9839136f8c17b5444b2bf2ae3
Author: coderex2522 <re...@gmail.com>
AuthorDate: Fri Apr 22 10:05:31 2022 +0800
ORC-1152: [C++] Support writing short decimals in RLEv2
This closes #1089
---
c++/src/ColumnWriter.cc | 128 +++++++++++++++++++++
c++/src/Writer.cc | 8 ++
c++/test/TestWriter.cc | 2 +-
examples/decimal64_v2_cplusplus.orc | Bin 0 -> 715 bytes
.../org/apache/orc/impl/TreeReaderFactory.java | 3 +-
.../test/org/apache/orc/impl/TestReaderImpl.java | 41 +++++++
6 files changed, 180 insertions(+), 2 deletions(-)
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index a5f1c4a9c..32b68af34 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -2115,6 +2115,127 @@ namespace orc {
scaleEncoder->recordPosition(rowIndexPosition.get());
}
+ class Decimal64ColumnWriterV2 : public ColumnWriter {
+ public:
+ Decimal64ColumnWriterV2(const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options);
+
+ virtual void add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) override;
+
+ virtual void flush(std::vector<proto::Stream>& streams) override;
+
+ virtual uint64_t getEstimatedSize() const override;
+
+ virtual void getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const override;
+
+ virtual void recordPosition() const override;
+
+ protected:
+ uint64_t precision;
+ uint64_t scale;
+ std::unique_ptr<RleEncoder> valueEncoder;
+ };
+
+ Decimal64ColumnWriterV2::Decimal64ColumnWriterV2(
+ const Type& type,
+ const StreamsFactory& factory,
+ const WriterOptions& options) :
+ ColumnWriter(type, factory, options),
+ precision(type.getPrecision()),
+ scale(type.getScale()) {
+ std::unique_ptr<BufferedOutputStream> dataStream =
+ factory.createStream(proto::Stream_Kind_DATA);
+ valueEncoder = createRleEncoder(std::move(dataStream),
+ true,
+ RleVersion_2,
+ memPool,
+ options.getAlignedBitpacking());
+
+ if (enableIndex) {
+ recordPosition();
+ }
+ }
+
+ void Decimal64ColumnWriterV2::add(ColumnVectorBatch& rowBatch,
+ uint64_t offset,
+ uint64_t numValues,
+ const char* incomingMask) {
+ const Decimal64VectorBatch* decBatch =
+ dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
+ if (decBatch == nullptr) {
+ throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
+ }
+
+ DecimalColumnStatisticsImpl* decStats =
+ dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
+ if (decStats == nullptr) {
+ throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
+ }
+
+ ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+
+ const int64_t* data = decBatch->values.data() + offset;
+ const char* notNull = decBatch->hasNulls ?
+ decBatch->notNull.data() + offset : nullptr;
+
+ valueEncoder->add(data, numValues, notNull);
+
+ uint64_t count = 0;
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ ++count;
+ if (enableBloomFilter) {
+ std::string decimal = Decimal(
+ data[i], static_cast<int32_t>(scale)).toString(true);
+ bloomFilter->addBytes(
+ decimal.c_str(), static_cast<int64_t>(decimal.size()));
+ }
+ decStats->update(Decimal(data[i], static_cast<int32_t>(scale)));
+ }
+ }
+ decStats->increase(count);
+ if (count < numValues) {
+ decStats->setHasNull(true);
+ }
+ }
+
+ void Decimal64ColumnWriterV2::flush(std::vector<proto::Stream>& streams) {
+ ColumnWriter::flush(streams);
+
+ proto::Stream dataStream;
+ dataStream.set_kind(proto::Stream_Kind_DATA);
+ dataStream.set_column(static_cast<uint32_t>(columnId));
+ dataStream.set_length(valueEncoder->flush());
+ streams.push_back(dataStream);
+ }
+
+ uint64_t Decimal64ColumnWriterV2::getEstimatedSize() const {
+ uint64_t size = ColumnWriter::getEstimatedSize();
+ size += valueEncoder->getBufferSize();
+ return size;
+ }
+
+ void Decimal64ColumnWriterV2::getColumnEncoding(
+ std::vector<proto::ColumnEncoding>& encodings) const {
+ proto::ColumnEncoding encoding;
+ encoding.set_kind(RleVersionMapper(RleVersion_2));
+ encoding.set_dictionarysize(0);
+ if (enableBloomFilter) {
+ encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+ }
+ encodings.push_back(encoding);
+ }
+
+ void Decimal64ColumnWriterV2::recordPosition() const {
+ ColumnWriter::recordPosition();
+ valueEncoder->recordPosition(rowIndexPosition.get());
+ }
+
class Decimal128ColumnWriter : public Decimal64ColumnWriter {
public:
Decimal128ColumnWriter(const Type& type,
@@ -3019,6 +3140,13 @@ namespace orc {
true));
case DECIMAL:
if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
+ if (options.getFileVersion() == FileVersion::UNSTABLE_PRE_2_0()) {
+ return std::unique_ptr<ColumnWriter>(
+ new Decimal64ColumnWriterV2(
+ type,
+ factory,
+ options));
+ }
return std::unique_ptr<ColumnWriter>(
new Decimal64ColumnWriter(
type,
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index b5ae6b74d..05adb4744 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -143,6 +143,14 @@ namespace orc {
privateBits->fileVersion = version;
return *this;
}
+ if (version == FileVersion::UNSTABLE_PRE_2_0()) {
+ *privateBits->errorStream << "Warning: ORC files written in "
+ << FileVersion::UNSTABLE_PRE_2_0().toString()
+ << " will not be readable by other versions of the software."
+ << " It is only for developer testing.\n";
+ privateBits->fileVersion = version;
+ return *this;
+ }
throw std::logic_error("Unsupported file version specified.");
}
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 8df2c2277..b0158d9ea 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -1996,5 +1996,5 @@ namespace orc {
}
}
- INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
+ INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12(), FileVersion::UNSTABLE_PRE_2_0()));
}
diff --git a/examples/decimal64_v2_cplusplus.orc b/examples/decimal64_v2_cplusplus.orc
new file mode 100644
index 000000000..faf35247f
Binary files /dev/null and b/examples/decimal64_v2_cplusplus.orc differ
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 47ee9111d..f2ff95ac2 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1783,7 +1783,8 @@ public class TreeReaderFactory {
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT)) {
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 9d23d7244..d038a8623 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -446,6 +448,45 @@ public class TestReaderImpl {
CheckFileWithSargs("bad_bloom_filter_1.6.0.orc", "ORC C++ ");
}
+ @Test
+ public void testReadDecimalV2File() throws IOException {
+ Configuration conf = new Configuration();
+ Path path = new Path(workDir, "decimal64_v2_cplusplus.orc");
+ FileSystem fs = path.getFileSystem(conf);
+ try (ReaderImpl reader = (ReaderImpl) OrcFile.createReader(path,
+ OrcFile.readerOptions(conf).filesystem(fs))) {
+ assertEquals("ORC C++ 1.8.0-SNAPSHOT", reader.getSoftwareVersion());
+ OrcTail tail = reader.extractFileTail(fs, path, Long.MAX_VALUE);
+ List<StripeStatistics> stats = tail.getStripeStatistics();
+ assertEquals(1, stats.size());
+
+ try (RecordReader rows = reader.rows()) {
+ TypeDescription schema = reader.getSchema();
+ assertEquals("struct<a:bigint,b:decimal(10,2),c:decimal(2,2),d:decimal(2,2),e:decimal(2,2)>",
+ schema.toString());
+ VectorizedRowBatch batch = schema.createRowBatchV2();
+ assertTrue(rows.nextBatch(batch), "No rows read out!");
+ assertEquals(10, batch.size);
+ LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+ Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+ Decimal64ColumnVector col3 = (Decimal64ColumnVector) batch.cols[2];
+ Decimal64ColumnVector col4 = (Decimal64ColumnVector) batch.cols[3];
+ Decimal64ColumnVector col5 = (Decimal64ColumnVector) batch.cols[4];
+ for (int i = 0; i < batch.size; ++i) {
+ assertEquals(17292380420L + i, col1.vector[i]);
+ if (i == 0) {
+ long scaleNum = (long) Math.pow(10, col2.scale);
+ assertEquals(164.16 * scaleNum, col2.vector[i]);
+ } else {
+ assertEquals(col2.vector[i - 1] * 2, col2.vector[i]);
+ }
+ assertEquals(col3.vector[i] + col4.vector[i], col5.vector[i]);
+ }
+ assertFalse(rows.nextBatch(batch));
+ }
+ }
+ }
+
@Test
public void testExtractFileTailIndexOutOfBoundsException() throws Exception {
Configuration conf = new Configuration();