You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by do...@apache.org on 2022/07/12 17:24:28 UTC

[orc] branch main updated: ORC-961: [C++] expose related metrics of the reader

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new df8ed0324 ORC-961: [C++] expose related metrics of the reader
df8ed0324 is described below

commit df8ed032440114ca63e4d4fd4a0d3e222a15757f
Author: coderex2522 <re...@gmail.com>
AuthorDate: Tue Jul 12 10:24:11 2022 -0700

    ORC-961: [C++] expose related metrics of the reader
    
    ### What changes were proposed in this pull request?
    This patch keeps track of the time spent in, and the number of calls to, each module of the reader. The current metrics mainly include the decompression time and number of calls, the decoding time and number of calls, and the total elapsed time.
    
    ### Why are the changes needed?
    It exposes the relevant metrics of the reader so that users can directly see the metrics of each module, including decompression and decoding.
    
    ### How was this patch tested?
    The orc-scan tool with the -m parameter can output relevant metrics.
    
    Closes #1158 from coderex2522/ORC-961.
    
    Authored-by: coderex2522 <re...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 c++/include/orc/OrcFile.hh            | 12 ++++--
 c++/include/orc/Reader.hh             | 41 ++++++++++++++++++
 c++/src/ByteRLE.cc                    | 41 +++++++++++++-----
 c++/src/ByteRLE.hh                    |  8 +++-
 c++/src/ColumnReader.cc               | 41 +++++++++++-------
 c++/src/ColumnReader.hh               |  6 +++
 c++/src/Compression.cc                | 67 +++++++++++++++++++----------
 c++/src/Compression.hh                |  4 +-
 c++/src/Options.hh                    | 11 +++++
 c++/src/OrcFile.cc                    | 21 ++++++----
 c++/src/OrcHdfsFile.cc                | 13 ++++--
 c++/src/RLE.cc                        |  9 ++--
 c++/src/RLE.hh                        | 10 ++++-
 c++/src/RLEv1.cc                      |  9 +++-
 c++/src/RLEv1.hh                      |  2 +-
 c++/src/RLEv2.hh                      |  3 +-
 c++/src/Reader.cc                     | 49 ++++++++++++++++------
 c++/src/Reader.hh                     |  8 +++-
 c++/src/RleDecoderV2.cc               |  9 +++-
 c++/src/StripeStream.cc               | 10 ++++-
 c++/src/StripeStream.hh               |  9 +++-
 c++/src/Utils.hh                      | 75 +++++++++++++++++++++++++++++++++
 c++/src/sargs/SargsApplier.cc         | 31 ++++++++++----
 c++/src/sargs/SargsApplier.hh         | 30 ++++++++++---
 c++/test/TestAttributes.cc            |  5 ++-
 c++/test/TestByteRLEEncoder.cc        |  4 +-
 c++/test/TestByteRle.cc               | 55 ++++++++++++++----------
 c++/test/TestColumnReader.cc          |  4 ++
 c++/test/TestCompression.cc           |  9 ++--
 c++/test/TestDecompression.cc         | 55 +++++++++++++++---------
 c++/test/TestReader.cc                |  6 ++-
 c++/test/TestRleDecoder.cc            | 48 ++++++++++++++-------
 c++/test/TestRleEncoder.cc            |  3 +-
 c++/test/TestSargsApplier.cc          | 46 +++++++++++++++-----
 c++/test/TestStripeIndexStatistics.cc |  6 ++-
 c++/test/TestTimestampStatistics.cc   |  5 ++-
 tools/src/FileContents.cc             |  7 +++-
 tools/src/FileMemory.cc               |  7 +++-
 tools/src/FileMetadata.cc             |  8 +++-
 tools/src/FileScan.cc                 | 15 +++++--
 tools/src/FileStatistics.cc           |  5 ++-
 tools/src/ToolsHelper.cc              | 38 +++++++++++++++--
 tools/src/ToolsHelper.hh              |  6 ++-
 tools/test/TestFileScan.cc            |  6 +++
 tools/test/TestMatch.cc               | 79 +++++++++++++++++++++++++----------
 45 files changed, 710 insertions(+), 226 deletions(-)

diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index c64853168..1ae7725f2 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -105,20 +105,26 @@ namespace orc {
   /**
    * Create a stream to a local file or HDFS file if path begins with "hdfs://"
    * @param path the name of the file in the local file system or HDFS
+   * @param metrics the metrics of the reader
    */
-  ORC_UNIQUE_PTR<InputStream> readFile(const std::string& path);
+  ORC_UNIQUE_PTR<InputStream> readFile(const std::string& path,
+                                       ReaderMetrics* metrics = nullptr);
 
   /**
    * Create a stream to a local file.
    * @param path the name of the file in the local file system
+   * @param metrics the metrics of the reader
    */
-  ORC_UNIQUE_PTR<InputStream> readLocalFile(const std::string& path);
+  ORC_UNIQUE_PTR<InputStream> readLocalFile(const std::string& path,
+                                            ReaderMetrics* metrics = nullptr);
 
   /**
    * Create a stream to an HDFS file.
    * @param path the uri of the file in HDFS
+   * @param metrics the metrics of the reader
    */
-  ORC_UNIQUE_PTR<InputStream> readHdfsFile(const std::string& path);
+  ORC_UNIQUE_PTR<InputStream> readHdfsFile(const std::string& path,
+                                           ReaderMetrics* metrics = nullptr);
 
   /**
    * Create a reader to read the ORC file.
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index ddc8b5505..1bfb8bea0 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -32,6 +32,7 @@
 #include <set>
 #include <string>
 #include <vector>
+#include <atomic>
 
 namespace orc {
 
@@ -39,6 +40,28 @@ namespace orc {
   struct ReaderOptionsPrivate;
   struct RowReaderOptionsPrivate;
 
+  /**
+   * Expose the reader metrics including the latency and
+   * number of calls of the decompression/decoding/IO modules.
+   */
+  struct ReaderMetrics {
+    std::atomic<uint64_t> ReaderCall{0};
+    // ReaderInclusiveLatencyUs contains the latency of
+    // the decompression/decoding/IO modules.
+    std::atomic<uint64_t> ReaderInclusiveLatencyUs{0};
+    std::atomic<uint64_t> DecompressionCall{0};
+    std::atomic<uint64_t> DecompressionLatencyUs{0};
+    std::atomic<uint64_t> DecodingCall{0};
+    std::atomic<uint64_t> DecodingLatencyUs{0};
+    std::atomic<uint64_t> ByteDecodingCall{0};
+    std::atomic<uint64_t> ByteDecodingLatencyUs{0};
+    std::atomic<uint64_t> IOCount{0};
+    std::atomic<uint64_t> IOBlockingLatencyUs{0};
+    std::atomic<uint64_t> SelectedRowGroupCount{0};
+    std::atomic<uint64_t> EvaluatedRowGroupCount{0};
+  };
+  ReaderMetrics* getDefaultReaderMetrics();
+
   /**
    * Options for creating a Reader.
    */
@@ -76,6 +99,13 @@ namespace orc {
      */
     ReaderOptions& setMemoryPool(MemoryPool& pool);
 
+    /**
+     * Set the reader metrics.
+     *
+     * When set to nullptr, the reader metrics will be disabled.
+     */
+    ReaderOptions& setReaderMetrics(ReaderMetrics* metrics);
+
     /**
      * Set the location of the tail as defined by the logical length of the
      * file.
@@ -102,6 +132,11 @@ namespace orc {
      * Get the memory allocator.
      */
     MemoryPool* getMemoryPool() const;
+
+    /**
+     * Get the reader metrics.
+     */
+    ReaderMetrics* getReaderMetrics() const;
   };
 
   /**
@@ -455,6 +490,12 @@ namespace orc {
      */
     virtual bool hasCorrectStatistics() const = 0;
 
+    /**
+     * Get metrics of the reader
+     * @return the accumulated reader metrics to current state.
+     */
+    virtual const ReaderMetrics* getReaderMetrics() const = 0;
+
     /**
      * Get the serialized file tail.
      * Usefull if another reader of the same file wants to avoid re-reading
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index 1c4a64516..e3a265dd6 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include "ByteRLE.hh"
+#include "Utils.hh"
 #include "orc/Exceptions.hh"
 
 namespace orc {
@@ -318,7 +319,8 @@ namespace orc {
 
   class ByteRleDecoderImpl: public ByteRleDecoder {
   public:
-    ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
+    ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> input,
+                       ReaderMetrics* metrics);
 
     virtual ~ByteRleDecoderImpl();
 
@@ -338,6 +340,7 @@ namespace orc {
     virtual void next(char* data, uint64_t numValues, char* notNull);
 
   protected:
+    void nextInternal(char* data, uint64_t numValues, char* notNull);
     inline void nextBuffer();
     inline signed char readByte();
     inline void readHeader();
@@ -348,9 +351,11 @@ namespace orc {
     const char* bufferStart;
     const char* bufferEnd;
     bool repeating;
+    ReaderMetrics* metrics;
   };
 
   void ByteRleDecoderImpl::nextBuffer() {
+    SCOPED_MINUS_STOPWATCH(metrics, ByteDecodingLatencyUs);
     int bufferLength;
     const void* bufferPointer;
     bool result = inputStream->Next(&bufferPointer, &bufferLength);
@@ -380,8 +385,10 @@ namespace orc {
     }
   }
 
-  ByteRleDecoderImpl::ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream>
-                                         input) {
+  ByteRleDecoderImpl::ByteRleDecoderImpl(
+                        std::unique_ptr<SeekableInputStream> input,
+                        ReaderMetrics* _metrics)
+                        : metrics(_metrics) {
     inputStream = std::move(input);
     repeating = false;
     remainingValues = 0;
@@ -406,6 +413,7 @@ namespace orc {
   }
 
   void ByteRleDecoderImpl::skip(uint64_t numValues) {
+    SCOPED_STOPWATCH(metrics, ByteDecodingLatencyUs, ByteDecodingCall);
     while (numValues > 0) {
       if (remainingValues == 0) {
         readHeader();
@@ -433,6 +441,12 @@ namespace orc {
 
   void ByteRleDecoderImpl::next(char* data, uint64_t numValues,
                                 char* notNull) {
+    SCOPED_STOPWATCH(metrics, ByteDecodingLatencyUs, ByteDecodingCall);
+    nextInternal(data, numValues, notNull);
+  }
+
+  void ByteRleDecoderImpl::nextInternal(char* data, uint64_t numValues,
+                                        char* notNull) {
     uint64_t position = 0;
     // skip over null values
     while (notNull && position < numValues && !notNull[position]) {
@@ -493,14 +507,16 @@ namespace orc {
   }
 
   std::unique_ptr<ByteRleDecoder> createByteRleDecoder
-                                 (std::unique_ptr<SeekableInputStream> input) {
+                                 (std::unique_ptr<SeekableInputStream> input,
+                                  ReaderMetrics* metrics) {
     return std::unique_ptr<ByteRleDecoder>(new ByteRleDecoderImpl
-                                           (std::move(input)));
+                                           (std::move(input), metrics));
   }
 
   class BooleanRleDecoderImpl: public ByteRleDecoderImpl {
   public:
-    BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
+    BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input,
+                          ReaderMetrics* metrics);
 
     virtual ~BooleanRleDecoderImpl();
 
@@ -525,8 +541,9 @@ namespace orc {
   };
 
   BooleanRleDecoderImpl::BooleanRleDecoderImpl
-                                (std::unique_ptr<SeekableInputStream> input
-                                 ): ByteRleDecoderImpl(std::move(input)) {
+                                (std::unique_ptr<SeekableInputStream> input,
+                                 ReaderMetrics* _metrics
+                                 ): ByteRleDecoderImpl(std::move(input), _metrics) {
     remainingBits = 0;
     lastByte = 0;
   }
@@ -566,6 +583,7 @@ namespace orc {
 
   void BooleanRleDecoderImpl::next(char* data, uint64_t numValues,
                                    char* notNull) {
+    SCOPED_STOPWATCH(metrics, ByteDecodingLatencyUs, ByteDecodingCall);
     // next spot to fill in
     uint64_t position = 0;
 
@@ -607,7 +625,7 @@ namespace orc {
     } else if (position < numValues) {
       // read the new bytes into the array
       uint64_t bytesRead = (nonNulls + 7) / 8;
-      ByteRleDecoderImpl::next(data + position, bytesRead, nullptr);
+      ByteRleDecoderImpl::nextInternal(data + position, bytesRead, nullptr);
       lastByte = data[position + bytesRead - 1];
       remainingBits = bytesRead * 8 - nonNulls;
       // expand the array backwards so that we don't clobber the data
@@ -634,9 +652,10 @@ namespace orc {
   }
 
   std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder
-                                 (std::unique_ptr<SeekableInputStream> input) {
+                                 (std::unique_ptr<SeekableInputStream> input,
+                                  ReaderMetrics* metrics) {
     BooleanRleDecoderImpl* decoder =
-      new BooleanRleDecoderImpl(std::move(input));
+      new BooleanRleDecoderImpl(std::move(input), metrics);
     return std::unique_ptr<ByteRleDecoder>(
                                     reinterpret_cast<ByteRleDecoder*>(decoder));
   }
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index 2f6e2eb4d..f8aecc6c6 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -103,9 +103,11 @@ namespace orc {
   /**
    * Create a byte RLE decoder.
    * @param input the input stream to read from
+   * @param metrics the metrics of the decoder
    */
   std::unique_ptr<ByteRleDecoder> createByteRleDecoder
-                                 (std::unique_ptr<SeekableInputStream> input);
+                                 (std::unique_ptr<SeekableInputStream> input,
+                                  ReaderMetrics* metrics);
 
   /**
    * Create a boolean RLE decoder.
@@ -114,9 +116,11 @@ namespace orc {
    * if the value is masked by notNull. This is required for the notNull stream
    * processing to properly apply multiple masks from nested types.
    * @param input the input stream to read from
+   * @param metrics the metrics of the decoder
    */
   std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder
-                                 (std::unique_ptr<SeekableInputStream> input);
+                                 (std::unique_ptr<SeekableInputStream> input,
+                                  ReaderMetrics* metrics);
 }
 
 #endif
diff --git a/c++/src/ColumnReader.cc b/c++/src/ColumnReader.cc
index f4a4df924..5e2191f97 100644
--- a/c++/src/ColumnReader.cc
+++ b/c++/src/ColumnReader.cc
@@ -49,11 +49,12 @@ namespace orc {
   ColumnReader::ColumnReader(const Type& type,
                              StripeStreams& stripe
                              ): columnId(type.getColumnId()),
-                                memoryPool(stripe.getMemoryPool()) {
+                                memoryPool(stripe.getMemoryPool()),
+                                metrics(stripe.getReaderMetrics()) {
     std::unique_ptr<SeekableInputStream> stream =
       stripe.getStream(columnId, proto::Stream_Kind_PRESENT, true);
     if (stream.get()) {
-      notNullDecoder = createBooleanRleDecoder(std::move(stream));
+      notNullDecoder = createBooleanRleDecoder(std::move(stream), metrics);
     }
   }
 
@@ -160,7 +161,7 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (stream == nullptr)
       throw ParseError("DATA stream not found in Boolean column");
-    rle = createBooleanRleDecoder(std::move(stream));
+    rle = createBooleanRleDecoder(std::move(stream), metrics);
   }
 
   BooleanColumnReader::~BooleanColumnReader() {
@@ -216,7 +217,7 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (stream == nullptr)
       throw ParseError("DATA stream not found in Byte column");
-    rle = createByteRleDecoder(std::move(stream));
+    rle = createByteRleDecoder(std::move(stream), metrics);
   }
 
   ByteColumnReader::~ByteColumnReader() {
@@ -273,7 +274,8 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (stream == nullptr)
       throw ParseError("DATA stream not found in Integer column");
-    rle = createRleDecoder(std::move(stream), true, vers, memoryPool);
+    rle = createRleDecoder(
+        std::move(stream), true, vers, memoryPool, metrics);
   }
 
   IntegerColumnReader::~IntegerColumnReader() {
@@ -343,11 +345,13 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (stream == nullptr)
       throw ParseError("DATA stream not found in Timestamp column");
-    secondsRle = createRleDecoder(std::move(stream), true, vers, memoryPool);
+    secondsRle = createRleDecoder(
+        std::move(stream), true, vers, memoryPool, metrics);
     stream = stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
     if (stream == nullptr)
       throw ParseError("SECONDARY stream not found in Timestamp column");
-    nanoRle = createRleDecoder(std::move(stream), false, vers, memoryPool);
+    nanoRle = createRleDecoder(
+        std::move(stream), false, vers, memoryPool, metrics);
   }
 
   TimestampColumnReader::~TimestampColumnReader() {
@@ -636,13 +640,14 @@ namespace orc {
     if (stream == nullptr) {
       throw ParseError("DATA stream not found in StringDictionaryColumn");
     }
-    rle = createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
+    rle = createRleDecoder(
+        std::move(stream), false, rleVersion, memoryPool, metrics);
     stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
     if (dictSize > 0 && stream == nullptr) {
       throw ParseError("LENGTH stream not found in StringDictionaryColumn");
     }
-    std::unique_ptr<RleDecoder> lengthDecoder =
-        createRleDecoder(std::move(stream), false, rleVersion, memoryPool);
+    std::unique_ptr<RleDecoder> lengthDecoder = createRleDecoder(
+        std::move(stream), false, rleVersion, memoryPool, metrics);
     dictionary->dictionaryOffset.resize(dictSize + 1);
     int64_t* lengthArray = dictionary->dictionaryOffset.data();
     lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
@@ -775,7 +780,7 @@ namespace orc {
     if (stream == nullptr)
       throw ParseError("LENGTH stream not found in StringDirectColumn");
     lengthRle = createRleDecoder(
-        std::move(stream), false, rleVersion, memoryPool);
+        std::move(stream), false, rleVersion, memoryPool, metrics);
     blobStream = stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (blobStream == nullptr)
       throw ParseError("DATA stream not found in StringDirectColumn");
@@ -1042,7 +1047,8 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
     if (stream == nullptr)
       throw ParseError("LENGTH stream not found in List column");
-    rle = createRleDecoder(std::move(stream), false, vers, memoryPool);
+    rle = createRleDecoder(
+        std::move(stream), false, vers, memoryPool, metrics);
     const Type& childType = *type.getSubtype(0);
     if (selectedColumns[static_cast<uint64_t>(childType.getColumnId())]) {
       child = buildReader(childType, stripe);
@@ -1175,7 +1181,8 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_LENGTH, true);
     if (stream == nullptr)
       throw ParseError("LENGTH stream not found in Map column");
-    rle = createRleDecoder(std::move(stream), false, vers, memoryPool);
+    rle = createRleDecoder(
+        std::move(stream), false, vers, memoryPool, metrics);
     const Type& keyType = *type.getSubtype(0);
     if (selectedColumns[static_cast<uint64_t>(keyType.getColumnId())]) {
       keyReader = buildReader(keyType, stripe);
@@ -1332,7 +1339,7 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
     if (stream == nullptr)
       throw ParseError("LENGTH stream not found in Union column");
-    rle = createByteRleDecoder(std::move(stream));
+    rle = createByteRleDecoder(std::move(stream), metrics);
     // figure out which types are selected
     const std::vector<bool> selectedColumns = stripe.getSelectedColumns();
     for(unsigned int i=0; i < numChildren; ++i) {
@@ -1547,7 +1554,8 @@ namespace orc {
         stripe.getStream(columnId, proto::Stream_Kind_SECONDARY, true);
     if (stream == nullptr)
       throw ParseError("SECONDARY stream not found in Decimal64Column");
-    scaleDecoder = createRleDecoder(std::move(stream), true, vers, memoryPool);
+    scaleDecoder = createRleDecoder(
+        std::move(stream), true, vers, memoryPool, metrics);
   }
 
   Decimal64ColumnReader::~Decimal64ColumnReader() {
@@ -1722,7 +1730,8 @@ namespace orc {
       ss << "DATA stream not found in Decimal64V2 column. ColumnId=" << columnId;
       throw ParseError(ss.str());
     }
-    valueDecoder = createRleDecoder(std::move(stream), true, RleVersion_2, memoryPool);
+    valueDecoder = createRleDecoder(
+        std::move(stream), true, RleVersion_2, memoryPool, metrics);
   }
 
   Decimal64ColumnReaderV2::~Decimal64ColumnReaderV2() {
diff --git a/c++/src/ColumnReader.hh b/c++/src/ColumnReader.hh
index 80b59de2c..67bd3ddeb 100644
--- a/c++/src/ColumnReader.hh
+++ b/c++/src/ColumnReader.hh
@@ -63,6 +63,11 @@ namespace orc {
      */
     virtual MemoryPool& getMemoryPool() const = 0;
 
+    /**
+     * Get the reader metrics for this reader.
+     */
+    virtual ReaderMetrics* getReaderMetrics() const = 0;
+
     /**
      * Get the writer's timezone, so that we can convert their dates correctly.
      */
@@ -107,6 +112,7 @@ namespace orc {
     std::unique_ptr<ByteRleDecoder> notNullDecoder;
     uint64_t columnId;
     MemoryPool& memoryPool;
+    ReaderMetrics* metrics;
 
   public:
     ColumnReader(const Type& type, StripeStreams& stipe);
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index ea1017150..0b4969f41 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -18,6 +18,7 @@
 
 #include "Adaptor.hh"
 #include "Compression.hh"
+#include "Utils.hh"
 #include "orc/Exceptions.hh"
 #include "LzoDecompressor.hh"
 #include "lz4.h"
@@ -336,7 +337,8 @@ DIAGNOSTIC_PUSH
   public:
     DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                         size_t bufferSize,
-                        MemoryPool& pool);
+                        MemoryPool& pool,
+                        ReaderMetrics* metrics);
     virtual ~DecompressionStream() override {}
     virtual bool Next(const void** data, int*size) override;
     virtual void BackUp(int count) override;
@@ -390,12 +392,15 @@ DIAGNOSTIC_PUSH
 
     // roughly the number of bytes returned
     off_t bytesReturned;
+
+    ReaderMetrics* metrics;
   };
 
   DecompressionStream::DecompressionStream(
       std::unique_ptr<SeekableInputStream> inStream,
       size_t bufferSize,
-      MemoryPool& _pool
+      MemoryPool& _pool,
+      ReaderMetrics* _metrics
       ) : pool(_pool),
           input(std::move(inStream)),
           outputDataBuffer(pool, bufferSize),
@@ -410,7 +415,8 @@ DIAGNOSTIC_PUSH
           inputBufferEnd(nullptr),
           headerPosition(0),
           inputBufferStartPosition(0),
-          bytesReturned(0)  {
+          bytesReturned(0),
+          metrics(_metrics)  {
   }
 
   std::string DecompressionStream::getStreamName() const {
@@ -418,6 +424,7 @@ DIAGNOSTIC_PUSH
   }
 
   void DecompressionStream::readBuffer(bool failOnEof) {
+    SCOPED_MINUS_STOPWATCH(metrics, DecompressionLatencyUs);
     int length;
     if (!input->Next(reinterpret_cast<const void**>(&inputBuffer),
                       &length)) {
@@ -463,6 +470,7 @@ DIAGNOSTIC_PUSH
   }
 
   bool DecompressionStream::Next(const void** data, int*size) {
+    SCOPED_STOPWATCH(metrics, DecompressionLatencyUs, DecompressionCall);
     // If we are starting a new header, we will have to store its positions
     // after decompressing.
     bool saveBufferPositions = false;
@@ -612,7 +620,8 @@ DIAGNOSTIC_PUSH
   public:
     ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                             size_t blockSize,
-                            MemoryPool& pool);
+                            MemoryPool& pool,
+                            ReaderMetrics* metrics);
     virtual ~ZlibDecompressionStream() override;
     virtual std::string getName() const override;
 
@@ -633,9 +642,10 @@ DIAGNOSTIC_PUSH
   ZlibDecompressionStream::ZlibDecompressionStream
                    (std::unique_ptr<SeekableInputStream> inStream,
                     size_t bufferSize,
-                    MemoryPool& _pool
+                    MemoryPool& _pool,
+                    ReaderMetrics* _metrics
                     ): DecompressionStream
-                          (std::move(inStream), bufferSize, _pool) {
+                          (std::move(inStream), bufferSize, _pool, _metrics) {
     zstream.next_in = nullptr;
     zstream.avail_in = 0;
     zstream.zalloc = nullptr;
@@ -731,7 +741,8 @@ DIAGNOSTIC_POP
   public:
     BlockDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                              size_t blockSize,
-                             MemoryPool& pool);
+                             MemoryPool& pool,
+                             ReaderMetrics* metrics);
 
     virtual ~BlockDecompressionStream() override {}
     virtual std::string getName() const override = 0;
@@ -752,9 +763,10 @@ DIAGNOSTIC_POP
   BlockDecompressionStream::BlockDecompressionStream
                    (std::unique_ptr<SeekableInputStream> inStream,
                     size_t blockSize,
-                    MemoryPool& _pool
+                    MemoryPool& _pool,
+                    ReaderMetrics* _metrics
                     ) : DecompressionStream
-                            (std::move(inStream), blockSize, _pool),
+                            (std::move(inStream), blockSize, _pool, _metrics),
                         inputDataBuffer(pool, blockSize) {
   }
 
@@ -800,11 +812,13 @@ DIAGNOSTIC_POP
   public:
     SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                               size_t blockSize,
-                              MemoryPool& _pool
+                              MemoryPool& _pool,
+                              ReaderMetrics* _metrics
                               ): BlockDecompressionStream
                                  (std::move(inStream),
                                   blockSize,
-                                  _pool) {
+                                  _pool,
+                                  _metrics) {
       // PASS
     }
 
@@ -843,11 +857,13 @@ DIAGNOSTIC_POP
   public:
     LzoDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                            size_t blockSize,
-                           MemoryPool& _pool
+                           MemoryPool& _pool,
+                           ReaderMetrics* _metrics
                            ): BlockDecompressionStream
                                   (std::move(inStream),
                                    blockSize,
-                                   _pool) {
+                                   _pool,
+                                   _metrics) {
       // PASS
     }
 
@@ -875,11 +891,13 @@ DIAGNOSTIC_POP
   public:
     Lz4DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                            size_t blockSize,
-                           MemoryPool& _pool
+                           MemoryPool& _pool,
+                           ReaderMetrics* _metrics
                            ): BlockDecompressionStream
                               (std::move(inStream),
                                blockSize,
-                               _pool) {
+                               _pool,
+                               _metrics) {
       // PASS
     }
 
@@ -1179,10 +1197,12 @@ DIAGNOSTIC_PUSH
   public:
     ZSTDDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                             size_t blockSize,
-                            MemoryPool& _pool)
+                            MemoryPool& _pool,
+                            ReaderMetrics* _metrics)
                             : BlockDecompressionStream(std::move(inStream),
                                                        blockSize,
-                                                       _pool) {
+                                                       _pool,
+                                                       _metrics) {
       this->init();
     }
 
@@ -1292,25 +1312,26 @@ DIAGNOSTIC_PUSH
      createDecompressor(CompressionKind kind,
                         std::unique_ptr<SeekableInputStream> input,
                         uint64_t blockSize,
-                        MemoryPool& pool) {
+                        MemoryPool& pool,
+                        ReaderMetrics* metrics) {
     switch (static_cast<int64_t>(kind)) {
     case CompressionKind_NONE:
       return REDUNDANT_MOVE(input);
     case CompressionKind_ZLIB:
       return std::unique_ptr<SeekableInputStream>
-        (new ZlibDecompressionStream(std::move(input), blockSize, pool));
+        (new ZlibDecompressionStream(std::move(input), blockSize, pool, metrics));
     case CompressionKind_SNAPPY:
       return std::unique_ptr<SeekableInputStream>
-        (new SnappyDecompressionStream(std::move(input), blockSize, pool));
+        (new SnappyDecompressionStream(std::move(input), blockSize, pool, metrics));
     case CompressionKind_LZO:
       return std::unique_ptr<SeekableInputStream>
-        (new LzoDecompressionStream(std::move(input), blockSize, pool));
+        (new LzoDecompressionStream(std::move(input), blockSize, pool, metrics));
     case CompressionKind_LZ4:
       return std::unique_ptr<SeekableInputStream>
-        (new Lz4DecompressionStream(std::move(input), blockSize, pool));
+        (new Lz4DecompressionStream(std::move(input), blockSize, pool, metrics));
     case CompressionKind_ZSTD:
       return std::unique_ptr<SeekableInputStream>
-        (new ZSTDDecompressionStream(std::move(input), blockSize, pool));
+        (new ZSTDDecompressionStream(std::move(input), blockSize, pool, metrics));
     default: {
       std::ostringstream buffer;
       buffer << "Unknown compression codec " << kind;
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index ff79377d8..6457ad98c 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -30,12 +30,14 @@ namespace orc {
    * @param input the input stream that is the underlying source
    * @param bufferSize the maximum size of the buffer
    * @param pool the memory pool
+   * @param metrics the reader metrics
    */
   std::unique_ptr<SeekableInputStream>
      createDecompressor(CompressionKind kind,
                         std::unique_ptr<SeekableInputStream> input,
                         uint64_t bufferSize,
-                        MemoryPool& pool);
+                        MemoryPool& pool,
+                        ReaderMetrics* metrics);
 
   /**
    * Create a compressor for the given compression kind.
diff --git a/c++/src/Options.hh b/c++/src/Options.hh
index d8331b3c0..36fec500d 100644
--- a/c++/src/Options.hh
+++ b/c++/src/Options.hh
@@ -42,11 +42,13 @@ namespace orc {
     std::ostream* errorStream;
     MemoryPool* memoryPool;
     std::string serializedTail;
+    ReaderMetrics* metrics;
 
     ReaderOptionsPrivate() {
       tailLocation = std::numeric_limits<uint64_t>::max();
       errorStream = &std::cerr;
       memoryPool = getDefaultPool();
+      metrics = getDefaultReaderMetrics();
     }
   };
 
@@ -87,6 +89,15 @@ namespace orc {
     return privateBits->memoryPool;
   }
 
+  ReaderOptions& ReaderOptions::setReaderMetrics(ReaderMetrics* metrics) {
+    privateBits->metrics = metrics;
+    return *this;
+  }
+
+  ReaderMetrics* ReaderOptions::getReaderMetrics() const {
+    return privateBits->metrics;
+  }
+
   ReaderOptions& ReaderOptions::setTailLocation(uint64_t offset) {
     privateBits->tailLocation = offset;
     return *this;
diff --git a/c++/src/OrcFile.cc b/c++/src/OrcFile.cc
index a0158bbad..9b7fc7b75 100644
--- a/c++/src/OrcFile.cc
+++ b/c++/src/OrcFile.cc
@@ -17,6 +17,7 @@
  */
 
 #include "Adaptor.hh"
+#include "Utils.hh"
 #include "orc/OrcFile.hh"
 #include "orc/Exceptions.hh"
 
@@ -44,10 +45,13 @@ namespace orc {
     std::string filename;
     int file;
     uint64_t totalLength;
+    ReaderMetrics* metrics;
 
   public:
-    FileInputStream(std::string _filename) {
-      filename = _filename;
+    FileInputStream(std::string _filename,
+                    ReaderMetrics* _metrics)
+                    : filename(_filename),
+                      metrics(_metrics) {
       file = open(filename.c_str(), O_BINARY | O_RDONLY);
       if (file == -1) {
         throw ParseError("Can't open " + filename);
@@ -72,6 +76,7 @@ namespace orc {
     void read(void* buf,
               uint64_t length,
               uint64_t offset) override {
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
       if (!buf) {
         throw ParseError("Buffer is null");
       }
@@ -94,20 +99,22 @@ namespace orc {
     close(file);
   }
 
-  std::unique_ptr<InputStream> readFile(const std::string& path) {
+  std::unique_ptr<InputStream> readFile(const std::string& path,
+                                        ReaderMetrics* metrics) {
 #ifdef BUILD_LIBHDFSPP
     if(strncmp (path.c_str(), "hdfs://", 7) == 0){
-      return orc::readHdfsFile(std::string(path));
+      return orc::readHdfsFile(std::string(path), metrics);
     } else {
 #endif
-      return orc::readLocalFile(std::string(path));
+      return orc::readLocalFile(std::string(path), metrics);
 #ifdef BUILD_LIBHDFSPP
       }
 #endif
   }
 
-  std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
-      return std::unique_ptr<InputStream>(new FileInputStream(path));
+  std::unique_ptr<InputStream> readLocalFile(const std::string& path,
+                                             ReaderMetrics* metrics) {
+      return std::unique_ptr<InputStream>(new FileInputStream(path, metrics));
   }
 
   OutputStream::~OutputStream() {
diff --git a/c++/src/OrcHdfsFile.cc b/c++/src/OrcHdfsFile.cc
index fff610928..381e96cef 100644
--- a/c++/src/OrcHdfsFile.cc
+++ b/c++/src/OrcHdfsFile.cc
@@ -19,6 +19,7 @@
 #include "orc/OrcFile.hh"
 
 #include "Adaptor.hh"
+#include "Utils.hh"
 #include "orc/Exceptions.hh"
 
 #include <errno.h>
@@ -40,9 +41,12 @@ namespace orc {
     std::unique_ptr<hdfs::FileSystem> file_system;
     uint64_t totalLength;
     const uint64_t READ_SIZE = 1024 * 1024; //1 MB
+    ReaderMetrics* metrics;
 
   public:
-    HdfsFileInputStream(std::string _filename) {
+    HdfsFileInputStream(std::string _filename,
+                        ReaderMetrics* _metrics)
+                        : metrics(_metrics) {
       filename = _filename ;
 
       //Building a URI object from the given uri_path
@@ -137,7 +141,7 @@ namespace orc {
     void read(void* buf,
               uint64_t length,
               uint64_t offset) override {
-
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
       if (!buf) {
         throw ParseError("Buffer is null");
       }
@@ -169,7 +173,8 @@ namespace orc {
   HdfsFileInputStream::~HdfsFileInputStream() {
   }
 
-  std::unique_ptr<InputStream> readHdfsFile(const std::string& path) {
-    return std::unique_ptr<InputStream>(new HdfsFileInputStream(path));
+  std::unique_ptr<InputStream> readHdfsFile(const std::string& path,
+                                            ReaderMetrics* metrics) {
+    return std::unique_ptr<InputStream>(new HdfsFileInputStream(path, metrics));
   }
 }
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 21f908221..dcab77cf9 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -53,15 +53,18 @@ namespace orc {
                          (std::unique_ptr<SeekableInputStream> input,
                           bool isSigned,
                           RleVersion version,
-                          MemoryPool& pool) {
+                          MemoryPool& pool,
+                          ReaderMetrics* metrics) {
     switch (static_cast<int64_t>(version)) {
     case RleVersion_1:
       // We don't have std::make_unique() yet.
       return std::unique_ptr<RleDecoder>(new RleDecoderV1(std::move(input),
-                                                          isSigned));
+                                                          isSigned,
+                                                          metrics));
     case RleVersion_2:
       return std::unique_ptr<RleDecoder>(new RleDecoderV2(std::move(input),
-                                                          isSigned, pool));
+                                                          isSigned, pool,
+                                                          metrics));
     default:
       throw NotImplementedYet("Not implemented yet");
     }
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index 6822bd812..60c82db3a 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -102,6 +102,10 @@ namespace orc {
     // must be non-inline!
     virtual ~RleDecoder();
 
+    RleDecoder(ReaderMetrics* _metrics) : metrics(_metrics) {
+      // pass
+    }
+
     /**
      * Seek to a particular spot.
      */
@@ -121,6 +125,9 @@ namespace orc {
      */
     virtual void next(int64_t* data, uint64_t numValues,
                       const char* notNull) = 0;
+
+  protected:
+    ReaderMetrics* metrics;
   };
 
   /**
@@ -148,7 +155,8 @@ namespace orc {
                       (std::unique_ptr<SeekableInputStream> input,
                        bool isSigned,
                        RleVersion version,
-                       MemoryPool& pool);
+                       MemoryPool& pool,
+                       ReaderMetrics* metrics);
 
 }  // namespace orc
 
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index fe333978d..a913656cf 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -20,6 +20,7 @@
 #include "Compression.hh"
 #include "orc/Exceptions.hh"
 #include "RLEv1.hh"
+#include "Utils.hh"
 
 #include <algorithm>
 
@@ -138,6 +139,7 @@ void RleEncoderV1::write(int64_t value) {
 }
 
 signed char RleDecoderV1::readByte() {
+  SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
   if (bufferStart == bufferEnd) {
     int bufferLength;
     const void* bufferPointer;
@@ -191,8 +193,10 @@ void RleDecoderV1::readHeader() {
 }
 
 RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
-                           bool hasSigned)
-    : inputStream(std::move(input)),
+                           bool hasSigned,
+                           ReaderMetrics* _metrics)
+    : RleDecoder(_metrics),
+      inputStream(std::move(input)),
       isSigned(hasSigned),
       remainingValues(0),
       value(0),
@@ -232,6 +236,7 @@ void RleDecoderV1::skip(uint64_t numValues) {
 void RleDecoderV1::next(int64_t* const data,
                         const uint64_t numValues,
                         const char* const notNull) {
+  SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
   uint64_t position = 0;
   // skipNulls()
   if (notNull) {
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 8e31d7087..0b1c44b59 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -50,7 +50,7 @@ private:
 class RleDecoderV1 : public RleDecoder {
 public:
     RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
-                 bool isSigned);
+                 bool isSigned, ReaderMetrics* metrics);
 
     /**
     * Seek to a particular spot.
diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh
index b1e68fb12..dbaab33ad 100644
--- a/c++/src/RLEv2.hh
+++ b/c++/src/RLEv2.hh
@@ -112,7 +112,8 @@ private:
 class RleDecoderV2 : public RleDecoder {
 public:
   RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
-               bool isSigned, MemoryPool& pool);
+               bool isSigned, MemoryPool& pool,
+               ReaderMetrics* metrics);
 
   /**
   * Seek to a particular spot.
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 7be36ce41..64a70361d 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -22,6 +22,7 @@
 #include "Reader.hh"
 #include "Statistics.hh"
 #include "StripeStream.hh"
+#include "Utils.hh"
 
 #include "wrap/coded-stream-wrapper.h"
 
@@ -41,6 +42,11 @@ namespace orc {
     "1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6", "1.6.7", "1.6.8",
     "1.6.9", "1.6.10", "1.6.11", "1.7.0"};
 
+  ReaderMetrics* getDefaultReaderMetrics() {
+    static ReaderMetrics internal;
+    return &internal;
+  }
+
   const RowReaderOptions::IdReadIntentMap EMPTY_IDREADINTENTMAP() {
     return {};
   }
@@ -262,6 +268,7 @@ namespace orc {
     lastStripe = 0;
     currentRowInStripe = 0;
     rowsInCurrentStripe = 0;
+    numRowGroupsInStripeRange = 0;
     uint64_t rowTotal = 0;
 
     firstRowOfStripe.resize(numberOfStripes);
@@ -279,6 +286,10 @@ namespace orc {
         if (i >= lastStripe) {
           lastStripe = i + 1;
         }
+        if (footer->rowindexstride() > 0) {
+          numRowGroupsInStripeRange +=
+            (stripeInfo.numberofrows() + footer->rowindexstride() - 1) / footer->rowindexstride();
+        }
       }
     }
     firstStripe = currentStripe;
@@ -301,7 +312,8 @@ namespace orc {
       sargsApplier.reset(new SargsApplier(*contents->schema,
                                           sargs.get(),
                                           footer->rowindexstride(),
-                                          getWriterVersionImpl(_contents.get())));
+                                          getWriterVersionImpl(_contents.get()),
+                                          contents->readerMetrics));
     }
 
     skipBloomFilters = hasBadBloomFilters();
@@ -458,7 +470,8 @@ namespace orc {
                                    pbStream.length(),
                                    *contents->pool)),
                              getCompressionSize(),
-                             *contents->pool);
+                             *contents->pool,
+                             contents->readerMetrics);
 
         if (pbStream.kind() == proto::Stream_Kind_ROW_INDEX) {
           proto::RowIndex rowIndex;
@@ -539,7 +552,8 @@ namespace orc {
                                                       stripeFooterLength,
                                                       *contents.pool)),
                          contents.blockSize,
-                         *contents.pool);
+                         *contents.pool,
+                         contents.readerMetrics);
     proto::StripeFooter result;
     if (!result.ParseFromZeroCopyStream(pbStream.get())) {
       throw ParseError(std::string("bad StripeFooter from ") +
@@ -629,7 +643,8 @@ namespace orc {
         contents->stream.get(),
         *contents->pool,
         contents->compression,
-        contents->blockSize));
+        contents->blockSize,
+        contents->readerMetrics));
   }
 
   FileVersion ReaderImpl::getFormatVersion() const {
@@ -748,7 +763,8 @@ namespace orc {
                                                 length,
                                                 *contents->pool)),
                   contents->blockSize,
-                  *(contents->pool));
+                  *(contents->pool),
+                  contents->readerMetrics);
 
         proto::RowIndex rowIndex;
         if (!rowIndex.ParseFromZeroCopyStream(pbStream.get())) {
@@ -846,7 +862,8 @@ namespace orc {
                                                           metadataSize,
                                                           *contents->pool)),
                            contents->blockSize,
-                           *contents->pool);
+                           *contents->pool,
+                           contents->readerMetrics);
       contents->metadata.reset(new proto::Metadata());
       if (!contents->metadata->ParseFromZeroCopyStream(pbStream.get())) {
         throw ParseError("Failed to parse the metadata");
@@ -1069,7 +1086,7 @@ namespace orc {
     bloomFilterIndex.clear();
 
     // evaluate file statistics if it exists
-    if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer)) {
+    if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer, numRowGroupsInStripeRange)) {
       // skip the entire file
       markEndOfFile();
       return;
@@ -1097,7 +1114,9 @@ namespace orc {
           const auto& currentStripeStats =
             contents->metadata->stripestats(static_cast<int>(currentStripe));
           // skip this stripe after stats fail to satisfy sargs
-          isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats);
+          const uint64_t stride = footer->rowindexstride();  // can be 0: never divide by it
+          uint64_t stripeRowGroupCount = stride > 0 ? (rowsInCurrentStripe + stride - 1) / stride : 0;
+          isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats, stripeRowGroupCount);
         }
 
         if (isStripeNeeded) {
@@ -1154,6 +1173,8 @@ namespace orc {
   }
 
   bool RowReaderImpl::next(ColumnVectorBatch& data) {
+    SCOPED_STOPWATCH(
+      contents->readerMetrics, ReaderInclusiveLatencyUs, ReaderCall);
     if (currentStripe >= lastStripe) {
       data.numElements = 0;
       markEndOfFile();
@@ -1370,7 +1391,8 @@ namespace orc {
                                             const DataBuffer<char> *buffer,
                                             uint64_t footerOffset,
                                             const proto::PostScript& ps,
-                                            MemoryPool& memoryPool) {
+                                            MemoryPool& memoryPool,
+                                            ReaderMetrics* readerMetrics) {
     const char *footerPtr = buffer->data() + footerOffset;
 
     std::unique_ptr<SeekableInputStream> pbStream =
@@ -1379,7 +1401,8 @@ namespace orc {
                          (new SeekableArrayInputStream(footerPtr,
                                                        ps.footerlength())),
                          getCompressionBlockSize(ps),
-                         memoryPool);
+                         memoryPool,
+                         readerMetrics);
 
     std::unique_ptr<proto::Footer> footer =
       std::unique_ptr<proto::Footer>(new proto::Footer());
@@ -1397,6 +1420,7 @@ namespace orc {
     std::shared_ptr<FileContents> contents = std::shared_ptr<FileContents>(new FileContents());
     contents->pool = options.getMemoryPool();
     contents->errorStream = options.getErrorStream();
+    contents->readerMetrics = options.getReaderMetrics();
     std::string serializedFooter = options.getSerializedFileTail();
     uint64_t fileLength;
     uint64_t postscriptLength;
@@ -1444,7 +1468,7 @@ namespace orc {
       }
 
       contents->footer = REDUNDANT_MOVE(readFooter(stream.get(), buffer.get(),
-        footerOffset, *contents->postscript,  *contents->pool));
+        footerOffset, *contents->postscript,  *contents->pool, contents->readerMetrics));
     }
     contents->isDecimalAsLong = false;
     if (contents->postscript->version_size() == 2) {
@@ -1493,7 +1517,8 @@ namespace orc {
                                                             length,
                                                             *contents->pool)),
                              contents->blockSize,
-                             *(contents->pool));
+                             *(contents->pool),
+                             contents->readerMetrics);
 
         proto::BloomFilterIndex pbBFIndex;
         if (!pbBFIndex.ParseFromZeroCopyStream(pbStream.get())) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 395d0df19..e25efde8e 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -67,6 +67,7 @@ namespace orc {
     /// this new encoding is used.
     bool isDecimalAsLong;
     std::unique_ptr<proto::Metadata> metadata;
+    ReaderMetrics* readerMetrics;
   };
 
   proto::StripeFooter getStripeFooter(const proto::StripeInformation& info,
@@ -149,6 +150,8 @@ namespace orc {
     uint64_t processingStripe;
     uint64_t currentRowInStripe;
     uint64_t rowsInCurrentStripe;
+    // number of row groups between first stripe and last stripe
+    uint64_t numRowGroupsInStripeRange;
     proto::StripeInformation currentStripeInfo;
     proto::StripeFooter currentStripeFooter;
     std::unique_ptr<ColumnReader> reader;
@@ -336,6 +339,10 @@ namespace orc {
 
     bool hasCorrectStatistics() const override;
 
+    const ReaderMetrics* getReaderMetrics() const override {
+      return contents->readerMetrics;
+    }
+
     const proto::PostScript* getPostscript() const {return contents->postscript.get();}
 
     uint64_t getBlockSize() const {return contents->blockSize;}
@@ -357,7 +364,6 @@ namespace orc {
     std::map<uint32_t, BloomFilterIndex>
     getBloomFilters(uint32_t stripeIndex, const std::set<uint32_t>& included) const override;
   };
-
 }// namespace
 
 #endif
diff --git a/c++/src/RleDecoderV2.cc b/c++/src/RleDecoderV2.cc
index 8ab57b1f6..95078a73f 100644
--- a/c++/src/RleDecoderV2.cc
+++ b/c++/src/RleDecoderV2.cc
@@ -20,10 +20,12 @@
 #include "Compression.hh"
 #include "RLEv2.hh"
 #include "RLEV2Util.hh"
+#include "Utils.hh"
 
 namespace orc {
 
 unsigned char RleDecoderV2::readByte() {
+  SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
   if (bufferStart == bufferEnd) {
     int bufferLength;
     const void* bufferPointer;
@@ -391,8 +393,10 @@ void RleDecoderV2::plainUnpackLongs(int64_t *data, uint64_t offset, uint64_t len
 }
 
 RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
-                           bool _isSigned, MemoryPool& pool
-                           ): inputStream(std::move(input)),
+                           bool _isSigned, MemoryPool& pool,
+                           ReaderMetrics* _metrics
+                           ): RleDecoder(_metrics),
+                              inputStream(std::move(input)),
                               isSigned(_isSigned),
                               firstByte(0),
                               runLength(0),
@@ -432,6 +436,7 @@ void RleDecoderV2::skip(uint64_t numValues) {
 void RleDecoderV2::next(int64_t* const data,
                         const uint64_t numValues,
                         const char* const notNull) {
+  SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
   uint64_t nRead = 0;
 
   while (nRead < numValues) {
diff --git a/c++/src/StripeStream.cc b/c++/src/StripeStream.cc
index 6d6dda832..1c3a76cbf 100644
--- a/c++/src/StripeStream.cc
+++ b/c++/src/StripeStream.cc
@@ -112,7 +112,8 @@ namespace orc {
                                     *pool,
                                     myBlock)),
                                   reader.getCompressionSize(),
-                                  *pool);
+                                  *pool,
+                                  reader.getFileContents().readerMetrics);
       }
       offset += stream.length();
     }
@@ -123,6 +124,10 @@ namespace orc {
     return *reader.getFileContents().pool;
   }
 
+  ReaderMetrics* StripeStreamsImpl::getReaderMetrics() const {
+    return reader.getFileContents().readerMetrics;
+  }
+
   bool StripeStreamsImpl::getThrowOnHive11DecimalOverflow() const {
     return reader.getThrowOnHive11DecimalOverflow();
   }
@@ -147,7 +152,8 @@ namespace orc {
                                                           footerLength,
                                                           memory)),
                            blockSize,
-                           memory);
+                           memory,
+                           metrics);
       stripeFooter.reset(new proto::StripeFooter());
       if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) {
         throw ParseError("Failed to parse the stripe footer");
diff --git a/c++/src/StripeStream.hh b/c++/src/StripeStream.hh
index 8d9fb0652..7365389a0 100644
--- a/c++/src/StripeStream.hh
+++ b/c++/src/StripeStream.hh
@@ -68,6 +68,8 @@ namespace orc {
 
     MemoryPool& getMemoryPool() const override;
 
+    ReaderMetrics* getReaderMetrics() const override;
+
     const Timezone& getWriterTimezone() const override;
 
     const Timezone& getReaderTimezone() const override;
@@ -135,6 +137,7 @@ namespace orc {
     CompressionKind compression;
     uint64_t blockSize;
     mutable std::unique_ptr<proto::StripeFooter> stripeFooter;
+    ReaderMetrics* metrics;
     void ensureStripeFooterLoaded() const;
   public:
 
@@ -146,7 +149,8 @@ namespace orc {
                           InputStream* _stream,
                           MemoryPool& _memory,
                           CompressionKind _compression,
-                          uint64_t _blockSize
+                          uint64_t _blockSize,
+                          ReaderMetrics* _metrics
                           ) : offset(_offset),
                               indexLength(_indexLength),
                               dataLength(_dataLength),
@@ -155,7 +159,8 @@ namespace orc {
                               stream(_stream),
                               memory(_memory),
                               compression(_compression),
-                              blockSize(_blockSize) {
+                              blockSize(_blockSize),
+                              metrics(_metrics) {
       // PASS
     }
 
diff --git a/c++/src/Utils.hh b/c++/src/Utils.hh
new file mode 100644
index 000000000..6f478f583
--- /dev/null
+++ b/c++/src/Utils.hh
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_UTILS_HH
+#define ORC_UTILS_HH
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+
+namespace orc {
+
+/**
+ * Scoped timer that folds the lifetime of the object into atomic counters.
+ *
+ * The destructor adds the elapsed microseconds to *latencyUs, or subtracts
+ * them when minus is true, and then increments *count. Either pointer may
+ * be null, in which case the corresponding counter is left untouched.
+ */
+class AutoStopwatch {
+  // steady_clock is monotonic: a wall-clock adjustment during the measured
+  // scope cannot yield a negative interval (which the uint64_t cast below
+  // would turn into a huge bogus latency).
+  std::chrono::steady_clock::time_point start;
+  std::atomic<uint64_t>* latencyUs;
+  std::atomic<uint64_t>* count;
+  bool minus;
+
+public:
+  AutoStopwatch(std::atomic<uint64_t>* _latencyUs,
+                std::atomic<uint64_t>* _count,
+                bool _minus = false)
+                : latencyUs(_latencyUs),
+                  count(_count),
+                  minus(_minus) {
+    // the clock is only read when there is a latency counter to update
+    if (latencyUs) {
+      start = std::chrono::steady_clock::now();
+    }
+  }
+
+  ~AutoStopwatch() {
+    if (latencyUs) {
+      std::chrono::microseconds elapsedTime =
+        std::chrono::duration_cast<std::chrono::microseconds>(
+          std::chrono::steady_clock::now() - start);
+      if (!minus) {
+        latencyUs->fetch_add(static_cast<uint64_t>(elapsedTime.count()));
+      } else {
+        // the counter is unsigned, so subtracting more than it currently
+        // holds wraps around (modulo 2^64) instead of going negative
+        latencyUs->fetch_sub(static_cast<uint64_t>(elapsedTime.count()));
+      }
+    }
+
+    if (count) {
+      count->fetch_add(1);
+    }
+  }
+};
+
+// Times the enclosing scope: on scope exit the elapsed microseconds are added
+// to METRICS_PTR->LATENCY_VAR and METRICS_PTR->COUNT_VAR is incremented.
+// Nothing is updated when METRICS_PTR is null. The macro declares a local
+// named `measure`, so only one stopwatch macro can be used per scope.
+#define SCOPED_STOPWATCH(METRICS_PTR, LATENCY_VAR, COUNT_VAR)            \
+  AutoStopwatch measure(                                                 \
+    ((METRICS_PTR) == nullptr ? nullptr : &(METRICS_PTR)->LATENCY_VAR),  \
+    ((METRICS_PTR) == nullptr ? nullptr : &(METRICS_PTR)->COUNT_VAR))
+
+// Subtracts the time spent in the enclosing scope from
+// METRICS_PTR->LATENCY_VAR; no call counter is touched.
+#define SCOPED_MINUS_STOPWATCH(METRICS_PTR, LATENCY_VAR)                 \
+  AutoStopwatch measure(                                                 \
+    ((METRICS_PTR) == nullptr ? nullptr : &(METRICS_PTR)->LATENCY_VAR),  \
+    nullptr, true)
+
+}
+
+#endif
diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc
index 42a554f5c..0fc6bad46 100644
--- a/c++/src/sargs/SargsApplier.cc
+++ b/c++/src/sargs/SargsApplier.cc
@@ -41,14 +41,15 @@ namespace orc {
   SargsApplier::SargsApplier(const Type& type,
                              const SearchArgument * searchArgument,
                              uint64_t rowIndexStride,
-                             WriterVersion writerVersion)
+                             WriterVersion writerVersion,
+                             ReaderMetrics* metrics)
                              : mType(type)
                              , mSearchArgument(searchArgument)
                              , mRowIndexStride(rowIndexStride)
                              , mWriterVersion(writerVersion)
-                             , mStats(0, 0)
                              , mHasEvaluatedFileStats(false)
-                             , mFileStatsEvalResult(true) {
+                             , mFileStatsEvalResult(true)
+                             , mMetrics(metrics) {
     const SearchArgumentImpl * sargs =
       dynamic_cast<const SearchArgumentImpl *>(mSearchArgument);
 
@@ -126,10 +127,14 @@ namespace orc {
     } while (rowGroup != 0);
 
     // update stats
-    mStats.first = std::accumulate(
-      mNextSkippedRows.cbegin(), mNextSkippedRows.cend(), mStats.first,
-      [](bool rg, uint64_t s) { return rg ? 1 : 0 + s; });
-    mStats.second += groupsInStripe;
+    // accumulate in uint64_t: the init value's type is the accumulator type,
+    // and 0UL (unsigned long) is narrower than uint64_t on LLP64 platforms
+    uint64_t selectedRGs = std::accumulate(mNextSkippedRows.cbegin(), mNextSkippedRows.cend(),
+      uint64_t{0}, [](uint64_t initVal, uint64_t rg) { return rg > 0 ? initVal + 1 : initVal; });
+    if (mMetrics != nullptr) {
+      mMetrics->SelectedRowGroupCount.fetch_add(selectedRGs);
+      mMetrics->EvaluatedRowGroupCount.fetch_add(groupsInStripe);
+    }
 
     return mHasSelected;
   }
@@ -159,7 +164,8 @@ namespace orc {
   }
 
   bool SargsApplier::evaluateStripeStatistics(
-                            const proto::StripeStatistics& stripeStats) {
+                            const proto::StripeStatistics& stripeStats,
+                            uint64_t stripeRowGroupCount) {
     if (stripeStats.colstats_size() == 0) {
       return true;
     }
@@ -168,16 +174,23 @@ namespace orc {
     if (!ret) {
       // reset mNextSkippedRows when the current stripe does not satisfy the PPD
       mNextSkippedRows.clear();
+      if (mMetrics != nullptr) {
+        mMetrics->EvaluatedRowGroupCount.fetch_add(stripeRowGroupCount);
+      }
     }
     return ret;
   }
 
-  bool SargsApplier::evaluateFileStatistics(const proto::Footer& footer) {
+  bool SargsApplier::evaluateFileStatistics(const proto::Footer& footer,
+                                            uint64_t numRowGroupsInStripeRange) {
     if (!mHasEvaluatedFileStats) {
       if (footer.statistics_size() == 0) {
         mFileStatsEvalResult = true;
       } else {
         mFileStatsEvalResult = evaluateColumnStatistics(footer.statistics());
+        if (!mFileStatsEvalResult && mMetrics != nullptr) {
+          mMetrics->EvaluatedRowGroupCount.fetch_add(numRowGroupsInStripeRange);
+        }
       }
       mHasEvaluatedFileStats = true;
     }
diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh
index d8bdf852d..1be875629 100644
--- a/c++/src/sargs/SargsApplier.hh
+++ b/c++/src/sargs/SargsApplier.hh
@@ -23,6 +23,7 @@
 #include <orc/Common.hh>
 #include "orc/BloomFilter.hh"
 #include "orc/Type.hh"
+#include "orc/Reader.hh"
 
 #include "sargs/SearchArgument.hh"
 
@@ -35,19 +36,30 @@ namespace orc {
     SargsApplier(const Type& type,
                  const SearchArgument * searchArgument,
                  uint64_t rowIndexStride,
-                 WriterVersion writerVersion);
+                 WriterVersion writerVersion,
+                 ReaderMetrics* metrics);
 
     /**
      * Evaluate search argument on file statistics
+     * If the file statistics don't satisfy the sargs, EvaluatedRowGroupCount
+     * of the reader metrics is increased by numRowGroupsInStripeRange.
+     * Otherwise the reader metrics are not updated here, because the
+     * row groups still require further evaluation.
      * @return true if file statistics satisfy the sargs
      */
-    bool evaluateFileStatistics(const proto::Footer& footer);
+    bool evaluateFileStatistics(const proto::Footer& footer,
+                                uint64_t numRowGroupsInStripeRange);
 
     /**
      * Evaluate search argument on stripe statistics
+     * If the stripe statistics don't satisfy the sargs, EvaluatedRowGroupCount
+     * of the reader metrics is increased by stripeRowGroupCount.
+     * Otherwise the reader metrics are not updated here, because the
+     * row groups still require further evaluation.
      * @return true if stripe statistics satisfy the sargs
      */
-    bool evaluateStripeStatistics(const proto::StripeStatistics& stripeStats);
+    bool evaluateStripeStatistics(const proto::StripeStatistics& stripeStats,
+                                  uint64_t stripeRowGroupCount);
 
     /**
      * TODO: use proto::RowIndex and proto::BloomFilter to do the evaluation
@@ -90,7 +102,12 @@ namespace orc {
     }
 
     std::pair<uint64_t, uint64_t> getStats() const {
-      return mStats;
+      if (mMetrics != nullptr) {
+        return std::make_pair(mMetrics->SelectedRowGroupCount.load(),
+                              mMetrics->EvaluatedRowGroupCount.load());
+      } else {
+        return {0, 0};
+      }
     }
 
   private:
@@ -119,11 +136,12 @@ namespace orc {
     uint64_t mTotalRowsInStripe;
     bool mHasSelected;
     bool mHasSkipped;
-    // keep stats of selected RGs and evaluated RGs
-    std::pair<uint64_t, uint64_t> mStats;
     // store result of file stats evaluation
     bool mHasEvaluatedFileStats;
     bool mFileStatsEvalResult;
+    // use the SelectedRowGroupCount and EvaluatedRowGroupCount to
+    // keep stats of selected RGs and evaluated RGs
+    ReaderMetrics* mMetrics;
   };
 
 }
diff --git a/c++/test/TestAttributes.cc b/c++/test/TestAttributes.cc
index f7d0b4740..03dd892af 100644
--- a/c++/test/TestAttributes.cc
+++ b/c++/test/TestAttributes.cc
@@ -200,8 +200,11 @@ namespace orc {
       ss << "../../../examples";
     }
     ss << "/complextypes_iceberg.orc";
+    ReaderOptions readerOpts;
     std::unique_ptr<orc::Reader> reader =
-      orc::createReader(readLocalFile(ss.str().c_str()), ReaderOptions());
+      orc::createReader(readLocalFile(ss.str().c_str(),
+                                      readerOpts.getReaderMetrics()),
+                        readerOpts);
     auto rowReader = createRowReader(reader);
     auto& root = rowReader->getSelectedType();
     std::vector<uint64_t> fieldIds;
diff --git a/c++/test/TestByteRLEEncoder.cc b/c++/test/TestByteRLEEncoder.cc
index c5db5591b..539cfad15 100644
--- a/c++/test/TestByteRLEEncoder.cc
+++ b/c++/test/TestByteRLEEncoder.cc
@@ -73,7 +73,7 @@ namespace orc {
       new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
 
     std::unique_ptr<ByteRleDecoder> decoder =
-      createByteRleDecoder(std::move(inStream));
+      createByteRleDecoder(std::move(inStream), getDefaultReaderMetrics());
 
     char* decodedData = new char[numValues];
     decoder->next(decodedData, numValues, notNull);
@@ -97,7 +97,7 @@ namespace orc {
       new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
 
     std::unique_ptr<ByteRleDecoder> decoder =
-      createBooleanRleDecoder(std::move(inStream));
+      createBooleanRleDecoder(std::move(inStream), getDefaultReaderMetrics());
 
     char* decodedData = new char[numValues];
     decoder->next(decodedData, numValues, notNull);
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index 38c01d336..52b0e984d 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -34,7 +34,8 @@ TEST(ByteRle, simpleTest) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<SeekableInputStream>
-        (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))));
+        (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))),
+        getDefaultReaderMetrics());
   std::vector<char> data(103);
   rle->next(data.data(), data.size(), nullptr);
 
@@ -62,7 +63,8 @@ TEST(ByteRle, nullTest) {
   std::unique_ptr<ByteRleDecoder> rle =
     createByteRleDecoder(std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer,
-                                                       sizeof(buffer))));
+                                                       sizeof(buffer))),
+                         getDefaultReaderMetrics());
   rle->next(result, sizeof(result), notNull);
   for(size_t i = 0; i < sizeof(result); ++i) {
     if (i >= 10) {
@@ -78,7 +80,8 @@ TEST(ByteRle, literalCrossBuffer) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<SeekableInputStream>
-        (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 6)));
+        (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 6)),
+        getDefaultReaderMetrics());
   std::vector<char> data(20);
   rle->next(data.data(), data.size(), nullptr);
 
@@ -96,7 +99,8 @@ TEST(ByteRle, skipLiteralBufferUnderflowTest) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<SeekableInputStream>(
-          new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 4)));
+          new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 4)),
+        getDefaultReaderMetrics());
   std::vector<char> data(8);
   rle->next(data.data(), 3, nullptr);
   EXPECT_EQ(0x0, data[0]);
@@ -115,7 +119,8 @@ TEST(ByteRle, simpleRuns) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<SeekableInputStream>
-	(new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))));
+	      (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))),
+        getDefaultReaderMetrics());
   std::vector<char> data(16);
   for (size_t i = 0; i < 3; ++i) {
     rle->next(data.data(), data.size(), nullptr);
@@ -135,7 +140,8 @@ TEST(ByteRle, splitHeader) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<orc::SeekableInputStream>
-	(new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 1)));
+	      (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 1)),
+        getDefaultReaderMetrics());
   std::vector<char> data(35);
   rle->next(data.data(), data.size(), nullptr);
   for (size_t i = 0; i < 3; ++i) {
@@ -153,7 +159,8 @@ TEST(ByteRle, splitRuns) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<orc::SeekableInputStream>
-	(new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))));
+	      (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))),
+        getDefaultReaderMetrics());
   std::vector<char> data(5);
   for (size_t i = 0; i < 3; ++i) {
     rle->next(data.data(), data.size(), nullptr);
@@ -185,7 +192,8 @@ TEST(ByteRle, testNulls) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<orc::SeekableInputStream>
-	(new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 3)));
+	      (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 3)),
+        getDefaultReaderMetrics());
   std::vector<char> data(16, -1);
   std::vector<char> notNull(data.size());
   for (size_t i = 0; i < data.size(); ++i) {
@@ -220,7 +228,8 @@ TEST(ByteRle, testAllNulls) {
   std::unique_ptr<ByteRleDecoder> rle =
       createByteRleDecoder(
         std::unique_ptr<orc::SeekableInputStream>
-	(new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))));
+	      (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))),
+        getDefaultReaderMetrics());
   std::vector<char> data(16, -1);
   std::vector<char> allNull(data.size(), 0);
   std::vector<char> noNull(data.size(), 1);
@@ -350,7 +359,8 @@ TEST(ByteRle, testSkip) {
   SeekableInputStream* const stream =
     new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer));
   std::unique_ptr<ByteRleDecoder> rle =
-      createByteRleDecoder(std::unique_ptr<orc::SeekableInputStream>(stream));
+      createByteRleDecoder(std::unique_ptr<orc::SeekableInputStream>(stream),
+                           getDefaultReaderMetrics());
   std::vector<char> data(1);
   for (size_t i = 0; i < 2048; i += 10) {
     rle->next(data.data(), data.size(), nullptr);
@@ -793,7 +803,7 @@ TEST(ByteRle, testSeek) {
     positions[i].push_back(rleLocs[i]);
   }
   std::unique_ptr<ByteRleDecoder> rle =
-      createByteRleDecoder(std::move(stream));
+      createByteRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(1);
   for (size_t i = 0; i < 2048; ++i) {
     rle->next(data.data(), 1, nullptr);
@@ -818,7 +828,7 @@ TEST(BooleanRle, simpleTest) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(50);
   for (size_t i = 0; i < 16; ++i) {
     rle->next(data.data(), data.size(), nullptr);
@@ -845,7 +855,7 @@ TEST(BooleanRle, runsTest) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(72);
   rle->next(data.data(), data.size(), nullptr);
   for (size_t i = 0; i < data.size(); ++i) {
@@ -870,7 +880,7 @@ TEST(BooleanRle, runsTestWithNull) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(72);
   std::vector<char> notNull(data.size(), 1);
   rle->next(data.data(), data.size(), notNull.data());
@@ -983,7 +993,7 @@ TEST(BooleanRle, skipTest) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(1);
   for (size_t i = 0; i < 16384; i += 5) {
     rle->next(data.data(), data.size(), nullptr);
@@ -1090,7 +1100,7 @@ TEST(BooleanRle, skipTestWithNulls) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(3);
   std::vector<char> someNull(data.size(), 0);
   someNull[1] = 1;
@@ -1208,7 +1218,7 @@ TEST(BooleanRle, seekTest) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(16384);
   rle->next(data.data(), data.size(), nullptr);
   for (size_t i = 0; i < data.size(); ++i) {
@@ -1338,7 +1348,7 @@ TEST(BooleanRle, seekTestWithNulls) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-      createBooleanRleDecoder(std::move(stream));
+      createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(16384);
   std::vector<char> allNull(data.size(), 0);
   std::vector<char> noNull(data.size(), 1);
@@ -1406,7 +1416,7 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
   std::unique_ptr<SeekableInputStream> stream
     (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer)));
   std::unique_ptr<ByteRleDecoder> rle =
-    createBooleanRleDecoder(std::move(stream));
+    createBooleanRleDecoder(std::move(stream), getDefaultReaderMetrics());
   std::vector<char> data(sizeof(num) / sizeof(char));
   rle->next(data.data(), data.size(), nullptr);
   for (size_t i = 0; i < data.size(); ++i) {
@@ -1450,7 +1460,9 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
                            new SeekableArrayInputStream(memStream.getData(),
                                                         memStream.getLength())),
                          blockSize,
-                         *getDefaultPool()));
+                         *getDefaultPool(),
+                         getDefaultReaderMetrics()),
+      getDefaultReaderMetrics());
 
     // before fix of ORC-470, skip all remaining boolean values will get an
     // exception since BooleanRLEDecoder still tries to read one last byte from
@@ -1487,7 +1499,8 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
             new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
 
     std::unique_ptr<ByteRleDecoder> decoder =
-            createBooleanRleDecoder(std::move(inStream));
+            createBooleanRleDecoder(std::move(inStream),
+                                    getDefaultReaderMetrics());
 
     char* decodedData = new char[numValues];
     decoder->next(decodedData, 9, nullptr);
diff --git a/c++/test/TestColumnReader.cc b/c++/test/TestColumnReader.cc
index bc0ecb8eb..37d5c3b55 100644
--- a/c++/test/TestColumnReader.cc
+++ b/c++/test/TestColumnReader.cc
@@ -71,6 +71,10 @@ namespace orc {
       return *getDefaultPool();
     }
 
+    ReaderMetrics* getReaderMetrics() const {  // metrics accessor; keeps no metrics object of its own
+      return getDefaultReaderMetrics();  // always forwards to getDefaultReaderMetrics()
+    }  // NOTE(review): the adjacent getWriterTimezone() is marked `override`, this one is not -- confirm against the base class
+
     const Timezone &getWriterTimezone() const override {
       return getTimezoneByName("America/Los_Angeles");
     }
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index e6fc997c3..17d569f2c 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -54,7 +54,8 @@ namespace orc {
       new SeekableArrayInputStream(memStream.getData(), memStream.getLength()));
 
     std::unique_ptr<SeekableInputStream> decompressStream =
-      createDecompressor(kind, std::move(inputStream), 1024, pool);
+      createDecompressor(kind, std::move(inputStream), 1024, pool,
+                         getDefaultReaderMetrics());
 
     const char * decompressedBuffer;
     int decompressedSize;
@@ -295,7 +296,8 @@ namespace orc {
       createDecompressor(kind,
                          std::move(inputStream),
                          1024,
-                         *pool);
+                         *pool,
+                         getDefaultReaderMetrics());
 
     proto::PostScript ps2;
     ps2.ParseFromZeroCopyStream(decompressStream.get());
@@ -419,7 +421,8 @@ namespace orc {
       createDecompressor(kind,
                          std::move(inputStream),
                          blockSize,
-                         *pool);
+                         *pool,
+                         getDefaultReaderMetrics());
 
     // prepare positions to seek to
     EXPECT_EQ(rowIndexEntry1.positions_size(), rowIndexEntry2.positions_size());
diff --git a/c++/test/TestDecompression.cc b/c++/test/TestDecompression.cc
index 40d783e0e..d44345327 100644
--- a/c++/test/TestDecompression.cc
+++ b/c++/test/TestDecompression.cc
@@ -210,7 +210,8 @@ namespace orc {
 
   TEST_F(TestDecompression, testFileBackup) {
     SCOPED_TRACE("testFileBackup");
-    std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
+    std::unique_ptr<InputStream> file =
+      readLocalFile(simpleFile, getDefaultReaderMetrics());
     SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
@@ -241,7 +242,8 @@ namespace orc {
 
   TEST_F(TestDecompression, testFileSkip) {
     SCOPED_TRACE("testFileSkip");
-    std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
+    std::unique_ptr<InputStream> file =
+      readLocalFile(simpleFile, getDefaultReaderMetrics());
     SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
@@ -261,7 +263,8 @@ namespace orc {
 
   TEST_F(TestDecompression, testFileCombo) {
     SCOPED_TRACE("testFileCombo");
-    std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
+    std::unique_ptr<InputStream> file =
+      readLocalFile(simpleFile, getDefaultReaderMetrics());
     SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
@@ -281,7 +284,8 @@ namespace orc {
 
   TEST_F(TestDecompression, testFileSeek) {
     SCOPED_TRACE("testFileSeek");
-    std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
+    std::unique_ptr<InputStream> file =
+      readLocalFile(simpleFile, getDefaultReaderMetrics());
     SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
@@ -324,7 +328,8 @@ namespace orc {
                          (new SeekableArrayInputStream(bytes.data(),
                                                        bytes.size())),
                          32768,
-                         *getDefaultPool());
+                         *getDefaultPool(),
+                         getDefaultReaderMetrics());
     const void *ptr;
     int length;
     result->Next(&ptr, &length);
@@ -339,7 +344,7 @@ namespace orc {
       createDecompressor(CompressionKind_LZO,
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer, 0)),
-                         32768, *getDefaultPool());
+                         32768, *getDefaultPool(), getDefaultReaderMetrics());
     EXPECT_EQ("lzo(SeekableArrayInputStream 0 of 0)", result->getName());
     const void *ptr;
     int length;
@@ -358,7 +363,8 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer,
                                                        ARRAY_SIZE(buffer))),
-                         128*1024, *getDefaultPool());
+                         128*1024, *getDefaultPool(),
+                         getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_EQ(true, result->Next(&ptr, &length));
@@ -401,7 +407,8 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer,
                                                        ARRAY_SIZE(buffer))),
-                         128*1024, *getDefaultPool());
+                         128*1024, *getDefaultPool(),
+                         getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_EQ(true, result->Next(&ptr, &length));
@@ -418,7 +425,7 @@ namespace orc {
       createDecompressor(CompressionKind_LZ4,
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer, 0)),
-                         32768, *getDefaultPool());
+                         32768, *getDefaultPool(), getDefaultReaderMetrics());
     EXPECT_EQ("lz4(SeekableArrayInputStream 0 of 0)", result->getName());
     const void *ptr;
     int length;
@@ -436,7 +443,8 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer,
                                                        ARRAY_SIZE(buffer))),
-                         128*1024, *getDefaultPool());
+                         128*1024, *getDefaultPool(),
+                         getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_EQ(true, result->Next(&ptr, &length));
@@ -472,7 +480,8 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream(buffer,
                                                        ARRAY_SIZE(buffer))),
-                         128*1024, *getDefaultPool());
+                         128*1024, *getDefaultPool(),
+                         getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_EQ(true, result->Next(&ptr, &length));
@@ -490,7 +499,8 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream
                           (buffer, ARRAY_SIZE(buffer))),
-                         32768, *getDefaultPool());
+                         32768, *getDefaultPool(),
+                         getDefaultReaderMetrics());
     EXPECT_EQ("zlib(SeekableArrayInputStream 0 of 8)", result->getName());
     const void *ptr;
     int length;
@@ -521,7 +531,7 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream
                           (buffer, ARRAY_SIZE(buffer), 5)),
-                         5, *getDefaultPool());
+                         5, *getDefaultPool(), getDefaultReaderMetrics());
     EXPECT_EQ("zlib(SeekableArrayInputStream 0 of 23)", result->getName());
     const void *ptr;
     int length;
@@ -562,7 +572,7 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream
                           (buffer, ARRAY_SIZE(buffer))), 1000,
-			 *getDefaultPool());
+			 *getDefaultPool(), getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_EQ(true, result->Next(&ptr, &length));
@@ -585,7 +595,8 @@ namespace orc {
                          (new SeekableArrayInputStream
                           (buffer, ARRAY_SIZE(buffer), 3)),
                          1000,
-                         *getDefaultPool());
+                         *getDefaultPool(),
+                         getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_THROW(result->BackUp(20), std::logic_error);
@@ -623,7 +634,7 @@ namespace orc {
                          std::unique_ptr<SeekableInputStream>
                          (new SeekableArrayInputStream
                           (buffer, ARRAY_SIZE(buffer), 5)),
-                         5, *getDefaultPool());
+                         5, *getDefaultPool(), getDefaultReaderMetrics());
     const void *ptr;
     int length;
     ASSERT_EQ(true, result->Next(&ptr, &length));
@@ -709,7 +720,8 @@ namespace orc {
                                          compressBuffer.getBufferSize(),
                                          blockSize)),
          buf.size(),
-         *getDefaultPool());
+         *getDefaultPool(),
+         getDefaultReaderMetrics());
     const void *data;
     int length;
     ASSERT_TRUE(result->Next(&data, &length));
@@ -750,7 +762,8 @@ namespace orc {
          std::unique_ptr<SeekableInputStream>
          (new SeekableArrayInputStream(input.data(), input.size(), blockSize)),
          buf.size(),
-         *getDefaultPool());
+         *getDefaultPool(),
+         getDefaultReaderMetrics());
     for (int i=0; i < 4; ++i) {
       const void *data;
       int length;
@@ -784,7 +797,8 @@ namespace orc {
                                          compressBuffer.getBufferSize(),
                                          blockSize)),
          buf.size(),
-         *getDefaultPool());
+         *getDefaultPool(),
+         getDefaultReaderMetrics());
     const void *data;
     int length;
     // skip 1/2; in 2 jumps
@@ -815,7 +829,8 @@ namespace orc {
     std::unique_ptr<SeekableInputStream> input(
         new SeekableArrayInputStream(buf.data(), buf.size(), blockSize));
     std::unique_ptr<SeekableInputStream> stream = createDecompressor(
-        CompressionKind_SNAPPY, std::move(input), chunkSize, *getDefaultPool());
+        CompressionKind_SNAPPY, std::move(input), chunkSize, *getDefaultPool(),
+        getDefaultReaderMetrics());
 
     const void *data;
     int len;
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 237deab05..4e04280ab 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -119,8 +119,11 @@ namespace orc {
     // Read a file with bloom filters written by CPP writer in version 1.6.11.
     ss << "/" << fileName;
     ReaderOptions readerOpts;
+    readerOpts.setReaderMetrics(nullptr);
     std::unique_ptr<Reader> reader =
-      createReader(readLocalFile(ss.str().c_str()), readerOpts);
+      createReader(readLocalFile(ss.str().c_str(),
+                                 readerOpts.getReaderMetrics()),
+                   readerOpts);
     EXPECT_EQ(WriterId::ORC_CPP_WRITER, reader->getWriterId());
     EXPECT_EQ(softwareVersion, reader->getSoftwareVersion());
 
@@ -573,6 +576,7 @@ namespace orc {
         new MemoryInputStream(memStream.getData(), memStream.getLength()));
     ReaderOptions readerOptions;
     readerOptions.setMemoryPool(*pool);
+    readerOptions.setReaderMetrics(nullptr);
     return createReader(std::move(inStream), readerOptions);
   }
 
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 1b4ca4e89..04fb74700 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -37,7 +37,8 @@ std::vector<int64_t> decodeRLEv2(const unsigned char *bytes,
   std::unique_ptr<RleDecoder> rle = createRleDecoder(
       std::unique_ptr<SeekableInputStream>(
             new SeekableArrayInputStream(bytes,l, blockSize)),
-        isSigned,RleVersion_2, *getDefaultPool());
+        isSigned, RleVersion_2, *getDefaultPool(),
+        getDefaultReaderMetrics());
   std::vector<int64_t> results;
   for (size_t i = 0; i < count; i+=n) {
     size_t remaining = count - i;
@@ -180,7 +181,8 @@ TEST(RLEv2, delta0Width) {
     createRleDecoder(std::unique_ptr<SeekableInputStream>
                      (new SeekableArrayInputStream
                       (buffer, ARRAY_SIZE(buffer))),
-                     false, RleVersion_2, *getDefaultPool());
+                     false, RleVersion_2, *getDefaultPool(),
+                     getDefaultReaderMetrics());
   int64_t values[6];
   decoder->next(values, 6, 0);
   EXPECT_EQ(0, values[0]);
@@ -269,7 +271,8 @@ TEST(RLEv2, 0to2Repeat1Direct) {
       createRleDecoder(std::unique_ptr<SeekableInputStream>
 		       (new SeekableArrayInputStream(buffer,
 						     ARRAY_SIZE(buffer))),
-		       true, RleVersion_2, *getDefaultPool());
+		       true, RleVersion_2, *getDefaultPool(),
+           getDefaultReaderMetrics());
   std::vector<int64_t> data(3);
   rle->next(data.data(), 3, nullptr);
 
@@ -659,7 +662,8 @@ TEST(RLEv2, largeNegativesDirect) {
       createRleDecoder(
           std::unique_ptr<SeekableInputStream>(
              new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer))),
-          true, RleVersion_2, *getDefaultPool());
+          true, RleVersion_2, *getDefaultPool(),
+          getDefaultReaderMetrics());
   std::vector<int64_t> data(5);
   rle->next(data.data(), 5, nullptr);
 
@@ -772,7 +776,8 @@ TEST(RLEv2, basicDirectSeek) {
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>
                        (new SeekableArrayInputStream(bytes,l)), true,
-                       RleVersion_2, *getDefaultPool());
+                       RleVersion_2, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::list<uint64_t> position;
   position.push_back(7); // byte position; skip first 20 [0 to 19]
   position.push_back(13); // value position; skip 13 more [20 to 32]
@@ -853,7 +858,8 @@ TEST(RLEv2, bitsLeftByPreviousStream) {
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>
                        (new SeekableArrayInputStream(bytes,l)), true,
-                       RleVersion_2, *getDefaultPool());
+                       RleVersion_2, *getDefaultPool(),
+                       getDefaultReaderMetrics());
 
   std::vector<int64_t> data(N);
   rle->next(data.data(), N, nullptr);
@@ -870,7 +876,8 @@ TEST(RLEv1, simpleTest) {
       createRleDecoder(std::unique_ptr<SeekableInputStream>
 		       (new SeekableArrayInputStream(buffer,
 						     ARRAY_SIZE(buffer))),
-		       false, RleVersion_1, *getDefaultPool());
+		       false, RleVersion_1, *getDefaultPool(),
+           getDefaultReaderMetrics());
   std::vector<int64_t> data(105);
   rle->next(data.data(), 105, nullptr);
 
@@ -890,7 +897,8 @@ TEST(RLEv1, signedNullLiteralTest) {
       createRleDecoder(std::unique_ptr<SeekableInputStream>
 		       (new SeekableArrayInputStream(buffer,
 						     ARRAY_SIZE(buffer))),
-		       true, RleVersion_1, *getDefaultPool());
+		       true, RleVersion_1, *getDefaultPool(),
+           getDefaultReaderMetrics());
   std::vector<int64_t> data(8);
   std::vector<char> notNull(8, 1);
   rle->next(data.data(), 8, notNull.data());
@@ -907,7 +915,8 @@ TEST(RLEv1, splitHeader) {
       createRleDecoder(
           std::unique_ptr<SeekableInputStream>
           (new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer), 4)),
-          false, RleVersion_1, *getDefaultPool());
+          false, RleVersion_1, *getDefaultPool(),
+          getDefaultReaderMetrics());
   std::vector<int64_t> data(200);
   rle->next(data.data(), 3, nullptr);
 
@@ -923,7 +932,8 @@ TEST(RLEv1, splitRuns) {
     new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer));
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>(stream),
-                       false, RleVersion_1, *getDefaultPool());
+                       false, RleVersion_1, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::vector<int64_t> data(200);
   for (size_t i = 0; i < 42; ++i) {
     rle->next(data.data(), 3, nullptr);
@@ -950,7 +960,8 @@ TEST(RLEv1, testSigned) {
     new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer));
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>(stream),
-                       true, RleVersion_1, *getDefaultPool());
+                       true, RleVersion_1, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::vector<int64_t> data(100);
   rle->next(data.data(), data.size(), nullptr);
   for (size_t i = 0; i < data.size(); ++i) {
@@ -969,7 +980,8 @@ TEST(RLEv1, testNull) {
     new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer));
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>(stream),
-                       true, RleVersion_1, *getDefaultPool());
+                       true, RleVersion_1, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::vector<int64_t> data(24);
   std::vector<char> notNull(24);
   for (size_t i = 0; i < notNull.size(); ++i) {
@@ -999,7 +1011,8 @@ TEST(RLEv1, testAllNulls) {
     new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer));
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>(stream),
-                       false, RleVersion_1, *getDefaultPool());
+                       false, RleVersion_1, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::vector<int64_t> data(16, -1);
   std::vector<char> allNull(16, 0);
   std::vector<char> noNull(16, 1);
@@ -1234,7 +1247,8 @@ TEST(RLEv1, skipTest) {
     new SeekableArrayInputStream(buffer, ARRAY_SIZE(buffer));
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>(stream),
-                       true, RleVersion_1, *getDefaultPool());
+                       true, RleVersion_1, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::vector<int64_t> data(1);
   for (size_t i = 0; i < 2048; i += 10) {
     rle->next(data.data(), 1, nullptr);
@@ -2941,7 +2955,8 @@ TEST(RLEv1, seekTest) {
   }
   std::unique_ptr<RleDecoder> rle =
       createRleDecoder(std::unique_ptr<SeekableInputStream>(stream),
-                       true, RleVersion_1, *getDefaultPool());
+                       true, RleVersion_1, *getDefaultPool(),
+                       getDefaultReaderMetrics());
   std::vector<int64_t> data(2048);
   rle->next(data.data(), data.size(), nullptr);
   for (size_t i = 0; i < data.size(); ++i) {
@@ -2977,7 +2992,8 @@ TEST(RLEv1, testLeadingNulls) {
       createRleDecoder(std::unique_ptr<SeekableInputStream>
 		       (new SeekableArrayInputStream(buffer,
 						     ARRAY_SIZE(buffer))),
-		       false, RleVersion_1, *getDefaultPool());
+		       false, RleVersion_1, *getDefaultPool(),
+           getDefaultReaderMetrics());
   std::vector<int64_t> data(10);
   const char isNull[] = {0x00, 0x00, 0x00, 0x00, 0x00,
                          0x01, 0x01, 0x01, 0x01, 0x01};
diff --git a/c++/test/TestRleEncoder.cc b/c++/test/TestRleEncoder.cc
index f54e072ef..73be2f46a 100644
--- a/c++/test/TestRleEncoder.cc
+++ b/c++/test/TestRleEncoder.cc
@@ -103,7 +103,8 @@ namespace orc {
             std::unique_ptr<SeekableArrayInputStream>(new SeekableArrayInputStream(
                     memStream.getData(),
                     memStream.getLength())),
-            isSinged, version, *getDefaultPool());
+            isSinged, version, *getDefaultPool(),
+            getDefaultReaderMetrics());
 
     int64_t* decodedData = new int64_t[numValues];
     decoder->next(decodedData, numValues, notNull);
diff --git a/c++/test/TestSargsApplier.cc b/c++/test/TestSargsApplier.cc
index 74fcae297..ed658e8e0 100644
--- a/c++/test/TestSargsApplier.cc
+++ b/c++/test/TestSargsApplier.cc
@@ -110,7 +110,9 @@ namespace orc {
     rowIndexes[2] = rowIndex2;
 
     // evaluate row group index
-    SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
+    ReaderMetrics metrics;
+    SargsApplier applier(
+      *type, sarg.get(), 1000, WriterVersion_ORC_135, &metrics);
     EXPECT_TRUE(applier.pickRowGroups(4000, rowIndexes, {}));
     const auto& nextSkippedRows = applier.getNextSkippedRows();
     EXPECT_EQ(4, nextSkippedRows.size());
@@ -118,6 +120,8 @@ namespace orc {
     EXPECT_EQ(0, nextSkippedRows[1]);
     EXPECT_EQ(0, nextSkippedRows[2]);
     EXPECT_EQ(4000, nextSkippedRows[3]);
+    EXPECT_EQ(metrics.SelectedRowGroupCount.load(), 1);
+    EXPECT_EQ(metrics.EvaluatedRowGroupCount.load(), 4);
   }
 
   TEST(TestSargsApplier, testStripeAndFileStats) {
@@ -143,8 +147,12 @@ namespace orc {
       *stripeStats.add_colstats() = structStatistics;
       *stripeStats.add_colstats() = createIntStats(0L, 10L);
       *stripeStats.add_colstats() = createIntStats(0L, 50L);
-      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
-      EXPECT_FALSE(applier.evaluateStripeStatistics(stripeStats));
+      ReaderMetrics metrics;
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135,
+                           &metrics);
+      EXPECT_FALSE(applier.evaluateStripeStatistics(stripeStats, 1));
+      EXPECT_EQ(metrics.SelectedRowGroupCount.load(), 0);
+      EXPECT_EQ(metrics.EvaluatedRowGroupCount.load(), 1);
     }
     // Test stripe stats 0 <= x <= 50 and 0 <= y <= 50
     {
@@ -154,8 +162,12 @@ namespace orc {
       *stripeStats.add_colstats() = structStatistics;
       *stripeStats.add_colstats() = createIntStats(0L, 50L);
       *stripeStats.add_colstats() = createIntStats(0L, 50L);
-      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
-      EXPECT_TRUE(applier.evaluateStripeStatistics(stripeStats));
+      ReaderMetrics metrics;
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135,
+                           &metrics);
+      EXPECT_TRUE(applier.evaluateStripeStatistics(stripeStats, 1));
+      EXPECT_EQ(metrics.SelectedRowGroupCount.load(), 0);
+      EXPECT_EQ(metrics.EvaluatedRowGroupCount.load(), 0);
     }
     // Test file stats 0 <= x <= 10 and 0 <= y <= 50
     {
@@ -165,8 +177,12 @@ namespace orc {
       *footer.add_statistics() = structStatistics;
       *footer.add_statistics() = createIntStats(0L, 10L);
       *footer.add_statistics() = createIntStats(0L, 50L);
-      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
-      EXPECT_FALSE(applier.evaluateFileStatistics(footer));
+      ReaderMetrics metrics;
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135,
+                           &metrics);
+      EXPECT_FALSE(applier.evaluateFileStatistics(footer, 1));
+      EXPECT_EQ(metrics.SelectedRowGroupCount.load(), 0);
+      EXPECT_EQ(metrics.EvaluatedRowGroupCount.load(), 1);
     }
     // Test file stats 0 <= x <= 50 and 0 <= y <= 30
     {
@@ -176,8 +192,12 @@ namespace orc {
       *footer.add_statistics() = structStatistics;
       *footer.add_statistics() = createIntStats(0L, 50L);
       *footer.add_statistics() = createIntStats(0L, 30L);
-      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
-      EXPECT_FALSE(applier.evaluateFileStatistics(footer));
+      ReaderMetrics metrics;
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135,
+                           &metrics);
+      EXPECT_FALSE(applier.evaluateFileStatistics(footer, 1));
+      EXPECT_EQ(metrics.SelectedRowGroupCount.load(), 0);
+      EXPECT_EQ(metrics.EvaluatedRowGroupCount.load(), 1);
     }
     // Test file stats 0 <= x <= 50 and 0 <= y <= 50
     {
@@ -187,8 +207,12 @@ namespace orc {
       *footer.add_statistics() = structStatistics;
       *footer.add_statistics() = createIntStats(0L, 50L);
       *footer.add_statistics() = createIntStats(0L, 50L);
-      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135);
-      EXPECT_TRUE(applier.evaluateFileStatistics(footer));
+      ReaderMetrics metrics;
+      SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135,
+                           &metrics);
+      EXPECT_TRUE(applier.evaluateFileStatistics(footer, 1));
+      EXPECT_EQ(metrics.SelectedRowGroupCount.load(), 0);
+      EXPECT_EQ(metrics.EvaluatedRowGroupCount.load(), 0);
     }
   }
 }  // namespace orc
diff --git a/c++/test/TestStripeIndexStatistics.cc b/c++/test/TestStripeIndexStatistics.cc
index e6607d4ec..5c8beaff5 100644
--- a/c++/test/TestStripeIndexStatistics.cc
+++ b/c++/test/TestStripeIndexStatistics.cc
@@ -35,9 +35,11 @@ namespace orc {
       ss << "../../../examples";
     }
     ss << "/orc_index_int_string.orc";
+    orc::ReaderOptions readerOpts;
     std::unique_ptr<orc::Reader> reader =
-      createReader(readLocalFile(ss.str().c_str()), ReaderOptions());
-
+      createReader(readLocalFile(ss.str().c_str(),
+                                 readerOpts.getReaderMetrics()),
+                   readerOpts);
     std::unique_ptr<orc::StripeStatistics> stripeStats = reader->getStripeStatistics(0);
     EXPECT_EQ(3, stripeStats->getNumberOfRowIndexStats(0));
     EXPECT_EQ(3, stripeStats->getNumberOfRowIndexStats(1));
diff --git a/c++/test/TestTimestampStatistics.cc b/c++/test/TestTimestampStatistics.cc
index ac9744363..743e8fea8 100644
--- a/c++/test/TestTimestampStatistics.cc
+++ b/c++/test/TestTimestampStatistics.cc
@@ -40,8 +40,11 @@ namespace orc {
       ss << "../../../examples";
     }
     ss << "/TestOrcFile.testTimestamp.orc";
+    orc::ReaderOptions readerOpts;
     std::unique_ptr<orc::Reader> reader =
-      createReader(readLocalFile(ss.str().c_str()), ReaderOptions());
+      createReader(readLocalFile(ss.str().c_str(),
+                                 readerOpts.getReaderMetrics()),
+                   readerOpts);
 
     std::unique_ptr<orc::ColumnStatistics> footerStats = reader->getColumnStatistics(0);
     const orc::TimestampColumnStatistics* footerColStats =
diff --git a/tools/src/FileContents.cc b/tools/src/FileContents.cc
index 6a225a864..1229d9951 100644
--- a/tools/src/FileContents.cc
+++ b/tools/src/FileContents.cc
@@ -26,7 +26,9 @@ void printContents(const char* filename, const orc::RowReaderOptions& rowReaderO
   orc::ReaderOptions readerOpts;
   std::unique_ptr<orc::Reader> reader;
   std::unique_ptr<orc::RowReader> rowReader;
-  reader = orc::createReader(orc::readFile(std::string(filename)), readerOpts);
+  reader = orc::createReader(orc::readFile(std::string(filename),
+                                           readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
 
   std::unique_ptr<orc::ColumnVectorBatch> batch = rowReader->createRowBatch(1000);
@@ -49,7 +51,8 @@ void printContents(const char* filename, const orc::RowReaderOptions& rowReaderO
 int main(int argc, char* argv[]) {
   uint64_t batchSize; // not used
   orc::RowReaderOptions rowReaderOptions;
-  bool success = parseOptions(&argc, &argv, &batchSize, &rowReaderOptions);
+  bool showMetrics = false;
+  bool success = parseOptions(&argc, &argv, &batchSize, &rowReaderOptions, &showMetrics);
 
   if (argc < 1 || !success) {
     std::cerr << "Usage: orc-contents [options] <filename>...\n";
diff --git a/tools/src/FileMemory.cc b/tools/src/FileMemory.cc
index af6ce0d14..f38c904de 100644
--- a/tools/src/FileMemory.cc
+++ b/tools/src/FileMemory.cc
@@ -64,7 +64,9 @@ void processFile(const char* filename,
   readerOpts.setMemoryPool(*(pool.get()));
 
   std::unique_ptr<orc::Reader> reader =
-                  orc::createReader(orc::readFile(std::string(filename)), readerOpts);
+                  orc::createReader(orc::readFile(std::string(filename),
+                                                  readerOpts.getReaderMetrics()),
+                                    readerOpts);
   std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOpts);
 
   std::unique_ptr<orc::ColumnVectorBatch> batch =
@@ -99,7 +101,8 @@ void processFile(const char* filename,
 int main(int argc, char* argv[]) {
   uint64_t batchSize = 1000;
   orc::RowReaderOptions rowReaderOptions;
-  bool success = parseOptions(&argc, &argv, &batchSize, &rowReaderOptions);
+  bool showMetrics = false;
+  bool success = parseOptions(&argc, &argv, &batchSize, &rowReaderOptions, &showMetrics);
   if (argc < 1 || !success) {
     std::cerr << "Usage: orc-memory [options] <filename>...\n";
     printOptions(std::cerr);
diff --git a/tools/src/FileMetadata.cc b/tools/src/FileMetadata.cc
index 9079ec093..d62c6d4fb 100644
--- a/tools/src/FileMetadata.cc
+++ b/tools/src/FileMetadata.cc
@@ -83,8 +83,10 @@ void printStripeInformation(std::ostream& out,
 void printRawTail(std::ostream& out,
                   const char*filename) {
   out << "Raw file tail: " << filename << "\n";
+  orc::ReaderOptions readerOpts;
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readFile(filename), orc::ReaderOptions());
+    orc::createReader(orc::readFile(filename, readerOpts.getReaderMetrics()),
+                      readerOpts);
   // Parse the file tail from the serialized one.
   orc::proto::FileTail tail;
   if (!tail.ParseFromString(reader->getSerializedFileTail())) {
@@ -135,8 +137,10 @@ void printAttributes(std::ostream& out, const orc::Type& type,
 }
 
 void printMetadata(std::ostream & out, const char*filename, bool verbose) {
+  orc::ReaderOptions readerOpts;
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readFile(filename), orc::ReaderOptions());
+    orc::createReader(orc::readFile(filename, readerOpts.getReaderMetrics()),
+                      readerOpts);
   out << "{ \"name\": \"" << filename << "\",\n";
   uint64_t numberColumns = reader->getType().getMaximumColumnId() + 1;
   out << "  \"type\": \""
diff --git a/tools/src/FileScan.cc b/tools/src/FileScan.cc
index 1e878837a..8ed6b7b6e 100644
--- a/tools/src/FileScan.cc
+++ b/tools/src/FileScan.cc
@@ -21,10 +21,13 @@
 #include <iostream>
 
 void scanFile(std::ostream & out, const char* filename, uint64_t batchSize,
-              const orc::RowReaderOptions& rowReaderOpts) {
+              const orc::RowReaderOptions& rowReaderOpts,
+              bool showMetrics) {
   orc::ReaderOptions readerOpts;
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readFile(filename), readerOpts);
+    orc::createReader(orc::readFile(filename,
+                                    readerOpts.getReaderMetrics()),
+                      readerOpts);
   std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOpts);
   std::unique_ptr<orc::ColumnVectorBatch> batch =
     rowReader->createRowBatch(batchSize);
@@ -37,12 +40,16 @@ void scanFile(std::ostream & out, const char* filename, uint64_t batchSize,
   }
   out << "Rows: " << rows << std::endl;
   out << "Batches: " << batches << std::endl;
+  if (showMetrics) {
+    printReaderMetrics(out, reader->getReaderMetrics());
+  }
 }
 
 int main(int argc, char* argv[]) {
   uint64_t batchSize = 1024;
+  bool showMetrics = false;
   orc::RowReaderOptions rowReaderOptions;
-  bool success = parseOptions(&argc, &argv, &batchSize, &rowReaderOptions);
+  bool success = parseOptions(&argc, &argv, &batchSize, &rowReaderOptions, &showMetrics);
   if (argc < 1 || !success) {
     std::cerr << "Usage: orc-scan [options] <filename>...\n";
     printOptions(std::cerr);
@@ -51,7 +58,7 @@ int main(int argc, char* argv[]) {
   }
   for (int i = 0; i < argc; ++i) {
     try {
-      scanFile(std::cout, argv[i], batchSize, rowReaderOptions);
+      scanFile(std::cout, argv[i], batchSize, rowReaderOptions, showMetrics);
     } catch (std::exception& ex) {
       std::cerr << "Caught exception in " << argv[i] << ": " << ex.what() << "\n";
       return 1;
diff --git a/tools/src/FileStatistics.cc b/tools/src/FileStatistics.cc
index f95e88efc..cadb60b20 100644
--- a/tools/src/FileStatistics.cc
+++ b/tools/src/FileStatistics.cc
@@ -29,8 +29,9 @@ void printStatistics(const char *filename, bool withIndex) {
 
   orc::ReaderOptions opts;
   std::unique_ptr<orc::Reader> reader;
-  reader = orc::createReader(orc::readFile(std::string(filename)), opts);
-
+  reader = orc::createReader(orc::readFile(std::string(filename),
+                                           opts.getReaderMetrics()),
+                             opts);
   // print out all selected columns statistics.
   std::unique_ptr<orc::Statistics> colStats = reader->getStatistics();
   std::cout << "File " << filename << " has "
diff --git a/tools/src/ToolsHelper.cc b/tools/src/ToolsHelper.cc
index 20204ab14..3fef98696 100644
--- a/tools/src/ToolsHelper.cc
+++ b/tools/src/ToolsHelper.cc
@@ -26,16 +26,19 @@ void printOptions(std::ostream& out) {
       << "\t-c --columns\t\tComma separated list of top-level column fields\n"
       << "\t-t --columnTypeIds\tComma separated list of column type ids\n"
       << "\t-n --columnNames\tComma separated list of column names\n"
-      << "\t-b --batch\t\tBatch size for reading\n";
+      << "\t-b --batch\t\tBatch size for reading\n"
+      << "\t-m --metrics\t\tShow metrics for reading\n";
 }
 
-bool parseOptions(int* argc, char** argv[], uint64_t* batchSize, orc::RowReaderOptions* rowReaderOpts) {
+bool parseOptions(int* argc, char** argv[], uint64_t* batchSize,
+                  orc::RowReaderOptions* rowReaderOpts, bool* showMetrics) {
   static struct option longOptions[] = {
     {"help", no_argument, ORC_NULLPTR, 'h'},
     {"batch", required_argument, ORC_NULLPTR, 'b'},
     {"columns", required_argument, ORC_NULLPTR, 'c'},
     {"columnTypeIds", required_argument, ORC_NULLPTR, 't'},
     {"columnNames", required_argument, ORC_NULLPTR, 'n'},
+    {"metrics", no_argument, ORC_NULLPTR, 'm'},
     {ORC_NULLPTR, 0, ORC_NULLPTR, 0}
   };
   std::list<uint64_t> cols;
@@ -43,7 +46,7 @@ bool parseOptions(int* argc, char** argv[], uint64_t* batchSize, orc::RowReaderO
   int opt;
   char *tail;
   do {
-    opt = getopt_long(*argc, *argv, "hb:c:t:n:", longOptions, ORC_NULLPTR);
+    opt = getopt_long(*argc, *argv, "hb:c:t:n:m", longOptions, ORC_NULLPTR);
     switch (opt) {
       case '?':
       case 'h':
@@ -80,6 +83,10 @@ bool parseOptions(int* argc, char** argv[], uint64_t* batchSize, orc::RowReaderO
         }
         break;
       }
+      case 'm' : {
+        *showMetrics = true;
+        break;
+      }
       default: break;
     }
   } while (opt != -1);
@@ -87,3 +94,28 @@ bool parseOptions(int* argc, char** argv[], uint64_t* batchSize, orc::RowReaderO
   *argv += optind;
   return true;
 }
+
+void printReaderMetrics(std::ostream& out, const orc::ReaderMetrics* metrics) {
+  if (metrics != nullptr) {
+    static const uint64_t US_PER_SECOND = 1000000;
+    out << "ElapsedTimeSeconds: "
+        << metrics->ReaderInclusiveLatencyUs / US_PER_SECOND << std::endl;
+    out << "DecompressionLatencySeconds: "
+        << metrics->DecompressionLatencyUs / US_PER_SECOND << std::endl;
+    out << "DecodingLatencySeconds: "
+        << metrics->DecodingLatencyUs / US_PER_SECOND << std::endl;
+    out << "ByteDecodingLatencySeconds: "
+        << metrics->ByteDecodingLatencyUs / US_PER_SECOND << std::endl;
+    out << "IOBlockingLatencySeconds: "
+        << metrics->IOBlockingLatencyUs / US_PER_SECOND << std::endl;
+    out << "ReaderCall: " << metrics->ReaderCall << std::endl;
+    out << "DecompressionCall: " << metrics->DecompressionCall << std::endl;
+    out << "DecodingCall: " << metrics->DecodingCall << std::endl;
+    out << "ByteDecodingCall: " << metrics->ByteDecodingCall << std::endl;
+    out << "IOCount: " << metrics->IOCount << std::endl;
+    out << "PPD SelectedRowGroupCount: "
+        << metrics->SelectedRowGroupCount << std::endl;
+    out << "PPD EvaluatedRowGroupCount: "
+        << metrics->EvaluatedRowGroupCount << std::endl;
+  }
+}
diff --git a/tools/src/ToolsHelper.hh b/tools/src/ToolsHelper.hh
index a15235a49..5f4cf43b3 100644
--- a/tools/src/ToolsHelper.hh
+++ b/tools/src/ToolsHelper.hh
@@ -17,9 +17,13 @@
  */
 
 #include "orc/ColumnPrinter.hh"
+#include "orc/Reader.hh"
 #include <iostream>
 
 void printOptions(std::ostream& out);
 
 bool parseOptions(int* argc, char** argv[], uint64_t* batchSize,
-                  orc::RowReaderOptions* rowReaderOpts);
+                  orc::RowReaderOptions* rowReaderOpts, bool* showMetrics);
+
+void printReaderMetrics(std::ostream& out, const orc::ReaderMetrics* metrics);
+
diff --git a/tools/test/TestFileScan.cc b/tools/test/TestFileScan.cc
index c9c1eacc1..11d0d0066 100644
--- a/tools/test/TestFileScan.cc
+++ b/tools/test/TestFileScan.cc
@@ -126,6 +126,7 @@ TEST (TestFileScan, testBadCommand) {
             "\t-t --columnTypeIds\tComma separated list of column type ids\n"
             "\t-n --columnNames\tComma separated list of column names\n"
             "\t-b --batch\t\tBatch size for reading\n"
+            "\t-m --metrics\t\tShow metrics for reading\n"
             "Scans and displays the row count of the ORC files.\n",
             removeChars(stripPrefix(error, "orc-scan: "),"'`"));
 
@@ -140,6 +141,7 @@ TEST (TestFileScan, testBadCommand) {
             "\t-t --columnTypeIds\tComma separated list of column type ids\n"
             "\t-n --columnNames\tComma separated list of column names\n"
             "\t-b --batch\t\tBatch size for reading\n"
+            "\t-m --metrics\t\tShow metrics for reading\n"
             "Scans and displays the row count of the ORC files.\n",
             error);
 
@@ -154,6 +156,7 @@ TEST (TestFileScan, testBadCommand) {
             "\t-t --columnTypeIds\tComma separated list of column type ids\n"
             "\t-n --columnNames\tComma separated list of column names\n"
             "\t-b --batch\t\tBatch size for reading\n"
+            "\t-m --metrics\t\tShow metrics for reading\n"
             "Scans and displays the row count of the ORC files.\n",
             error);
 
@@ -168,6 +171,7 @@ TEST (TestFileScan, testBadCommand) {
             "\t-t --columnTypeIds\tComma separated list of column type ids\n"
             "\t-n --columnNames\tComma separated list of column names\n"
             "\t-b --batch\t\tBatch size for reading\n"
+            "\t-m --metrics\t\tShow metrics for reading\n"
             "Scans and displays the row count of the ORC files.\n",
             removeChars(stripPrefix(error, "orc-scan: "), "'`"));
 
@@ -182,6 +186,7 @@ TEST (TestFileScan, testBadCommand) {
             "\t-t --columnTypeIds\tComma separated list of column type ids\n"
             "\t-n --columnNames\tComma separated list of column names\n"
             "\t-b --batch\t\tBatch size for reading\n"
+            "\t-m --metrics\t\tShow metrics for reading\n"
             "Scans and displays the row count of the ORC files.\n",
             error);
 
@@ -196,6 +201,7 @@ TEST (TestFileScan, testBadCommand) {
             "\t-t --columnTypeIds\tComma separated list of column type ids\n"
             "\t-n --columnNames\tComma separated list of column names\n"
             "\t-b --batch\t\tBatch size for reading\n"
+            "\t-m --metrics\t\tShow metrics for reading\n"
             "Scans and displays the row count of the ORC files.\n",
             error);
 }
diff --git a/tools/test/TestMatch.cc b/tools/test/TestMatch.cc
index 9fb28f494..d0d743b81 100644
--- a/tools/test/TestMatch.cc
+++ b/tools/test/TestMatch.cc
@@ -104,7 +104,8 @@ namespace orc {
   TEST_P(FileParam, Metadata) {
     orc::ReaderOptions readerOpts;
     std::unique_ptr<Reader> reader =
-      createReader(readLocalFile(getFilename()), readerOpts);
+      createReader(readLocalFile(getFilename(), readerOpts.getReaderMetrics()),
+                   readerOpts);
     std::unique_ptr<RowReader> rowReader = reader->createRowReader();
 
     EXPECT_EQ(GetParam().compression, reader->getCompression());
@@ -134,7 +135,9 @@ namespace orc {
   TEST_P(FileParam, Contents) {
     orc::ReaderOptions readerOpts;
     std::unique_ptr<RowReader> rowReader =
-         createReader(readLocalFile(getFilename()), readerOpts)->createRowReader();
+         createReader(readLocalFile(getFilename(),
+                                    readerOpts.getReaderMetrics()),
+                      readerOpts)->createRowReader();
 
     unsigned long rowCount = 0;
     std::unique_ptr<ColumnVectorBatch> batch = rowReader->createRowBatch(1024);
@@ -595,7 +598,8 @@ INSTANTIATE_TEST_CASE_P(TestMatch1900, FileParam,
     rowReaderOpts.include(includes);
     std::string filename = findExample("demo-11-none.orc");
     std::unique_ptr<Reader> reader =
-      createReader(readLocalFile(filename), readerOpts);
+      createReader(readLocalFile(filename, readerOpts.getReaderMetrics()),
+                   readerOpts);
     std::unique_ptr<RowReader> rowReader = reader->createRowReader(rowReaderOpts);
 
     EXPECT_EQ(CompressionKind_NONE, reader->getCompression());
@@ -665,7 +669,9 @@ INSTANTIATE_TEST_CASE_P(TestMatch1900, FileParam,
   TEST(TestMatch, stripeInformationTest) {
     ReaderOptions opts;
     std::string filename = findExample("demo-11-none.orc");
-    std::unique_ptr<Reader> reader = createReader(readLocalFile(filename), opts);
+    std::unique_ptr<Reader> reader =
+      createReader(readLocalFile(filename, opts.getReaderMetrics()),
+                   opts);
 
     EXPECT_EQ(385, reader->getNumberOfStripes());
 
@@ -688,7 +694,8 @@ INSTANTIATE_TEST_CASE_P(TestMatch1900, FileParam,
     // stripes[7, 16]
     offsetOpts.range(80000, 130722);
     std::string filename = findExample("demo-11-none.orc");
-    std::unique_ptr<Reader> reader = createReader(readLocalFile(filename), opts);
+    std::unique_ptr<Reader> reader =
+      createReader(readLocalFile(filename, opts.getReaderMetrics()), opts);
     std::unique_ptr<RowReader> fullReader = reader->createRowReader(fullOpts);
     std::unique_ptr<RowReader> lastReader = reader->createRowReader(lastOpts);
     std::unique_ptr<RowReader> oobReader = reader->createRowReader(oobOpts);
@@ -764,7 +771,8 @@ TEST(TestMatch, columnStatistics) {
   orc::ReaderOptions opts;
   std::string filename = findExample("demo-11-none.orc");
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readLocalFile(filename), opts);
+    orc::createReader(orc::readLocalFile(filename, opts.getReaderMetrics()),
+                      opts);
 
   // corrupt stats test
   EXPECT_EQ(true, reader->hasCorrectStatistics());
@@ -798,7 +806,8 @@ TEST(TestMatch, stripeStatistics) {
   orc::ReaderOptions opts;
   std::string filename = findExample("demo-11-none.orc");
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readLocalFile(filename), opts);
+    orc::createReader(orc::readLocalFile(filename, opts.getReaderMetrics()),
+                      opts);
 
   // test stripe statistics
   EXPECT_EQ(385, reader->getNumberOfStripeStatistics());
@@ -832,7 +841,8 @@ TEST(TestMatch, corruptStatistics) {
   // read the file has corrupt statistics
   std::string filename = findExample("orc_split_elim.orc");
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readLocalFile(filename), opts);
+    orc::createReader(orc::readLocalFile(filename, opts.getReaderMetrics()),
+                      opts);
 
   EXPECT_EQ(true, !reader->hasCorrectStatistics());
 
@@ -863,7 +873,8 @@ TEST(TestMatch, noStripeStatistics) {
   // read the file has no stripe statistics
   std::string filename = findExample("orc-file-11-format.orc");
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readLocalFile(filename), opts);
+    orc::createReader(orc::readLocalFile(filename, opts.getReaderMetrics()),
+                      opts);
 
   EXPECT_EQ(0, reader->getNumberOfStripeStatistics());
 }
@@ -874,7 +885,9 @@ TEST(TestMatch, seekToRow) {
     orc::ReaderOptions readerOpts;
     std::string filename = findExample("demo-11-none.orc");
     std::unique_ptr<orc::Reader> reader =
-        orc::createReader(orc::readLocalFile(filename), readerOpts);
+        orc::createReader(orc::readLocalFile(filename,
+                                             readerOpts.getReaderMetrics()),
+                          readerOpts);
     std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader();
     EXPECT_EQ(1920800, reader->getNumberOfRows());
 
@@ -911,7 +924,9 @@ TEST(TestMatch, seekToRow) {
     rowReaderOpts.range(13126, 13145);   // Read only the second stripe (rows 5000..9999)
 
     std::unique_ptr<orc::Reader> reader =
-        orc::createReader(orc::readLocalFile(filename), readerOpts);
+        orc::createReader(orc::readLocalFile(filename,
+                                             readerOpts.getReaderMetrics()),
+                          readerOpts);
     std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOpts);
     EXPECT_EQ(1920800, reader->getNumberOfRows());
 
@@ -941,7 +956,9 @@ TEST(TestMatch, seekToRow) {
     orc::ReaderOptions readerOpts;
     std::string filename = findExample("TestOrcFile.emptyFile.orc");
     std::unique_ptr<orc::Reader> reader =
-        orc::createReader(orc::readLocalFile(filename), readerOpts);
+        orc::createReader(orc::readLocalFile(filename,
+                                             readerOpts.getReaderMetrics()),
+                          readerOpts);
     std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader();
     EXPECT_EQ(0, reader->getNumberOfRows());
 
@@ -968,7 +985,8 @@ TEST(TestMatch, futureFormatVersion) {
   std::ostringstream errorMsg;
   opts.setErrorStream(errorMsg);
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readLocalFile(filename), opts);
+    orc::createReader(orc::readLocalFile(filename, opts.getReaderMetrics()),
+                      opts);
   EXPECT_EQ(("Warning: ORC file " + filename +
              " was written in an unknown format version 19.99\n"),
             errorMsg.str());
@@ -982,7 +1000,9 @@ TEST(TestMatch, selectColumns) {
 
     // All columns
     std::unique_ptr<orc::Reader> reader =
-        orc::createReader(orc::readLocalFile(filename), readerOpts);
+        orc::createReader(orc::readLocalFile(filename,
+                                             readerOpts.getReaderMetrics()),
+                          readerOpts);
     std::unique_ptr<orc::RowReader> rowReader = reader->createRowReader(rowReaderOpts);
     std::vector<bool> c = rowReader->getSelectedColumns();
     EXPECT_EQ(24, c.size());
@@ -1191,7 +1211,9 @@ TEST(Reader, memoryUse) {
   // Int column
   cols.push_back(1);
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(483517, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -1204,7 +1226,9 @@ TEST(Reader, memoryUse) {
   cols.clear();
   cols.push_back(7);
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(835906, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -1215,7 +1239,9 @@ TEST(Reader, memoryUse) {
   cols.clear();
   cols.push_back(8);
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(901442, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -1226,7 +1252,9 @@ TEST(Reader, memoryUse) {
   cols.clear();
   cols.push_back(9);
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(1294658, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -1237,7 +1265,9 @@ TEST(Reader, memoryUse) {
   cols.clear();
   cols.push_back(10);
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(1229122, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -1248,7 +1278,9 @@ TEST(Reader, memoryUse) {
   cols.clear();
   cols.push_back(11);
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(1491266, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -1261,7 +1293,9 @@ TEST(Reader, memoryUse) {
     cols.push_back(c);
   }
   rowReaderOpts.include(cols);
-  reader = orc::createReader(orc::readLocalFile(filename), readerOpts);
+  reader = orc::createReader(orc::readLocalFile(filename,
+                                                readerOpts.getReaderMetrics()),
+                             readerOpts);
   rowReader = reader->createRowReader(rowReaderOpts);
   EXPECT_EQ(4112706, reader->getMemoryUseByFieldId(cols));
   batch = rowReader->createRowBatch(1);
@@ -3298,7 +3332,8 @@ TEST(TestMatch, serializedConstructor) {
 
   // open a file
   std::unique_ptr<orc::Reader> reader =
-    orc::createReader(orc::readLocalFile(filename), opts);
+    orc::createReader(orc::readLocalFile(filename, opts.getReaderMetrics()),
+                      opts);
 
   // for the next reader copy the serialized tail
   std::string tail = reader->getSerializedFileTail();