You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/11/05 13:55:38 UTC
[avro] 05/05: AVRO-2891: Expose last sync offset written on
DataFileWriter (#954)
This is an automated email from the ASF dual-hosted git repository.
rskraba pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/avro.git
commit 3961ea1a2a02816c6d175789c2718991587972ed
Author: Krishnan Sundaram <kr...@gmail.com>
AuthorDate: Wed Sep 30 07:16:16 2020 -0700
AVRO-2891: Expose last sync offset written on DataFileWriter (#954)
* AVRO-2891: Add code and tests for lastSync API
* AVRO-2891: simplify tests
Tests now only validate that the sync markers reported by DataFileWriter and DataFileReader are equivalent. Removed the additional test that read a specific row using the sync offset returned from DataFileWriter, since it was somewhat redundant.
* AVRO-2891: revert change to CMakeLists.txt
The original change to CMakeLists.txt was needed to work around linker
errors encountered when using the standard build instructions.
* Add another test highlighting an efficient read of an Avro record using the
new API
* Add a few more comments to the new test
* Change API name as per PR feedback
Co-authored-by: Krishnan Sundaram <kr...@microsoft.com>
---
lang/c++/api/DataFile.hh | 12 ++
lang/c++/impl/DataFile.cc | 14 ++-
lang/c++/test/DataFileTests.cc | 274 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 298 insertions(+), 2 deletions(-)
diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh
index 1b8f28b..a334cf5 100644
--- a/lang/c++/api/DataFile.hh
+++ b/lang/c++/api/DataFile.hh
@@ -73,6 +73,7 @@ class AVRO_DECL DataFileWriterBase : boost::noncopyable {
typedef std::map<std::string, std::vector<uint8_t> > Metadata;
Metadata metadata_;
+ int64_t lastSync_;
static std::unique_ptr<OutputStream> makeStream(const char* filename);
static DataFileSync makeSync();
@@ -103,6 +104,11 @@ public:
void syncIfNeeded();
/**
+ * Returns the byte offset (within the current file) of the start of the current block being written.
+ */
+ uint64_t getCurrentBlockStart();
+
+ /**
* Increments the object count.
*/
void incr() {
@@ -162,6 +168,12 @@ public:
}
/**
+ * Returns the byte offset (within the current file) of the start of the current block being written.
+ */
+ uint64_t getCurrentBlockStart() { return base_->getCurrentBlockStart(); }
+
+
+ /**
* Closes the current file. Once closed this datafile object cannot be
* used for writing any more.
*/
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index bd1606b..e20e605 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -73,7 +73,8 @@ DataFileWriterBase::DataFileWriterBase(const char* filename, const ValidSchema&
stream_(fileOutputStream(filename)),
buffer_(memoryOutputStream()),
sync_(makeSync()),
- objectCount_(0)
+ objectCount_(0),
+ lastSync_(0)
{
init(schema, syncInterval, codec);
}
@@ -88,7 +89,8 @@ DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStrea
stream_(std::move(outputStream)),
buffer_(memoryOutputStream()),
sync_(makeSync()),
- objectCount_(0)
+ objectCount_(0),
+ lastSync_(0)
{
init(schema, syncInterval, codec);
}
@@ -116,6 +118,8 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
writeHeader();
encoderPtr_->init(*buffer_);
+
+ lastSync_ = stream_->byteCount();
}
@@ -214,6 +218,7 @@ void DataFileWriterBase::sync()
avro::encode(*encoderPtr_, sync_);
encoderPtr_->flush();
+ lastSync_ = stream_->byteCount();
buffer_ = memoryOutputStream();
encoderPtr_->init(*buffer_);
@@ -228,6 +233,11 @@ void DataFileWriterBase::syncIfNeeded()
}
}
+uint64_t DataFileWriterBase::getCurrentBlockStart()
+{
+ return lastSync_;
+}
+
void DataFileWriterBase::flush()
{
sync();
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 4f94d80..8c4605f 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -162,6 +162,21 @@ static const char ischWithDoc[] =
"\"doc\": \"extra slashes\\\\\\\\\"}"
"]}";
+static const char schemaWithIdAndString[] = R"({
+ "type":"record",
+ "name":"R",
+ "fields":[
+ {
+ "name":"s1",
+ "type":"string"
+ },
+ {
+ "name":"id",
+ "type":"long"
+ }
+ ]
+ })";
+
string toString(const ValidSchema& s)
{
ostringstream oss;
@@ -760,6 +775,246 @@ void testSkipStringSnappyCodec()
}
#endif
+
+struct TestRecord {
+ std::string s1;
+ int64_t id;
+ TestRecord(const char* s1, int64_t id) : s1(s1), id(id) {}
+};
+
+namespace avro {
+ template<> struct codec_traits<TestRecord> {
+ static void encode(Encoder& e, const TestRecord& v) {
+ avro::encode(e, v.s1);
+ avro::encode(e, v.id);
+ }
+
+ static void decode(Decoder& d, TestRecord& v) {
+ avro::decode(d, v.s1);
+ avro::decode(d, v.id);
+ }
+ };
+} // namespace avro
+
+void testLastSync(avro::Codec codec) {
+
+    //
+    // This test validates that the offsets reported by the writer's getCurrentBlockStart() API match previousSync() as returned by the reader for every record read.
+    //
+
+    avro::ValidSchema writerSchema =
+        avro::compileJsonSchemaFromString(schemaWithIdAndString);
+
+    const size_t stringLen = 100;
+    char largeString[stringLen + 1];
+    for(size_t i = 0; i < stringLen; i++) {
+        largeString[i] = 'a';
+    }
+
+    largeString[stringLen] = '\0';
+
+    const char* filename = "test_lastSync.df"; // NOTE(review): this file is not removed when the test finishes
+    std::deque<std::pair<uint64_t, int>> syncMetadata;
+    int numberOfRecords = 100;
+    {
+        avro::DataFileWriter<TestRecord> df(filename,
+            writerSchema, 1024, codec);
+
+        uint64_t lastSync = df.getCurrentBlockStart();
+        syncMetadata.push_back(std::pair<uint64_t, int>(lastSync, 0));
+        for(int i = 0; i < numberOfRecords; i++)
+        {
+            df.write(TestRecord(largeString, (int64_t)i));
+
+            // During the write, gather all the sync boundaries reported by the getCurrentBlockStart() API
+            if(df.getCurrentBlockStart() != lastSync)
+            {
+                int recordsUptoSync = i; // one less than the number of records written so far, since the previous block is sealed before this record is written
+                syncMetadata.push_back(std::pair<uint64_t, int>(lastSync, recordsUptoSync));
+                lastSync = df.getCurrentBlockStart();
+
+                //::printf("\nPast current block start %llu, total rows upto sync %d", lastSync, recordsUptoSync);
+            }
+        }
+
+        //::printf("\nPast current block start %llu, total rows upto sync %d", df.getCurrentBlockStart(), numberOfRecords);
+        syncMetadata.push_back(std::pair<uint64_t, int>(df.getCurrentBlockStart(), numberOfRecords));
+
+        df.flush();
+        df.close();
+    }
+
+    // Validate that the sync points reported by the writer (getCurrentBlockStart) and the reader (previousSync) are the same for every record
+    {
+        avro::DataFileReader<TestRecord> df(filename);
+        TestRecord readRecord("", 0);
+
+        for(int index = 0; index < numberOfRecords; index++)
+        {
+            int rowsRead = index;
+            if(rowsRead > syncMetadata.front().second)
+            {
+                syncMetadata.pop_front();
+            }
+
+            BOOST_CHECK_EQUAL(df.previousSync(), syncMetadata.front().first);
+
+            BOOST_CHECK_EQUAL(df.read(readRecord), true);
+
+            int64_t expectedId = index;
+            BOOST_CHECK_EQUAL(expectedId, readRecord.id);
+        }
+
+        if(numberOfRecords > syncMetadata.front().second)
+        {
+            syncMetadata.pop_front();
+        }
+
+        // Validate that previousSync() still matches at the end of the file
+        BOOST_CHECK_EQUAL(df.previousSync(), syncMetadata.front().first);
+        BOOST_CHECK_EQUAL(1, syncMetadata.size()); // exactly one entry must remain in the syncMetadata queue
+    }
+}
+
+
+void testReadRecordEfficientlyUsingLastSync(avro::Codec codec) {
+
+    //
+    // This test highlights a motivating use case for the getCurrentBlockStart() API on the DataFileWriter.
+    //
+
+    avro::ValidSchema writerSchema =
+        avro::compileJsonSchemaFromString(schemaWithIdAndString);
+
+    const size_t stringLen = 100;
+    char largeString[stringLen + 1];
+    for(size_t i = 0; i < stringLen; i++) {
+        largeString[i] = 'a';
+    }
+
+    largeString[stringLen] = '\0';
+
+    const char* filename = "test_readRecordUsingLastSync.df"; // NOTE(review): this file is not removed when the test finishes
+
+    int numberOfRecords = 100;
+    int recordToRead = 37; // the specific record to read efficiently
+    int syncPointWithRecord = 0; // NOTE(review): getCurrentBlockStart() returns uint64_t; storing offsets in int narrows them and causes signed/unsigned comparisons below
+    int finalSync = 0;
+    int recordsUptoLastSync = 0;
+    int firstSyncPoint = 0;
+    {
+        avro::DataFileWriter<TestRecord> df(filename,
+            writerSchema, 1024, codec);
+
+        firstSyncPoint = df.getCurrentBlockStart();
+        syncPointWithRecord = firstSyncPoint;
+        for(int i = 0; i < numberOfRecords; i++)
+        {
+            df.write(TestRecord(largeString, (int64_t)i));
+
+            // During the write, track the latest block start (via getCurrentBlockStart()) seen while at most recordToRead records have been written
+            int recordsWritten = i + 1;
+            if((recordsWritten <= recordToRead) && (df.getCurrentBlockStart() != syncPointWithRecord))
+            {
+                recordsUptoLastSync = i; // one less than the number of records written so far, since the previous block is sealed before this record is written
+                syncPointWithRecord = df.getCurrentBlockStart();
+
+                //::printf("\nPast current block start %llu, total rows upto sync %d", syncPointWithRecord, recordsUptoLastSync);
+            }
+        }
+
+        finalSync = df.getCurrentBlockStart();
+        df.flush();
+        df.close();
+    }
+
+    // Validate that we're able to stitch together {file header | blocks from the one holding the record up to finalSync} and read the specific record from the stitched data
+    {
+        std::unique_ptr<avro::SeekableInputStream> seekableInputStream = avro::fileSeekableInputStream(filename, 1000000);
+
+        const uint8_t* pData = nullptr;
+        size_t length = 0;
+        bool hasRead = seekableInputStream->next(&pData, &length);
+
+        // Keep it simple: assume one next() call returned all the data we need (the large buffer size above is meant to ensure this). NOTE(review): hasRead is never checked, and BOOST_CHECK_GE does not stop the test on failure, so the copies below could read past pData -- consider BOOST_REQUIRE.
+        BOOST_CHECK_GE(length, firstSyncPoint);
+        BOOST_CHECK_GE(length, finalSync);
+
+        std::vector<uint8_t> stitchedData;
+        // reserve space for the header and the data from the selected blocks
+        stitchedData.reserve(firstSyncPoint + (finalSync - syncPointWithRecord));
+
+        // Copy the header of the file
+        std::copy(pData, pData + firstSyncPoint, std::back_inserter(stitchedData));
+
+        // Copy the bytes from the start of the block containing the record of interest up to finalSync
+        std::copy(pData + syncPointWithRecord, pData + finalSync, std::back_inserter(stitchedData));
+
+        // Expose the stitched bytes as an InputStream
+        std::unique_ptr<avro::InputStream> inputStream = avro::memoryInputStream(stitchedData.data(), stitchedData.size());
+
+        int recordsUptoRecordToRead = recordToRead - recordsUptoLastSync;
+
+        // Ensure the record of interest is not the first record in its block.
+        BOOST_CHECK_GT(recordsUptoRecordToRead, 0);
+
+        avro::DataFileReader<TestRecord> df(std::move(inputStream));
+        TestRecord readRecord("", 0);
+        //::printf("\nReading %d rows until specific record is reached", recordsUptoRecordToRead);
+        for(int index = 0; index < recordsUptoRecordToRead; index++)
+        {
+            BOOST_CHECK_EQUAL(df.read(readRecord), true);
+
+            int64_t expectedId = (recordToRead - recordsUptoRecordToRead + index);
+            BOOST_CHECK_EQUAL(expectedId, readRecord.id);
+        }
+
+        // Read the specific record
+        BOOST_CHECK_EQUAL(df.read(readRecord), true);
+        BOOST_CHECK_EQUAL(recordToRead, readRecord.id);
+    }
+}
+
+void testLastSyncNullCodec()
+{
+ BOOST_TEST_CHECKPOINT(__func__);
+ testLastSync(avro::NULL_CODEC);
+}
+
+void testLastSyncDeflateCodec()
+{
+ BOOST_TEST_CHECKPOINT(__func__);
+ testLastSync(avro::DEFLATE_CODEC);
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+void testLastSyncSnappyCodec()
+{
+ BOOST_TEST_CHECKPOINT(__func__);
+ testLastSync(avro::SNAPPY_CODEC);
+}
+#endif
+
+void testReadRecordEfficientlyUsingLastSyncNullCodec()
+{
+ BOOST_TEST_CHECKPOINT(__func__);
+ testReadRecordEfficientlyUsingLastSync(avro::NULL_CODEC);
+}
+
+void testReadRecordEfficientlyUsingLastSyncDeflateCodec()
+{
+ BOOST_TEST_CHECKPOINT(__func__);
+ testReadRecordEfficientlyUsingLastSync(avro::DEFLATE_CODEC);
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+void testReadRecordEfficientlyUsingLastSyncSnappyCodec()
+{
+ BOOST_TEST_CHECKPOINT(__func__);
+ testReadRecordEfficientlyUsingLastSync(avro::SNAPPY_CODEC);
+}
+#endif
+
test_suite*
init_unit_test_suite(int argc, char *argv[])
{
@@ -883,6 +1138,7 @@ init_unit_test_suite(int argc, char *argv[])
ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
boost::unit_test::framework::master_test_suite().add(ts);
}
+
boost::unit_test::framework::master_test_suite().
add(BOOST_TEST_CASE(&testSkipStringNullCodec));
boost::unit_test::framework::master_test_suite().
@@ -892,5 +1148,23 @@ init_unit_test_suite(int argc, char *argv[])
add(BOOST_TEST_CASE(&testSkipStringSnappyCodec));
#endif
+ boost::unit_test::framework::master_test_suite().
+ add(BOOST_TEST_CASE(&testLastSyncNullCodec));
+ boost::unit_test::framework::master_test_suite().
+ add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().
+ add(BOOST_TEST_CASE(&testLastSyncSnappyCodec));
+#endif
+
+ boost::unit_test::framework::master_test_suite().
+ add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncNullCodec));
+ boost::unit_test::framework::master_test_suite().
+ add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+ boost::unit_test::framework::master_test_suite().
+ add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncSnappyCodec));
+#endif
+
return 0;
}