You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2023/04/24 09:24:52 UTC

[orc] branch main updated: ORC-1401: [C++] allow writing an intermediate footer

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 071b75755 ORC-1401: [C++] allow writing an intermediate footer
071b75755 is described below

commit 071b75755055de71f9b0b66880fbcdc4e1f347d4
Author: hinxx <hi...@gmail.com>
AuthorDate: Mon Apr 24 11:24:45 2023 +0200

    ORC-1401: [C++] allow writing an intermediate footer
    
    Unlike the Java code base, the current C++ code base does not support writing intermediate footers.
    
    It would be beneficial for C++-based writers to be able to write intermediate footer(s) to an open file while streaming data into it. The benefit is that the data persisted so far (after flushing stripes) is not lost if the writer fails to close the file (e.g. the application crashes).
    
    A writer that wants to support the feature would follow an approach similar to Hive's and write a side file with the latest footer offsets. A reader would then be able to determine the location of the last valid footer by using the last footer location from the side file (instead of assuming that the footer is at the end of the file).
    
    The change was tested by creating a small ORC file writer test application that calls the newly added WriterImpl::writeIntermediateFooter() after adding a batch of rows. The resulting ORC file is readable by the stock C++ and Java tools.
    
    This closes #1455
---
 c++/include/orc/OrcFile.hh     |   7 +++
 c++/include/orc/Writer.hh      |   7 +++
 c++/src/OrcFile.cc             |   7 +++
 c++/src/Writer.cc              |  25 +++++++++
 c++/test/MemoryOutputStream.hh |   2 +
 c++/test/TestWriter.cc         | 119 +++++++++++++++++++++++++++++++++++++++++
 6 files changed, 167 insertions(+)

diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index 455a3f722..6e4a07bf7 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -98,6 +98,13 @@ namespace orc {
      * Close the stream and flush any pending data to the disk.
      */
     virtual void close() = 0;
+
+    /**
+     * Flush any pending data to the disk; the default implementation throws NotImplementedYet.
+     */
+    virtual void flush() {
+      throw NotImplementedYet("Not supported");
+    }
   };
 
   /**
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 586016b00..d1f7b4d18 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -303,6 +303,13 @@ namespace orc {
      * Add user metadata to the writer.
      */
     virtual void addUserMetadata(const std::string& name, const std::string& value) = 0;
+
+    /**
+     * Write an intermediate footer on the file such that if the file is
+     * truncated to the returned offset, it would be a valid ORC file.
+     * @return the offset that would be a valid end location for an ORC file
+     */
+    virtual uint64_t writeIntermediateFooter() = 0;
   };
 }  // namespace orc
 
diff --git a/c++/src/OrcFile.cc b/c++/src/OrcFile.cc
index fe9561458..d4b6a86e2 100644
--- a/c++/src/OrcFile.cc
+++ b/c++/src/OrcFile.cc
@@ -33,6 +33,7 @@
 #define S_IWUSR _S_IWRITE
 #define stat _stat64
 #define fstat _fstat64
+#define fsync _commit
 #else
 #include <unistd.h>
 #define O_BINARY 0
@@ -175,6 +176,12 @@ namespace orc {
         closed = true;
       }
     }
+
+    void flush() override {  // sync the descriptor to disk; does nothing once the stream is closed
+      if (!closed && ::fsync(file) != 0) {
+        throw ParseError("Failed to sync output file to disk");  // report instead of ignoring
+      }
+    }
   };
 
   FileOutputStream::~FileOutputStream() {
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 14ee68f71..84c8a502e 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -307,6 +307,8 @@ namespace orc {
     static const char* magicId;
     static const WriterId writerId;
     bool useTightNumericVector;
+    int32_t stripesAtLastFlush;
+    uint64_t lastFlushOffset;
 
    public:
     WriterImpl(const Type& type, OutputStream* stream, const WriterOptions& options);
@@ -319,6 +321,8 @@ namespace orc {
 
     void addUserMetadata(const std::string& name, const std::string& value) override;
 
+    uint64_t writeIntermediateFooter() override;
+
    private:
     void init();
     void initStripe();
@@ -340,6 +344,8 @@ namespace orc {
     columnWriter = buildWriter(type, *streamsFactory, options);
     stripeRows = totalRows = indexRows = 0;
     currentOffset = 0;
+    stripesAtLastFlush = 0;
+    lastFlushOffset = 0;
 
     useTightNumericVector = opts.getUseTightNumericVector();
 
@@ -400,6 +406,24 @@ namespace orc {
     outStream->close();
   }
 
+  uint64_t WriterImpl::writeIntermediateFooter() {
+    if (stripeRows > 0) {
+      writeStripe();  // turn the rows buffered so far into a stripe first
+    }
+    if (stripesAtLastFlush == fileFooter.stripes_size()) {
+      return lastFlushOffset;  // no new stripe since the previous footer, nothing to write
+    }
+    writeMetadata();
+    writeFileFooter();
+    writePostscript();
+    stripesAtLastFlush = fileFooter.stripes_size();
+    outStream->flush();
+    currentOffset = lastFlushOffset = outStream->getLength();
+    // start a fresh stripe now that currentOffset has been adjusted
+    initStripe();
+    return lastFlushOffset;
+  }
+
   void WriterImpl::addUserMetadata(const std::string& name, const std::string& value) {
     proto::UserMetadataItem* userMetadataItem = fileFooter.add_metadata();
     userMetadataItem->set_name(name);
@@ -542,6 +566,7 @@ namespace orc {
     // update file statistics
     std::vector<proto::ColumnStatistics> colStats;
     columnWriter->getFileStatistics(colStats);
+    fileFooter.clear_statistics();
     for (uint32_t i = 0; i != colStats.size(); ++i) {
       *fileFooter.add_statistics() = colStats[i];
     }
diff --git a/c++/test/MemoryOutputStream.hh b/c++/test/MemoryOutputStream.hh
index 31b319a8d..e6b70f593 100644
--- a/c++/test/MemoryOutputStream.hh
+++ b/c++/test/MemoryOutputStream.hh
@@ -56,6 +56,8 @@ namespace orc {
 
     void close() override {}
 
+    void flush() override {}  // no-op: nothing is synced for this test stream
+
     void reset() {
       length = 0;
     }
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index c8c3ca139..99feee766 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -66,6 +66,15 @@ namespace orc {
     return createReader(std::move(stream), options);
   }
 
+  std::unique_ptr<Reader> createReaderWithTailLocation(MemoryPool* memoryPool,
+                                                       std::unique_ptr<InputStream> stream,
+                                                       uint64_t tailLocation) {
+    ReaderOptions readerOpts;
+    readerOpts.setMemoryPool(*memoryPool);
+    readerOpts.setTailLocation(tailLocation);  // e.g. an offset from writeIntermediateFooter()
+    return createReader(std::move(stream), readerOpts);
+  }
+
   std::unique_ptr<RowReader> createRowReader(Reader* reader, const std::string& timezone = "GMT",
                                              bool useTightNumericVector = false) {
     RowReaderOptions rowReaderOpts;
@@ -1994,6 +2003,116 @@ namespace orc {
     EXPECT_FALSE(rowReader->next(*batch));
   }
 
+  TEST_P(WriterTest, writeIntFileFlush) {
+    // Every intermediate footer must leave a readable ORC file ending at the returned offset.
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:int>"));
+
+    uint64_t stripeSize = 1024;            // 1K
+    uint64_t compressionBlockSize = 1024;  // 1k
+
+    std::unique_ptr<Writer> writer =
+        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool,
+                     &memStream, fileVersion);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
+    StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+    LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
+
+    std::vector<uint64_t> offsets;  // offsets[k]: footer end reported after k + 1 batches
+    for (uint64_t j = 0; j < 10; ++j) {
+      for (uint64_t i = 0; i < 65535; ++i) {
+        longBatch->data[i] = static_cast<int64_t>(i);
+      }
+      structBatch->numElements = 65535;
+      longBatch->numElements = 65535;
+
+      writer->add(*batch);
+      // force writing a stripe and record where the intermediate footer ends
+      offsets.push_back(writer->writeIntermediateFooter());
+    }
+
+    writer->close();
+    // reopen the file once per recorded offset, using that offset as the tail location
+    for (uint64_t o = 0; o < 10; ++o) {
+      auto inStream =
+          std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
+      std::unique_ptr<Reader> reader =
+          createReaderWithTailLocation(pool, std::move(inStream), offsets[o]);
+      std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+      EXPECT_EQ(o + 1, reader->getNumberOfStripes());
+      EXPECT_EQ(65535 * (o + 1), reader->getNumberOfRows());
+
+      batch = rowReader->createRowBatch(65535);
+      for (uint64_t j = 0; j < o + 1; ++j) {
+        EXPECT_TRUE(rowReader->next(*batch));
+        EXPECT_EQ(65535, batch->numElements);
+        // resolve the column vector once per batch rather than once per row
+        structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+        longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
+        for (uint64_t i = 0; i < 65535; ++i) {
+          EXPECT_EQ(static_cast<int64_t>(i), longBatch->data[i]);
+        }
+      }
+      EXPECT_FALSE(rowReader->next(*batch));
+    }
+  }
+
+  TEST_P(WriterTest, writeIntFileFlushNoClose) {
+    // Without close(), the file must still be readable up to the last intermediate footer.
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    std::unique_ptr<Type> type(Type::buildTypeFromString("struct<col1:int>"));
+
+    uint64_t stripeSize = 1024;            // 1K
+    uint64_t compressionBlockSize = 1024;  // 1k
+
+    std::unique_ptr<Writer> writer =
+        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool,
+                     &memStream, fileVersion);
+    std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
+    StructVectorBatch* structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+    LongVectorBatch* longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
+
+    std::vector<uint64_t> offsets;  // footer end offsets, one per intermediate footer written
+    for (uint64_t j = 0; j < 10; ++j) {
+      for (uint64_t i = 0; i < 65535; ++i) {
+        longBatch->data[i] = static_cast<int64_t>(i);
+      }
+      structBatch->numElements = 65535;
+      longBatch->numElements = 65535;
+
+      writer->add(*batch);
+      if (j < 8) {
+        // only the first 8 batches are followed by an intermediate footer
+        offsets.push_back(writer->writeIntermediateFooter());
+      }
+    }
+
+    // DO NOT CLOSE THE FILE INTENTIONALLY!
+    // writer->close();
+
+    auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
+    std::unique_ptr<Reader> reader =
+        createReaderWithTailLocation(pool, std::move(inStream), offsets.back());
+    std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+    EXPECT_EQ(8, reader->getNumberOfStripes());
+    EXPECT_EQ(8 * 65535, reader->getNumberOfRows());
+
+    batch = rowReader->createRowBatch(65535);
+    for (uint64_t j = 0; j < 8; ++j) {
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(65535, batch->numElements);
+      // resolve the column vector once per batch rather than once per row
+      structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
+      longBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
+      for (uint64_t i = 0; i < 65535; ++i) {
+        EXPECT_EQ(static_cast<int64_t>(i), longBatch->data[i]);
+      }
+    }
+    EXPECT_FALSE(rowReader->next(*batch));
+  }
+
   INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
                            Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
                                   FileVersion::UNSTABLE_PRE_2_0()));