You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2022/04/22 02:05:37 UTC

[orc] branch main updated: ORC-1152: [C++] Support writing short decimals in RLEv2

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new dc142899e ORC-1152: [C++] Support writing short decimals in RLEv2
dc142899e is described below

commit dc142899e95bfda9839136f8c17b5444b2bf2ae3
Author: coderex2522 <re...@gmail.com>
AuthorDate: Fri Apr 22 10:05:31 2022 +0800

    ORC-1152: [C++] Support writing short decimals in RLEv2
    
    This closes #1089
---
 c++/src/ColumnWriter.cc                            | 128 +++++++++++++++++++++
 c++/src/Writer.cc                                  |   8 ++
 c++/test/TestWriter.cc                             |   2 +-
 examples/decimal64_v2_cplusplus.orc                | Bin 0 -> 715 bytes
 .../org/apache/orc/impl/TreeReaderFactory.java     |   3 +-
 .../test/org/apache/orc/impl/TestReaderImpl.java   |  41 +++++++
 6 files changed, 180 insertions(+), 2 deletions(-)

diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index a5f1c4a9c..32b68af34 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -2115,6 +2115,127 @@ namespace orc {
     scaleEncoder->recordPosition(rowIndexPosition.get());
   }
 
+  class Decimal64ColumnWriterV2 : public ColumnWriter {  // short decimals as int64 in one RLEv2 DATA stream
+  public:
+    Decimal64ColumnWriterV2(const Type& type,
+                            const StreamsFactory& factory,
+                            const WriterOptions& options);
+    // Encodes numValues rows of a Decimal64VectorBatch starting at offset.
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues,
+                     const char* incomingMask) override;
+    // Flushes base-class streams, then appends the DATA stream entry.
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+    // Base-class estimate plus the bytes buffered in valueEncoder.
+    virtual uint64_t getEstimatedSize() const override;
+    // Emits an RleVersion_2 encoding with dictionary size 0.
+    virtual void getColumnEncoding(
+        std::vector<proto::ColumnEncoding>& encodings) const override;
+    // Records base-class and valueEncoder positions into rowIndexPosition.
+    virtual void recordPosition() const override;
+    // NOTE(review): precision is stored but not read by any method in this diff.
+  protected:
+    uint64_t precision;
+    uint64_t scale;  // scale passed to Decimal(...) for stats and bloom filter
+    std::unique_ptr<RleEncoder> valueEncoder;  // encoder behind the DATA stream
+  };
+  // Takes precision/scale from the type and wires the DATA stream to an RLEv2 encoder.
+  Decimal64ColumnWriterV2::Decimal64ColumnWriterV2(
+                               const Type& type,
+                               const StreamsFactory& factory,
+                               const WriterOptions& options) :
+                                   ColumnWriter(type, factory, options),
+                                   precision(type.getPrecision()),
+                                   scale(type.getScale()) {
+    std::unique_ptr<BufferedOutputStream> dataStream =
+        factory.createStream(proto::Stream_Kind_DATA);
+    valueEncoder = createRleEncoder(std::move(dataStream),
+                                    true,  // presumably isSigned: decimals may be negative
+                                    RleVersion_2,
+                                    memPool,
+                                    options.getAlignedBitpacking());
+    // With row indexes enabled, record the starting positions right away.
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+  // Adds rows [offset, offset + numValues); throws InvalidArgument on unexpected batch/stats types.
+  void Decimal64ColumnWriterV2::add(ColumnVectorBatch& rowBatch,
+                                    uint64_t offset,
+                                    uint64_t numValues,
+                                    const char* incomingMask) {
+    const Decimal64VectorBatch* decBatch =
+      dynamic_cast<const Decimal64VectorBatch*>(&rowBatch);
+    if (decBatch == nullptr) {
+      throw InvalidArgument("Failed to cast to Decimal64VectorBatch");
+    }
+    // Index-level statistics (colIndexStatistics), updated below for non-null rows.
+    DecimalColumnStatisticsImpl* decStats =
+      dynamic_cast<DecimalColumnStatisticsImpl*>(colIndexStatistics.get());
+    if (decStats == nullptr) {
+      throw InvalidArgument("Failed to cast to DecimalColumnStatisticsImpl");
+    }
+    // Common ColumnWriter handling runs first, with the caller's incomingMask.
+    ColumnWriter::add(rowBatch, offset, numValues, incomingMask);
+    // Views starting at row 'offset'; notNull is nullptr when the batch has no nulls.
+    const int64_t* data = decBatch->values.data() + offset;
+    const char* notNull = decBatch->hasNulls ?
+                          decBatch->notNull.data() + offset : nullptr;
+    // Hand all values plus the null mask to the RLE encoder (presumably it skips null slots).
+    valueEncoder->add(data, numValues, notNull);
+    // Bloom filter (decimal string form) and statistics see non-null rows only.
+    uint64_t count = 0;
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        ++count;
+        if (enableBloomFilter) {
+          std::string decimal = Decimal(
+            data[i], static_cast<int32_t>(scale)).toString(true);
+          bloomFilter->addBytes(
+            decimal.c_str(), static_cast<int64_t>(decimal.size()));
+        }
+        decStats->update(Decimal(data[i], static_cast<int32_t>(scale)));  // NOTE(review): same Decimal as above; could be built once
+      }
+    }
+    decStats->increase(count);
+    if (count < numValues) {  // at least one null row in this call
+      decStats->setHasNull(true);
+    }
+  }
+  // Flushes the base-class streams, then appends the DATA stream entry.
+  void Decimal64ColumnWriterV2::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+    // The DATA stream's length is the value returned by valueEncoder->flush().
+    proto::Stream dataStream;
+    dataStream.set_kind(proto::Stream_Kind_DATA);
+    dataStream.set_column(static_cast<uint32_t>(columnId));
+    dataStream.set_length(valueEncoder->flush());
+    streams.push_back(dataStream);
+  }
+  // Base-class estimate plus the bytes currently buffered in valueEncoder.
+  uint64_t Decimal64ColumnWriterV2::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += valueEncoder->getBufferSize();
+    return size;
+  }
+  // Appends one encoding: RleVersion_2 kind, dictionary size 0, UTF8 bloom encoding if enabled.
+  void Decimal64ColumnWriterV2::getColumnEncoding(
+    std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(RleVersionMapper(RleVersion_2));
+    encoding.set_dictionarysize(0);
+    if (enableBloomFilter) {
+      encoding.set_bloomencoding(BloomFilterVersion::UTF8);
+    }
+    encodings.push_back(encoding);
+  }
+  // Records base-class positions, then the DATA encoder's position, into rowIndexPosition.
+  void Decimal64ColumnWriterV2::recordPosition() const {
+    ColumnWriter::recordPosition();
+    valueEncoder->recordPosition(rowIndexPosition.get());
+  }
+
   class Decimal128ColumnWriter : public Decimal64ColumnWriter {
   public:
     Decimal128ColumnWriter(const Type& type,
@@ -3019,6 +3140,13 @@ namespace orc {
                                     true));
       case DECIMAL:
         if (type.getPrecision() <= Decimal64ColumnWriter::MAX_PRECISION_64) {
+          if (options.getFileVersion() == FileVersion::UNSTABLE_PRE_2_0()) {
+            return std::unique_ptr<ColumnWriter>(
+              new Decimal64ColumnWriterV2(
+                                          type,
+                                          factory,
+                                          options));
+          }
           return std::unique_ptr<ColumnWriter>(
             new Decimal64ColumnWriter(
                                       type,
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index b5ae6b74d..05adb4744 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -143,6 +143,14 @@ namespace orc {
       privateBits->fileVersion = version;
       return *this;
     }
+    if (version == FileVersion::UNSTABLE_PRE_2_0()) {
+      *privateBits->errorStream << "Warning: ORC files written in "
+                                << FileVersion::UNSTABLE_PRE_2_0().toString()
+                                << " will not be readable by other versions of the software."
+                                << " It is only for developer testing.\n";
+      privateBits->fileVersion = version;
+      return *this;
+    }
     throw std::logic_error("Unsupported file version specified.");
   }
 
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 8df2c2277..b0158d9ea 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -1996,5 +1996,5 @@ namespace orc {
     }
   }
 
-  INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
+  INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12(), FileVersion::UNSTABLE_PRE_2_0()));  // UNSTABLE_PRE_2_0 selects Decimal64ColumnWriterV2 for short decimals
 }
diff --git a/examples/decimal64_v2_cplusplus.orc b/examples/decimal64_v2_cplusplus.orc
new file mode 100644
index 000000000..faf35247f
Binary files /dev/null and b/examples/decimal64_v2_cplusplus.orc differ
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index 47ee9111d..f2ff95ac2 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -1783,7 +1783,8 @@ public class TreeReaderFactory {
 
     @Override
     public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
-      if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT)) {
+      if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+          encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
         throw new IOException("Unknown encoding " + encoding + " in column " +
             columnId);
       }
diff --git a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
index 9d23d7244..d038a8623 100644
--- a/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
+++ b/java/core/src/test/org/apache/orc/impl/TestReaderImpl.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
@@ -446,6 +448,45 @@ public class TestReaderImpl {
     CheckFileWithSargs("bad_bloom_filter_1.6.0.orc", "ORC C++ ");
   }
 
+  /** Reads a file written by the C++ writer with RLEv2-encoded short decimals. */
+  @Test
+  public void testReadDecimalV2File() throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path(workDir, "decimal64_v2_cplusplus.orc");
+    FileSystem fs = path.getFileSystem(conf);
+    try (ReaderImpl reader = (ReaderImpl) OrcFile.createReader(path,
+        OrcFile.readerOptions(conf).filesystem(fs))) {
+      assertEquals("ORC C++ 1.8.0-SNAPSHOT", reader.getSoftwareVersion());
+      OrcTail tail = reader.extractFileTail(fs, path, Long.MAX_VALUE);
+      assertEquals(1, tail.getStripeStatistics().size());
+
+      try (RecordReader rows = reader.rows()) {
+        TypeDescription schema = reader.getSchema();
+        assertEquals("struct<a:bigint,b:decimal(10,2),c:decimal(2,2),d:decimal(2,2),e:decimal(2,2)>",
+            schema.toString());
+        VectorizedRowBatch batch = schema.createRowBatchV2();
+        assertTrue(rows.nextBatch(batch), "No rows read out!");
+        assertEquals(10, batch.size);
+        LongColumnVector col1 = (LongColumnVector) batch.cols[0];
+        Decimal64ColumnVector col2 = (Decimal64ColumnVector) batch.cols[1];
+        Decimal64ColumnVector col3 = (Decimal64ColumnVector) batch.cols[2];
+        Decimal64ColumnVector col4 = (Decimal64ColumnVector) batch.cols[3];
+        Decimal64ColumnVector col5 = (Decimal64ColumnVector) batch.cols[4];
+        for (int i = 0; i < batch.size; ++i) {
+          assertEquals(17292380420L + i, col1.vector[i]);
+          // b starts at 164.16 and doubles each row. Compare unscaled longs exactly:
+          // 164.16 has no exact double form, so 164.16 * 10^scale is not reliable.
+          long expectedB = i == 0
+              ? new java.math.BigDecimal("164.16").movePointRight(col2.scale).longValueExact()
+              : col2.vector[i - 1] * 2;
+          assertEquals(expectedB, col2.vector[i]);
+          assertEquals(col3.vector[i] + col4.vector[i], col5.vector[i]);
+        }
+        assertFalse(rows.nextBatch(batch));
+      }
+    }
+  }
+
   @Test
   public void testExtractFileTailIndexOutOfBoundsException() throws Exception {
     Configuration conf = new Configuration();