You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2022/08/22 04:05:03 UTC

[orc] branch main updated: ORC-1252:[C++] Expose IO metrics for write operation

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new a49f87d49 ORC-1252:[C++] Expose IO metrics for write operation
a49f87d49 is described below

commit a49f87d492aa62ec2be2d4bce2fcfe1f53ca05d9
Author: coderex2522 <re...@gmail.com>
AuthorDate: Sun Aug 21 21:04:53 2022 -0700

    ORC-1252:[C++] Expose IO metrics for write operation
    
    ### What changes were proposed in this pull request?
    The pull request will add IO metrics for the write operation. The csv-import tool can display relevant IO metrics.
    
    ### Why are the changes needed?
    This patch can expose the metrics information related to the process of writing ORC files.
    
    ### How was this patch tested?
    The IO metrics are tested in BufferedOutputStream's unit tests, and the csv-import tool with option -m can be used for manual testing.
    
    Closes #1223 from coderex2522/ORC-1252.
    
    Authored-by: coderex2522 <re...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 c++/include/orc/Writer.hh            | 21 +++++++++++
 c++/src/ColumnWriter.cc              |  3 +-
 c++/src/Compression.cc               | 68 ++++++++++++++++++++++++------------
 c++/src/Compression.hh               |  3 +-
 c++/src/Writer.cc                    | 25 +++++++++++--
 c++/src/io/OutputStream.cc           | 12 +++++--
 c++/src/io/OutputStream.hh           | 11 +++++-
 c++/test/TestBufferedOutputStream.cc | 18 ++++++++--
 c++/test/TestByteRLEEncoder.cc       | 20 +++++------
 c++/test/TestByteRle.cc              |  6 ++--
 c++/test/TestCompression.cc          | 16 ++++++---
 c++/test/TestRleEncoder.cc           |  3 +-
 tools/src/CSVFileImport.cc           | 17 ++++++++-
 13 files changed, 170 insertions(+), 53 deletions(-)

diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 78b0b97d2..1a95572e7 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -24,6 +24,7 @@
 #include "orc/Type.hh"
 #include "orc/Vector.hh"
 
+#include <atomic>
 #include <memory>
 #include <set>
 #include <string>
@@ -46,6 +47,15 @@ namespace orc {
 
   class Timezone;
 
+  /**
+   * Expose the IO metrics for write operation.
+   */
+  struct WriterMetrics {
+    // Record the number of IO requests written to the output file
+    std::atomic<uint64_t> IOCount{0};
+    // Record the latency of IO blocking
+    std::atomic<uint64_t> IOBlockingLatencyUs{0};
+  };
   /**
    * Options for creating a Writer.
    */
@@ -235,6 +245,17 @@ namespace orc {
      * @param zone writer timezone name
      */
     WriterOptions& setTimezoneName(const std::string& zone);
+
+    /**
+     * Set the writer metrics.
+     */
+    WriterOptions& setWriterMetrics(WriterMetrics * metrics);
+
+    /**
+     * Get the writer metrics.
+     * @return if not set, return nullptr.
+     */
+    WriterMetrics * getWriterMetrics() const;
   };
 
   class Writer {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 32b68af34..c8375d317 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -58,7 +58,8 @@ namespace orc {
                             // BufferedOutputStream initial capacity
                             1 * 1024 * 1024,
                             options.getCompressionBlockSize(),
-                            *options.getMemoryPool());
+                            *options.getMemoryPool(),
+                            options.getWriterMetrics());
   }
 
   std::unique_ptr<StreamsFactory> createStreamsFactory(
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 0b4969f41..81c6fb9e3 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -54,7 +54,8 @@ namespace orc {
                           int compressionLevel,
                           uint64_t capacity,
                           uint64_t blockSize,
-                          MemoryPool& pool);
+                          MemoryPool& pool,
+                          WriterMetrics* metrics);
 
     virtual bool Next(void** data, int*size) override = 0;
     virtual void BackUp(int count) override;
@@ -98,11 +99,13 @@ namespace orc {
                                                int compressionLevel,
                                                uint64_t capacity,
                                                uint64_t blockSize,
-                                               MemoryPool& pool) :
+                                               MemoryPool& pool,
+                                               WriterMetrics* metrics) :
                                                 BufferedOutputStream(pool,
                                                                      outStream,
                                                                      capacity,
-                                                                     blockSize),
+                                                                     blockSize,
+                                                                     metrics),
                                                 rawInputBuffer(pool, blockSize),
                                                 level(compressionLevel),
                                                 outputBuffer(nullptr),
@@ -160,7 +163,8 @@ namespace orc {
                           int compressionLevel,
                           uint64_t capacity,
                           uint64_t blockSize,
-                          MemoryPool& pool);
+                          MemoryPool& pool,
+                          WriterMetrics* metrics);
 
     virtual bool Next(void** data, int*size) override;
     virtual std::string getName() const override = 0;
@@ -174,12 +178,14 @@ namespace orc {
                                        int compressionLevel,
                                        uint64_t capacity,
                                        uint64_t blockSize,
-                                       MemoryPool& pool) :
+                                       MemoryPool& pool,
+                                       WriterMetrics* metrics) :
                                          CompressionStreamBase(outStream,
                                                                compressionLevel,
                                                                capacity,
                                                                blockSize,
-                                                               pool) {
+                                                               pool,
+                                                               metrics) {
     // PASS
   }
 
@@ -219,7 +225,8 @@ namespace orc {
                           int compressionLevel,
                           uint64_t capacity,
                           uint64_t blockSize,
-                          MemoryPool& pool);
+                          MemoryPool& pool,
+                          WriterMetrics* metrics);
 
     virtual ~ZlibCompressionStream() override {
       end();
@@ -241,12 +248,14 @@ namespace orc {
                         int compressionLevel,
                         uint64_t capacity,
                         uint64_t blockSize,
-                        MemoryPool& pool)
+                        MemoryPool& pool,
+                        WriterMetrics* metrics)
                         : CompressionStream(outStream,
                                             compressionLevel,
                                             capacity,
                                             blockSize,
-                                            pool) {
+                                            pool,
+                                            metrics) {
     init();
   }
 
@@ -934,12 +943,14 @@ DIAGNOSTIC_POP
                            int compressionLevel,
                            uint64_t capacity,
                            uint64_t blockSize,
-                           MemoryPool& pool)
+                           MemoryPool& pool,
+                           WriterMetrics* metrics)
                            : CompressionStreamBase(outStream,
                                                    compressionLevel,
                                                    capacity,
                                                    blockSize,
-                                                   pool)
+                                                   pool,
+                                                   metrics)
                            , compressorBuffer(pool) {
       // PASS
     }
@@ -1022,12 +1033,14 @@ DIAGNOSTIC_POP
                         int compressionLevel,
                         uint64_t capacity,
                         uint64_t blockSize,
-                        MemoryPool& pool)
+                        MemoryPool& pool,
+                        WriterMetrics* metrics)
                         : BlockCompressionStream(outStream,
                                                  compressionLevel,
                                                  capacity,
                                                  blockSize,
-                                                 pool) {
+                                                 pool,
+                                                 metrics) {
       this->init();
     }
 
@@ -1086,12 +1099,14 @@ DIAGNOSTIC_POP
                         int compressionLevel,
                         uint64_t capacity,
                         uint64_t blockSize,
-                        MemoryPool& pool)
+                        MemoryPool& pool,
+                        WriterMetrics* metrics)
                         : BlockCompressionStream(outStream,
                                                  compressionLevel,
                                                  capacity,
                                                  blockSize,
-                                                 pool) {
+                                                 pool,
+                                                 metrics) {
     }
 
     virtual std::string getName() const override {
@@ -1129,12 +1144,14 @@ DIAGNOSTIC_POP
                           int compressionLevel,
                           uint64_t capacity,
                           uint64_t blockSize,
-                          MemoryPool& pool)
+                          MemoryPool& pool,
+                          WriterMetrics* metrics)
                           : BlockCompressionStream(outStream,
                                                    compressionLevel,
                                                    capacity,
                                                    blockSize,
-                                                   pool) {
+                                                   pool,
+                                                   metrics) {
       this->init();
     }
 
@@ -1268,39 +1285,44 @@ DIAGNOSTIC_PUSH
                       CompressionStrategy strategy,
                       uint64_t bufferCapacity,
                       uint64_t compressionBlockSize,
-                      MemoryPool& pool) {
+                      MemoryPool& pool,
+                      WriterMetrics* metrics) {
     switch (static_cast<int64_t>(kind)) {
     case CompressionKind_NONE: {
       return std::unique_ptr<BufferedOutputStream>
         (new BufferedOutputStream(
-                pool, outStream, bufferCapacity, compressionBlockSize));
+          pool, outStream, bufferCapacity, compressionBlockSize, metrics));
     }
     case CompressionKind_ZLIB: {
       int level = (strategy == CompressionStrategy_SPEED) ?
               Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION;
       return std::unique_ptr<BufferedOutputStream>
         (new ZlibCompressionStream(
-                outStream, level, bufferCapacity, compressionBlockSize, pool));
+          outStream, level, bufferCapacity,
+          compressionBlockSize, pool, metrics));
     }
     case CompressionKind_ZSTD: {
       int level = (strategy == CompressionStrategy_SPEED) ?
               1 : ZSTD_CLEVEL_DEFAULT;
       return std::unique_ptr<BufferedOutputStream>
         (new ZSTDCompressionStream(
-          outStream, level, bufferCapacity, compressionBlockSize, pool));
+          outStream, level, bufferCapacity,
+          compressionBlockSize, pool, metrics));
     }
     case CompressionKind_LZ4: {
       int level = (strategy == CompressionStrategy_SPEED) ?
               LZ4_ACCELERATION_MAX : LZ4_ACCELERATION_DEFAULT;
       return std::unique_ptr<BufferedOutputStream>
         (new Lz4CompressionSteam(
-          outStream, level, bufferCapacity, compressionBlockSize, pool));
+          outStream, level, bufferCapacity,
+          compressionBlockSize, pool, metrics));
     }
     case CompressionKind_SNAPPY: {
       int level = 0;
       return std::unique_ptr<BufferedOutputStream>
         (new SnappyCompressionStream(
-          outStream, level, bufferCapacity, compressionBlockSize, pool));
+          outStream, level, bufferCapacity,
+          compressionBlockSize, pool, metrics));
     }
     case CompressionKind_LZO:
     default:
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index 6457ad98c..50a252443 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -54,7 +54,8 @@ namespace orc {
                       CompressionStrategy strategy,
                       uint64_t bufferCapacity,
                       uint64_t compressionBlockSize,
-                      MemoryPool& pool);
+                      MemoryPool& pool,
+                      WriterMetrics* metrics);
 }
 
 #endif
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 05adb4744..ca7003f8e 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -21,6 +21,7 @@
 
 #include "ColumnWriter.hh"
 #include "Timezone.hh"
+#include "Utils.hh"
 
 #include <memory>
 
@@ -42,6 +43,7 @@ namespace orc {
     double bloomFilterFalsePositiveProb;
     BloomFilterVersion bloomFilterVersion;
     std::string timezone;
+    WriterMetrics* metrics;
 
     WriterOptionsPrivate() :
                             fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
@@ -61,6 +63,7 @@ namespace orc {
       //introduced by moving timestamps between different timezones.
       //Explictly set the writer timezone if the use case depends on it.
       timezone = "GMT";
+      metrics = nullptr;
     }
   };
 
@@ -255,6 +258,15 @@ namespace orc {
     return *this;
   }
 
+  WriterMetrics* WriterOptions::getWriterMetrics() const {
+    return privateBits->metrics;
+  }
+
+  WriterOptions& WriterOptions::setWriterMetrics(WriterMetrics* metrics) {
+    privateBits->metrics = metrics;
+    return *this;
+  }
+
   Writer::~Writer() {
     // PASS
   }
@@ -328,14 +340,16 @@ namespace orc {
                                   options.getCompressionStrategy(),
                                   1 * 1024 * 1024, // buffer capacity: 1M
                                   options.getCompressionBlockSize(),
-                                  *options.getMemoryPool());
+                                  *options.getMemoryPool(),
+                                  options.getWriterMetrics());
 
     // uncompressed stream for post script
     bufferedStream.reset(new BufferedOutputStream(
                                             *options.getMemoryPool(),
                                             outStream,
                                             1024, // buffer capacity: 1024 bytes
-                                            options.getCompressionBlockSize()));
+                                            options.getCompressionBlockSize(),
+                                            options.getWriterMetrics()));
 
     init();
   }
@@ -393,7 +407,11 @@ namespace orc {
   void WriterImpl::init() {
     // Write file header
     const static size_t magicIdLength = strlen(WriterImpl::magicId);
-    outStream->write(WriterImpl::magicId, magicIdLength);
+    {
+      SCOPED_STOPWATCH(
+        options.getWriterMetrics(), IOBlockingLatencyUs, IOCount);
+      outStream->write(WriterImpl::magicId, magicIdLength);
+    }
     currentOffset += magicIdLength;
 
     // Initialize file footer
@@ -541,6 +559,7 @@ namespace orc {
     }
     unsigned char psLength =
                       static_cast<unsigned char>(bufferedStream->flush());
+    SCOPED_STOPWATCH(options.getWriterMetrics(), IOBlockingLatencyUs, IOCount);
     outStream->write(&psLength, sizeof(unsigned char));
   }
 
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 8770d286d..4485e1b29 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -18,6 +18,7 @@
 
 #include "orc/Exceptions.hh"
 #include "OutputStream.hh"
+#include "Utils.hh"
 
 #include <sstream>
 
@@ -31,9 +32,11 @@ namespace orc {
                                     MemoryPool& pool,
                                     OutputStream * outStream,
                                     uint64_t capacity_,
-                                    uint64_t blockSize_)
+                                    uint64_t blockSize_,
+                                    WriterMetrics* metrics_)
                                     : outputStream(outStream),
-                                      blockSize(blockSize_) {
+                                      blockSize(blockSize_),
+                                      metrics(metrics_) {
     dataBuffer.reset(new DataBuffer<char>(pool));
     dataBuffer->reserve(capacity_);
   }
@@ -92,7 +95,10 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
-    outputStream->write(dataBuffer->data(), dataSize);
+    {
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
+      outputStream->write(dataBuffer->data(), dataSize);
+    }
     dataBuffer->resize(0);
     return dataSize;
   }
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index 69c06d5f6..c49b769d5 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -34,6 +34,12 @@ namespace orc {
     virtual void add(uint64_t pos) = 0;
   };
 
+DIAGNOSTIC_PUSH
+
+#ifdef __clang__
+  DIAGNOSTIC_IGNORE("-Wunused-private-field")
+#endif
+  struct WriterMetrics;
   /**
    * A subclass of Google's ZeroCopyOutputStream that supports output to memory
    * buffer, and flushing to OutputStream.
@@ -45,12 +51,14 @@ namespace orc {
     OutputStream * outputStream;
     std::unique_ptr<DataBuffer<char> > dataBuffer;
     uint64_t blockSize;
+    WriterMetrics* metrics;
 
   public:
     BufferedOutputStream(MemoryPool& pool,
                       OutputStream * outStream,
                       uint64_t capacity,
-                      uint64_t block_size);
+                      uint64_t block_size,
+                      WriterMetrics* metrics);
     virtual ~BufferedOutputStream() override;
 
     virtual bool Next(void** data, int*size) override;
@@ -66,6 +74,7 @@ namespace orc {
 
     virtual bool isCompressed() const { return false; }
   };
+DIAGNOSTIC_POP
 
   /**
    * An append only buffered stream that allows
diff --git a/c++/test/TestBufferedOutputStream.cc b/c++/test/TestBufferedOutputStream.cc
index b47e1e54f..448194035 100644
--- a/c++/test/TestBufferedOutputStream.cc
+++ b/c++/test/TestBufferedOutputStream.cc
@@ -28,7 +28,9 @@ namespace orc {
 
     uint64_t capacity = 1000;
     uint64_t block = 10;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    WriterMetrics metrics;
+    BufferedOutputStream bufStream(
+            *pool, &memStream, capacity, block, &metrics);
     for (int i = 0; i < 100; ++i) {
       char * buf;
       int len;
@@ -44,6 +46,9 @@ namespace orc {
     for (int i = 0; i < 1000; ++i) {
       EXPECT_EQ(memStream.getData()[i], 'a' + i % 10);
     }
+#if ENABLE_METRICS
+    EXPECT_EQ(metrics.IOCount.load(), 1);
+#endif
   }
 
   TEST(BufferedOutputStream, block_not_aligned) {
@@ -52,7 +57,9 @@ namespace orc {
 
     uint64_t capacity = 20;
     uint64_t block = 10;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    WriterMetrics metrics;
+    BufferedOutputStream bufStream(
+            *pool, &memStream, capacity, block, &metrics);
 
     char * buf;
     int len;
@@ -89,6 +96,9 @@ namespace orc {
     for (int i = 0; i < 5; ++i) {
      EXPECT_EQ(memStream.getData()[i + 7], 'a' + i);
     }
+#if ENABLE_METRICS
+    EXPECT_EQ(metrics.IOCount.load(), 2);
+#endif
   }
 
   TEST(BufferedOutputStream, protobuff_serialization) {
@@ -97,7 +107,9 @@ namespace orc {
 
     uint64_t capacity = 20;
     uint64_t block = 10;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    WriterMetrics metrics;
+    BufferedOutputStream bufStream(
+            *pool, &memStream, capacity, block, &metrics);
 
     proto::PostScript ps;
     ps.set_footerlength(197934);
diff --git a/c++/test/TestByteRLEEncoder.cc b/c++/test/TestByteRLEEncoder.cc
index 539cfad15..1cfd16984 100644
--- a/c++/test/TestByteRLEEncoder.cc
+++ b/c++/test/TestByteRLEEncoder.cc
@@ -119,10 +119,10 @@ namespace orc {
 
     uint64_t capacity = 500 * 1024;
     uint64_t block = 1024;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
 
     std::unique_ptr<BufferedOutputStream> outStream(
-        new BufferedOutputStream(*pool, &memStream, capacity, block));
+        new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
 
     std::unique_ptr<ByteRleEncoder> encoder =
       createByteRleEncoder(std::move(outStream));
@@ -142,10 +142,10 @@ namespace orc {
 
     uint64_t capacity = 500 * 1024;
     uint64_t block = 1024;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
 
     std::unique_ptr<BufferedOutputStream> outStream(
-        new BufferedOutputStream(*pool, &memStream, capacity, block));
+        new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
 
     std::unique_ptr<ByteRleEncoder> encoder =
       createByteRleEncoder(std::move(outStream));
@@ -167,10 +167,10 @@ namespace orc {
 
     uint64_t capacity = 500 * 1024;
     uint64_t block = 1024;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
 
     std::unique_ptr<BufferedOutputStream> outStream(
-        new BufferedOutputStream(*pool, &memStream, capacity, block));
+        new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
 
     std::unique_ptr<ByteRleEncoder> encoder =
       createBooleanRleEncoder(std::move(outStream));
@@ -190,10 +190,10 @@ namespace orc {
 
     uint64_t capacity = 500 * 1024;
     uint64_t block = 1024;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
 
     std::unique_ptr<BufferedOutputStream> outStream(
-        new BufferedOutputStream(*pool, &memStream, capacity, block));
+        new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
 
     std::unique_ptr<ByteRleEncoder> encoder =
       createBooleanRleEncoder(std::move(outStream));
@@ -213,10 +213,10 @@ namespace orc {
 
     uint64_t capacity = 500 * 1024;
     uint64_t block = 1024;
-    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
 
     std::unique_ptr<BufferedOutputStream> outStream(
-        new BufferedOutputStream(*pool, &memStream, capacity, block));
+        new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
 
     std::unique_ptr<ByteRleEncoder> encoder =
       createBooleanRleEncoder(std::move(outStream));
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index 52b0e984d..d2715f34e 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -1449,7 +1449,8 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
                         CompressionStrategy_COMPRESSION,
                         capacity,
                         blockSize,
-                        *getDefaultPool()));
+                        *getDefaultPool(),
+                        nullptr));
     encoder->add(data, numValues, nullptr);
     encoder->flush();
 
@@ -1482,7 +1483,8 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
     uint64_t capacity = 500 * 1024;
     uint64_t block = 1024;
     std::unique_ptr<BufferedOutputStream> outStream(
-            new BufferedOutputStream(*getDefaultPool(), &memStream, capacity, block));
+            new BufferedOutputStream(
+                    *getDefaultPool(), &memStream, capacity, block, nullptr));
 
     std::unique_ptr<ByteRleEncoder> encoder =
             createBooleanRleEncoder(std::move(outStream));
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index 17d569f2c..0ecebf429 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -85,7 +85,8 @@ namespace orc {
                        strategy,
                        capacity,
                        block,
-                       pool);
+                       pool,
+                       nullptr);
 
     size_t pos = 0;
     char * compressBuffer;
@@ -284,7 +285,8 @@ namespace orc {
                        CompressionStrategy_SPEED,
                        capacity,
                        block,
-                       *pool);
+                       *pool,
+                       nullptr);
 
     EXPECT_TRUE(ps.SerializeToZeroCopyStream(compressStream.get()));
     compressStream->flush();
@@ -394,8 +396,14 @@ namespace orc {
     CompressionStrategy strategy = CompressionStrategy_COMPRESSION;
     uint64_t batchSize = 1024, blockSize = 256;
 
-    AppendOnlyBufferedStream outStream(createCompressor(
-      kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, *pool));
+    AppendOnlyBufferedStream outStream(
+      createCompressor(kind,
+                       &memStream,
+                       strategy,
+                       DEFAULT_MEM_STREAM_SIZE,
+                       blockSize,
+                       *pool,
+                       nullptr));
 
     // write 3 batches of data and record positions between every batch
     size_t row = 0;
diff --git a/c++/test/TestRleEncoder.cc b/c++/test/TestRleEncoder.cc
index 73be2f46a..3ed6e58e3 100644
--- a/c++/test/TestRleEncoder.cc
+++ b/c++/test/TestRleEncoder.cc
@@ -126,7 +126,8 @@ namespace orc {
 
     return createRleEncoder(
             std::unique_ptr<BufferedOutputStream>(
-                    new BufferedOutputStream(*pool, &memStream, 500 * 1024, 1024)),
+                    new BufferedOutputStream(
+                            *pool, &memStream, 500 * 1024, 1024, nullptr)),
             isSigned, version, *pool, alignBitpacking);
   }
 
diff --git a/tools/src/CSVFileImport.cc b/tools/src/CSVFileImport.cc
index 280491b3b..ae39ecdd5 100644
--- a/tools/src/CSVFileImport.cc
+++ b/tools/src/CSVFileImport.cc
@@ -277,6 +277,7 @@ void fillTimestampValues(const std::vector<std::string>& data,
 
 void usage() {
   std::cout << "Usage: csv-import [-h] [--help]\n"
+            << "                  [-m] [--metrics]\n"
             << "                  [-d <character>] [--delimiter=<character>]\n"
             << "                  [-s <size>] [--stripe=<size>]\n"
             << "                  [-c <size>] [--block=<size>]\n"
@@ -300,6 +301,7 @@ int main(int argc, char* argv[]) {
 
   static struct option longOptions[] = {
     {"help", no_argument, ORC_NULLPTR, 'h'},
+    {"metrics", no_argument, ORC_NULLPTR, 'm'},
     {"delimiter", required_argument, ORC_NULLPTR, 'd'},
     {"stripe", required_argument, ORC_NULLPTR, 's'},
     {"block", required_argument, ORC_NULLPTR, 'c'},
@@ -308,15 +310,19 @@ int main(int argc, char* argv[]) {
     {ORC_NULLPTR, 0, ORC_NULLPTR, 0}
   };
   bool helpFlag = false;
+  bool showMetrics = false;
   int opt;
   char *tail;
   do {
-    opt = getopt_long(argc, argv, "d:s:c:b:t:h", longOptions, ORC_NULLPTR);
+    opt = getopt_long(argc, argv, "d:s:c:b:t:mh", longOptions, ORC_NULLPTR);
     switch (opt) {
       case 'h':
         helpFlag = true;
         opt = -1;
         break;
+      case 'm':
+        showMetrics = true;
+        break;
       case 'd':
         gDelimiter = optarg[0];
         break;
@@ -369,10 +375,12 @@ int main(int argc, char* argv[]) {
   DataBufferList bufferList;
 
   orc::WriterOptions options;
+  orc::WriterMetrics metrics;
   options.setStripeSize(stripeSize);
   options.setCompressionBlockSize(blockSize);
   options.setCompression(compression);
   options.setTimezoneName(timezoneName);
+  options.setWriterMetrics(showMetrics ? &metrics : nullptr);
 
   ORC_UNIQUE_PTR<orc::OutputStream> outStream = orc::writeLocalFile(output);
   ORC_UNIQUE_PTR<orc::Writer> writer =
@@ -502,5 +510,12 @@ int main(int argc, char* argv[]) {
   std::cout << GetDate() << " Total writer CPU time: "
             << static_cast<double>(totalCPUTime) / CLOCKS_PER_SEC
             << "s." << std::endl;
+  if (showMetrics) {
+    std::cout << GetDate() << " IO block lantency: "
+              << static_cast<double>(metrics.IOBlockingLatencyUs) / 1000000.0
+              << "s." << std::endl;
+    std::cout << GetDate() << " IO count: "
+              << metrics.IOCount << std::endl;
+  }
   return 0;
 }