You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this text.)
Posted to commits@orc.apache.org by do...@apache.org on 2022/08/22 04:05:03 UTC
[orc] branch main updated: ORC-1252:[C++] Expose IO metrics for write operation
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new a49f87d49 ORC-1252:[C++] Expose IO metrics for write operation
a49f87d49 is described below
commit a49f87d492aa62ec2be2d4bce2fcfe1f53ca05d9
Author: coderex2522 <re...@gmail.com>
AuthorDate: Sun Aug 21 21:04:53 2022 -0700
ORC-1252:[C++] Expose IO metrics for write operation
### What changes were proposed in this pull request?
This pull request adds IO metrics for write operations. The csv-import tool can display the relevant IO metrics.
### Why are the changes needed?
This patch exposes metrics about the process of writing ORC files.
### How was this patch tested?
IO metrics are tested in BufferedOutputStream's unit tests. The csv-import tool can also be used for testing via its -m option.
Closes #1223 from coderex2522/ORC-1252.
Authored-by: coderex2522 <re...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
c++/include/orc/Writer.hh | 21 +++++++++++
c++/src/ColumnWriter.cc | 3 +-
c++/src/Compression.cc | 68 ++++++++++++++++++++++++------------
c++/src/Compression.hh | 3 +-
c++/src/Writer.cc | 25 +++++++++++--
c++/src/io/OutputStream.cc | 12 +++++--
c++/src/io/OutputStream.hh | 11 +++++-
c++/test/TestBufferedOutputStream.cc | 18 ++++++++--
c++/test/TestByteRLEEncoder.cc | 20 +++++------
c++/test/TestByteRle.cc | 6 ++--
c++/test/TestCompression.cc | 16 ++++++---
c++/test/TestRleEncoder.cc | 3 +-
tools/src/CSVFileImport.cc | 17 ++++++++-
13 files changed, 170 insertions(+), 53 deletions(-)
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 78b0b97d2..1a95572e7 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -24,6 +24,7 @@
#include "orc/Type.hh"
#include "orc/Vector.hh"
+#include <atomic>
#include <memory>
#include <set>
#include <string>
@@ -46,6 +47,15 @@ namespace orc {
class Timezone;
+ /**
+ * Exposes the IO metrics collected during write operations.
+ */
+ struct WriterMetrics {
+ // Number of IO requests written to the output file
+ std::atomic<uint64_t> IOCount{0};
+ // Latency spent blocked on IO, in microseconds
+ std::atomic<uint64_t> IOBlockingLatencyUs{0};
+ };
/**
* Options for creating a Writer.
*/
@@ -235,6 +245,17 @@ namespace orc {
* @param zone writer timezone name
*/
WriterOptions& setTimezoneName(const std::string& zone);
+
+ /**
+ * Set the writer metrics.
+ */
+ WriterOptions& setWriterMetrics(WriterMetrics * metrics);
+
+ /**
+ * Get the writer metrics.
+ * @return if not set, return nullptr.
+ */
+ WriterMetrics * getWriterMetrics() const;
};
class Writer {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 32b68af34..c8375d317 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -58,7 +58,8 @@ namespace orc {
// BufferedOutputStream initial capacity
1 * 1024 * 1024,
options.getCompressionBlockSize(),
- *options.getMemoryPool());
+ *options.getMemoryPool(),
+ options.getWriterMetrics());
}
std::unique_ptr<StreamsFactory> createStreamsFactory(
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 0b4969f41..81c6fb9e3 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -54,7 +54,8 @@ namespace orc {
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool);
+ MemoryPool& pool,
+ WriterMetrics* metrics);
virtual bool Next(void** data, int*size) override = 0;
virtual void BackUp(int count) override;
@@ -98,11 +99,13 @@ namespace orc {
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool) :
+ MemoryPool& pool,
+ WriterMetrics* metrics) :
BufferedOutputStream(pool,
outStream,
capacity,
- blockSize),
+ blockSize,
+ metrics),
rawInputBuffer(pool, blockSize),
level(compressionLevel),
outputBuffer(nullptr),
@@ -160,7 +163,8 @@ namespace orc {
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool);
+ MemoryPool& pool,
+ WriterMetrics* metrics);
virtual bool Next(void** data, int*size) override;
virtual std::string getName() const override = 0;
@@ -174,12 +178,14 @@ namespace orc {
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool) :
+ MemoryPool& pool,
+ WriterMetrics* metrics) :
CompressionStreamBase(outStream,
compressionLevel,
capacity,
blockSize,
- pool) {
+ pool,
+ metrics) {
// PASS
}
@@ -219,7 +225,8 @@ namespace orc {
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool);
+ MemoryPool& pool,
+ WriterMetrics* metrics);
virtual ~ZlibCompressionStream() override {
end();
@@ -241,12 +248,14 @@ namespace orc {
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool)
+ MemoryPool& pool,
+ WriterMetrics* metrics)
: CompressionStream(outStream,
compressionLevel,
capacity,
blockSize,
- pool) {
+ pool,
+ metrics) {
init();
}
@@ -934,12 +943,14 @@ DIAGNOSTIC_POP
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool)
+ MemoryPool& pool,
+ WriterMetrics* metrics)
: CompressionStreamBase(outStream,
compressionLevel,
capacity,
blockSize,
- pool)
+ pool,
+ metrics)
, compressorBuffer(pool) {
// PASS
}
@@ -1022,12 +1033,14 @@ DIAGNOSTIC_POP
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool)
+ MemoryPool& pool,
+ WriterMetrics* metrics)
: BlockCompressionStream(outStream,
compressionLevel,
capacity,
blockSize,
- pool) {
+ pool,
+ metrics) {
this->init();
}
@@ -1086,12 +1099,14 @@ DIAGNOSTIC_POP
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool)
+ MemoryPool& pool,
+ WriterMetrics* metrics)
: BlockCompressionStream(outStream,
compressionLevel,
capacity,
blockSize,
- pool) {
+ pool,
+ metrics) {
}
virtual std::string getName() const override {
@@ -1129,12 +1144,14 @@ DIAGNOSTIC_POP
int compressionLevel,
uint64_t capacity,
uint64_t blockSize,
- MemoryPool& pool)
+ MemoryPool& pool,
+ WriterMetrics* metrics)
: BlockCompressionStream(outStream,
compressionLevel,
capacity,
blockSize,
- pool) {
+ pool,
+ metrics) {
this->init();
}
@@ -1268,39 +1285,44 @@ DIAGNOSTIC_PUSH
CompressionStrategy strategy,
uint64_t bufferCapacity,
uint64_t compressionBlockSize,
- MemoryPool& pool) {
+ MemoryPool& pool,
+ WriterMetrics* metrics) {
switch (static_cast<int64_t>(kind)) {
case CompressionKind_NONE: {
return std::unique_ptr<BufferedOutputStream>
(new BufferedOutputStream(
- pool, outStream, bufferCapacity, compressionBlockSize));
+ pool, outStream, bufferCapacity, compressionBlockSize, metrics));
}
case CompressionKind_ZLIB: {
int level = (strategy == CompressionStrategy_SPEED) ?
Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION;
return std::unique_ptr<BufferedOutputStream>
(new ZlibCompressionStream(
- outStream, level, bufferCapacity, compressionBlockSize, pool));
+ outStream, level, bufferCapacity,
+ compressionBlockSize, pool, metrics));
}
case CompressionKind_ZSTD: {
int level = (strategy == CompressionStrategy_SPEED) ?
1 : ZSTD_CLEVEL_DEFAULT;
return std::unique_ptr<BufferedOutputStream>
(new ZSTDCompressionStream(
- outStream, level, bufferCapacity, compressionBlockSize, pool));
+ outStream, level, bufferCapacity,
+ compressionBlockSize, pool, metrics));
}
case CompressionKind_LZ4: {
int level = (strategy == CompressionStrategy_SPEED) ?
LZ4_ACCELERATION_MAX : LZ4_ACCELERATION_DEFAULT;
return std::unique_ptr<BufferedOutputStream>
(new Lz4CompressionSteam(
- outStream, level, bufferCapacity, compressionBlockSize, pool));
+ outStream, level, bufferCapacity,
+ compressionBlockSize, pool, metrics));
}
case CompressionKind_SNAPPY: {
int level = 0;
return std::unique_ptr<BufferedOutputStream>
(new SnappyCompressionStream(
- outStream, level, bufferCapacity, compressionBlockSize, pool));
+ outStream, level, bufferCapacity,
+ compressionBlockSize, pool, metrics));
}
case CompressionKind_LZO:
default:
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index 6457ad98c..50a252443 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -54,7 +54,8 @@ namespace orc {
CompressionStrategy strategy,
uint64_t bufferCapacity,
uint64_t compressionBlockSize,
- MemoryPool& pool);
+ MemoryPool& pool,
+ WriterMetrics* metrics);
}
#endif
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 05adb4744..ca7003f8e 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -21,6 +21,7 @@
#include "ColumnWriter.hh"
#include "Timezone.hh"
+#include "Utils.hh"
#include <memory>
@@ -42,6 +43,7 @@ namespace orc {
double bloomFilterFalsePositiveProb;
BloomFilterVersion bloomFilterVersion;
std::string timezone;
+ WriterMetrics* metrics;
WriterOptionsPrivate() :
fileVersion(FileVersion::v_0_12()) { // default to Hive_0_12
@@ -61,6 +63,7 @@ namespace orc {
//introduced by moving timestamps between different timezones.
//Explictly set the writer timezone if the use case depends on it.
timezone = "GMT";
+ metrics = nullptr;
}
};
@@ -255,6 +258,15 @@ namespace orc {
return *this;
}
+ WriterMetrics* WriterOptions::getWriterMetrics() const {
+ return privateBits->metrics;
+ }
+
+ WriterOptions& WriterOptions::setWriterMetrics(WriterMetrics* metrics) {
+ privateBits->metrics = metrics;
+ return *this;
+ }
+
Writer::~Writer() {
// PASS
}
@@ -328,14 +340,16 @@ namespace orc {
options.getCompressionStrategy(),
1 * 1024 * 1024, // buffer capacity: 1M
options.getCompressionBlockSize(),
- *options.getMemoryPool());
+ *options.getMemoryPool(),
+ options.getWriterMetrics());
// uncompressed stream for post script
bufferedStream.reset(new BufferedOutputStream(
*options.getMemoryPool(),
outStream,
1024, // buffer capacity: 1024 bytes
- options.getCompressionBlockSize()));
+ options.getCompressionBlockSize(),
+ options.getWriterMetrics()));
init();
}
@@ -393,7 +407,11 @@ namespace orc {
void WriterImpl::init() {
// Write file header
const static size_t magicIdLength = strlen(WriterImpl::magicId);
- outStream->write(WriterImpl::magicId, magicIdLength);
+ {
+ SCOPED_STOPWATCH(
+ options.getWriterMetrics(), IOBlockingLatencyUs, IOCount);
+ outStream->write(WriterImpl::magicId, magicIdLength);
+ }
currentOffset += magicIdLength;
// Initialize file footer
@@ -541,6 +559,7 @@ namespace orc {
}
unsigned char psLength =
static_cast<unsigned char>(bufferedStream->flush());
+ SCOPED_STOPWATCH(options.getWriterMetrics(), IOBlockingLatencyUs, IOCount);
outStream->write(&psLength, sizeof(unsigned char));
}
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 8770d286d..4485e1b29 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -18,6 +18,7 @@
#include "orc/Exceptions.hh"
#include "OutputStream.hh"
+#include "Utils.hh"
#include <sstream>
@@ -31,9 +32,11 @@ namespace orc {
MemoryPool& pool,
OutputStream * outStream,
uint64_t capacity_,
- uint64_t blockSize_)
+ uint64_t blockSize_,
+ WriterMetrics* metrics_)
: outputStream(outStream),
- blockSize(blockSize_) {
+ blockSize(blockSize_),
+ metrics(metrics_) {
dataBuffer.reset(new DataBuffer<char>(pool));
dataBuffer->reserve(capacity_);
}
@@ -92,7 +95,10 @@ namespace orc {
uint64_t BufferedOutputStream::flush() {
uint64_t dataSize = dataBuffer->size();
- outputStream->write(dataBuffer->data(), dataSize);
+ {
+ SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
+ outputStream->write(dataBuffer->data(), dataSize);
+ }
dataBuffer->resize(0);
return dataSize;
}
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index 69c06d5f6..c49b769d5 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -34,6 +34,12 @@ namespace orc {
virtual void add(uint64_t pos) = 0;
};
+DIAGNOSTIC_PUSH
+
+#ifdef __clang__
+ DIAGNOSTIC_IGNORE("-Wunused-private-field")
+#endif
+ struct WriterMetrics;
/**
* A subclass of Google's ZeroCopyOutputStream that supports output to memory
* buffer, and flushing to OutputStream.
@@ -45,12 +51,14 @@ namespace orc {
OutputStream * outputStream;
std::unique_ptr<DataBuffer<char> > dataBuffer;
uint64_t blockSize;
+ WriterMetrics* metrics;
public:
BufferedOutputStream(MemoryPool& pool,
OutputStream * outStream,
uint64_t capacity,
- uint64_t block_size);
+ uint64_t block_size,
+ WriterMetrics* metrics);
virtual ~BufferedOutputStream() override;
virtual bool Next(void** data, int*size) override;
@@ -66,6 +74,7 @@ namespace orc {
virtual bool isCompressed() const { return false; }
};
+DIAGNOSTIC_POP
/**
* An append only buffered stream that allows
diff --git a/c++/test/TestBufferedOutputStream.cc b/c++/test/TestBufferedOutputStream.cc
index b47e1e54f..448194035 100644
--- a/c++/test/TestBufferedOutputStream.cc
+++ b/c++/test/TestBufferedOutputStream.cc
@@ -28,7 +28,9 @@ namespace orc {
uint64_t capacity = 1000;
uint64_t block = 10;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ WriterMetrics metrics;
+ BufferedOutputStream bufStream(
+ *pool, &memStream, capacity, block, &metrics);
for (int i = 0; i < 100; ++i) {
char * buf;
int len;
@@ -44,6 +46,9 @@ namespace orc {
for (int i = 0; i < 1000; ++i) {
EXPECT_EQ(memStream.getData()[i], 'a' + i % 10);
}
+#if ENABLE_METRICS
+ EXPECT_EQ(metrics.IOCount.load(), 1);
+#endif
}
TEST(BufferedOutputStream, block_not_aligned) {
@@ -52,7 +57,9 @@ namespace orc {
uint64_t capacity = 20;
uint64_t block = 10;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ WriterMetrics metrics;
+ BufferedOutputStream bufStream(
+ *pool, &memStream, capacity, block, &metrics);
char * buf;
int len;
@@ -89,6 +96,9 @@ namespace orc {
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(memStream.getData()[i + 7], 'a' + i);
}
+#if ENABLE_METRICS
+ EXPECT_EQ(metrics.IOCount.load(), 2);
+#endif
}
TEST(BufferedOutputStream, protobuff_serialization) {
@@ -97,7 +107,9 @@ namespace orc {
uint64_t capacity = 20;
uint64_t block = 10;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ WriterMetrics metrics;
+ BufferedOutputStream bufStream(
+ *pool, &memStream, capacity, block, &metrics);
proto::PostScript ps;
ps.set_footerlength(197934);
diff --git a/c++/test/TestByteRLEEncoder.cc b/c++/test/TestByteRLEEncoder.cc
index 539cfad15..1cfd16984 100644
--- a/c++/test/TestByteRLEEncoder.cc
+++ b/c++/test/TestByteRLEEncoder.cc
@@ -119,10 +119,10 @@ namespace orc {
uint64_t capacity = 500 * 1024;
uint64_t block = 1024;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
std::unique_ptr<BufferedOutputStream> outStream(
- new BufferedOutputStream(*pool, &memStream, capacity, block));
+ new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
std::unique_ptr<ByteRleEncoder> encoder =
createByteRleEncoder(std::move(outStream));
@@ -142,10 +142,10 @@ namespace orc {
uint64_t capacity = 500 * 1024;
uint64_t block = 1024;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
std::unique_ptr<BufferedOutputStream> outStream(
- new BufferedOutputStream(*pool, &memStream, capacity, block));
+ new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
std::unique_ptr<ByteRleEncoder> encoder =
createByteRleEncoder(std::move(outStream));
@@ -167,10 +167,10 @@ namespace orc {
uint64_t capacity = 500 * 1024;
uint64_t block = 1024;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
std::unique_ptr<BufferedOutputStream> outStream(
- new BufferedOutputStream(*pool, &memStream, capacity, block));
+ new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
std::unique_ptr<ByteRleEncoder> encoder =
createBooleanRleEncoder(std::move(outStream));
@@ -190,10 +190,10 @@ namespace orc {
uint64_t capacity = 500 * 1024;
uint64_t block = 1024;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
std::unique_ptr<BufferedOutputStream> outStream(
- new BufferedOutputStream(*pool, &memStream, capacity, block));
+ new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
std::unique_ptr<ByteRleEncoder> encoder =
createBooleanRleEncoder(std::move(outStream));
@@ -213,10 +213,10 @@ namespace orc {
uint64_t capacity = 500 * 1024;
uint64_t block = 1024;
- BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+ BufferedOutputStream bufStream(*pool, &memStream, capacity, block, nullptr);
std::unique_ptr<BufferedOutputStream> outStream(
- new BufferedOutputStream(*pool, &memStream, capacity, block));
+ new BufferedOutputStream(*pool, &memStream, capacity, block, nullptr));
std::unique_ptr<ByteRleEncoder> encoder =
createBooleanRleEncoder(std::move(outStream));
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index 52b0e984d..d2715f34e 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -1449,7 +1449,8 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
CompressionStrategy_COMPRESSION,
capacity,
blockSize,
- *getDefaultPool()));
+ *getDefaultPool(),
+ nullptr));
encoder->add(data, numValues, nullptr);
encoder->flush();
@@ -1482,7 +1483,8 @@ TEST(BooleanRle, seekBoolAndByteRLE) {
uint64_t capacity = 500 * 1024;
uint64_t block = 1024;
std::unique_ptr<BufferedOutputStream> outStream(
- new BufferedOutputStream(*getDefaultPool(), &memStream, capacity, block));
+ new BufferedOutputStream(
+ *getDefaultPool(), &memStream, capacity, block, nullptr));
std::unique_ptr<ByteRleEncoder> encoder =
createBooleanRleEncoder(std::move(outStream));
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index 17d569f2c..0ecebf429 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -85,7 +85,8 @@ namespace orc {
strategy,
capacity,
block,
- pool);
+ pool,
+ nullptr);
size_t pos = 0;
char * compressBuffer;
@@ -284,7 +285,8 @@ namespace orc {
CompressionStrategy_SPEED,
capacity,
block,
- *pool);
+ *pool,
+ nullptr);
EXPECT_TRUE(ps.SerializeToZeroCopyStream(compressStream.get()));
compressStream->flush();
@@ -394,8 +396,14 @@ namespace orc {
CompressionStrategy strategy = CompressionStrategy_COMPRESSION;
uint64_t batchSize = 1024, blockSize = 256;
- AppendOnlyBufferedStream outStream(createCompressor(
- kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, *pool));
+ AppendOnlyBufferedStream outStream(
+ createCompressor(kind,
+ &memStream,
+ strategy,
+ DEFAULT_MEM_STREAM_SIZE,
+ blockSize,
+ *pool,
+ nullptr));
// write 3 batches of data and record positions between every batch
size_t row = 0;
diff --git a/c++/test/TestRleEncoder.cc b/c++/test/TestRleEncoder.cc
index 73be2f46a..3ed6e58e3 100644
--- a/c++/test/TestRleEncoder.cc
+++ b/c++/test/TestRleEncoder.cc
@@ -126,7 +126,8 @@ namespace orc {
return createRleEncoder(
std::unique_ptr<BufferedOutputStream>(
- new BufferedOutputStream(*pool, &memStream, 500 * 1024, 1024)),
+ new BufferedOutputStream(
+ *pool, &memStream, 500 * 1024, 1024, nullptr)),
isSigned, version, *pool, alignBitpacking);
}
diff --git a/tools/src/CSVFileImport.cc b/tools/src/CSVFileImport.cc
index 280491b3b..ae39ecdd5 100644
--- a/tools/src/CSVFileImport.cc
+++ b/tools/src/CSVFileImport.cc
@@ -277,6 +277,7 @@ void fillTimestampValues(const std::vector<std::string>& data,
void usage() {
std::cout << "Usage: csv-import [-h] [--help]\n"
+ << " [-m] [--metrics]\n"
<< " [-d <character>] [--delimiter=<character>]\n"
<< " [-s <size>] [--stripe=<size>]\n"
<< " [-c <size>] [--block=<size>]\n"
@@ -300,6 +301,7 @@ int main(int argc, char* argv[]) {
static struct option longOptions[] = {
{"help", no_argument, ORC_NULLPTR, 'h'},
+ {"metrics", no_argument, ORC_NULLPTR, 'm'},
{"delimiter", required_argument, ORC_NULLPTR, 'd'},
{"stripe", required_argument, ORC_NULLPTR, 's'},
{"block", required_argument, ORC_NULLPTR, 'c'},
@@ -308,15 +310,19 @@ int main(int argc, char* argv[]) {
{ORC_NULLPTR, 0, ORC_NULLPTR, 0}
};
bool helpFlag = false;
+ bool showMetrics = false;
int opt;
char *tail;
do {
- opt = getopt_long(argc, argv, "d:s:c:b:t:h", longOptions, ORC_NULLPTR);
+ opt = getopt_long(argc, argv, "d:s:c:b:t:mh", longOptions, ORC_NULLPTR);
switch (opt) {
case 'h':
helpFlag = true;
opt = -1;
break;
+ case 'm':
+ showMetrics = true;
+ break;
case 'd':
gDelimiter = optarg[0];
break;
@@ -369,10 +375,12 @@ int main(int argc, char* argv[]) {
DataBufferList bufferList;
orc::WriterOptions options;
+ orc::WriterMetrics metrics;
options.setStripeSize(stripeSize);
options.setCompressionBlockSize(blockSize);
options.setCompression(compression);
options.setTimezoneName(timezoneName);
+ options.setWriterMetrics(showMetrics ? &metrics : nullptr);
ORC_UNIQUE_PTR<orc::OutputStream> outStream = orc::writeLocalFile(output);
ORC_UNIQUE_PTR<orc::Writer> writer =
@@ -502,5 +510,12 @@ int main(int argc, char* argv[]) {
std::cout << GetDate() << " Total writer CPU time: "
<< static_cast<double>(totalCPUTime) / CLOCKS_PER_SEC
<< "s." << std::endl;
+ if (showMetrics) {
+ std::cout << GetDate() << " IO block lantency: "
+ << static_cast<double>(metrics.IOBlockingLatencyUs) / 1000000.0
+ << "s." << std::endl;
+ std::cout << GetDate() << " IO count: "
+ << metrics.IOCount << std::endl;
+ }
return 0;
}