You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by rs...@apache.org on 2020/11/05 13:55:38 UTC

[avro] 05/05: AVRO-2891: Expose last sync offset written on DataFileWriter (#954)

This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 3961ea1a2a02816c6d175789c2718991587972ed
Author: Krishnan Sundaram <kr...@gmail.com>
AuthorDate: Wed Sep 30 07:16:16 2020 -0700

    AVRO-2891: Expose last sync offset written on DataFileWriter (#954)
    
    * AVRO-2891: Add code and tests for lastSync API
    
    * AVRO-2891: simplify tests
    
    Tests now only validate equivalence of sync markers between DataFileWriter and Reader. Remove additional test validating that a specific row can be read using the sync returned from DataFileWriter since it is somewhat redundant.
    
    * AVRO-2891: revert change to cmakelists.txt
    
    The original change was needed to cmakelists.txt due to linker errors
    faced when using the standard build instructions.
    
    * add another test to highlight efficient read of avro record using the
    new API
    
    * add few more comments to the new test
    
    * Change API name as per PR feedback
    
    Co-authored-by: Krishnan Sundaram <kr...@microsoft.com>
---
 lang/c++/api/DataFile.hh       |  12 ++
 lang/c++/impl/DataFile.cc      |  14 ++-
 lang/c++/test/DataFileTests.cc | 274 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/lang/c++/api/DataFile.hh b/lang/c++/api/DataFile.hh
index 1b8f28b..a334cf5 100644
--- a/lang/c++/api/DataFile.hh
+++ b/lang/c++/api/DataFile.hh
@@ -73,6 +73,7 @@ class AVRO_DECL DataFileWriterBase : boost::noncopyable {
     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
 
     Metadata metadata_;
+    int64_t lastSync_;
 
     static std::unique_ptr<OutputStream> makeStream(const char* filename);
     static DataFileSync makeSync();
@@ -103,6 +104,11 @@ public:
     void syncIfNeeded();
 
     /**
+     * Returns the byte offset (within the current file) of the start of the current block being written.
+     */
+    uint64_t getCurrentBlockStart();
+
+    /**
      * Increments the object count.
      */
     void incr() {
@@ -162,6 +168,12 @@ public:
     }
 
     /**
+     *  Returns the byte offset (within the current file) of the start of the current block being written.
+     */
+    uint64_t getCurrentBlockStart() { return base_->getCurrentBlockStart(); }
+
+
+    /**
      * Closes the current file. Once closed this datafile object cannot be
      * used for writing any more.
      */
diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc
index bd1606b..e20e605 100644
--- a/lang/c++/impl/DataFile.cc
+++ b/lang/c++/impl/DataFile.cc
@@ -73,7 +73,8 @@ DataFileWriterBase::DataFileWriterBase(const char* filename, const ValidSchema&
     stream_(fileOutputStream(filename)),
     buffer_(memoryOutputStream()),
     sync_(makeSync()),
-    objectCount_(0)
+    objectCount_(0),
+    lastSync_(0)
 {
     init(schema, syncInterval, codec);
 }
@@ -88,7 +89,8 @@ DataFileWriterBase::DataFileWriterBase(std::unique_ptr<OutputStream> outputStrea
     stream_(std::move(outputStream)),
     buffer_(memoryOutputStream()),
     sync_(makeSync()),
-    objectCount_(0)
+    objectCount_(0),
+    lastSync_(0)
 {
     init(schema, syncInterval, codec);
 }
@@ -116,6 +118,8 @@ void DataFileWriterBase::init(const ValidSchema &schema, size_t syncInterval, co
 
     writeHeader();
     encoderPtr_->init(*buffer_);
+
+    lastSync_ = stream_->byteCount();
 }
 
 
@@ -214,6 +218,7 @@ void DataFileWriterBase::sync()
     avro::encode(*encoderPtr_, sync_);
     encoderPtr_->flush();
 
+    lastSync_ = stream_->byteCount();
 
     buffer_ = memoryOutputStream();
     encoderPtr_->init(*buffer_);
@@ -228,6 +233,11 @@ void DataFileWriterBase::syncIfNeeded()
     }
 }
 
+uint64_t DataFileWriterBase::getCurrentBlockStart()
+{
+    return lastSync_;
+}
+
 void DataFileWriterBase::flush()
 {
     sync();
diff --git a/lang/c++/test/DataFileTests.cc b/lang/c++/test/DataFileTests.cc
index 4f94d80..8c4605f 100644
--- a/lang/c++/test/DataFileTests.cc
+++ b/lang/c++/test/DataFileTests.cc
@@ -162,6 +162,21 @@ static const char ischWithDoc[] =
         "\"doc\": \"extra slashes\\\\\\\\\"}"
     "]}";
 
+static const char schemaWithIdAndString[] = R"({
+       "type":"record",
+       "name":"R",
+       "fields":[
+          {
+             "name":"s1",
+             "type":"string"
+          },
+          {
+             "name":"id",
+             "type":"long"
+          }
+       ]
+    })";
+
 string toString(const ValidSchema& s)
 {
     ostringstream oss;
@@ -760,6 +775,246 @@ void testSkipStringSnappyCodec()
 }
 #endif
 
+
+struct TestRecord {
+    std::string s1;
+    int64_t id;
+    TestRecord(const char* s1, int64_t id) : s1(s1), id(id) {}
+};
+
+namespace avro {
+    template<> struct codec_traits<TestRecord> {
+        static void encode(Encoder& e, const TestRecord& v) {
+            avro::encode(e, v.s1);
+            avro::encode(e, v.id);
+        }
+
+        static void decode(Decoder& d, TestRecord& v) {
+            avro::decode(d, v.s1);
+            avro::decode(d, v.id);
+        }
+    };
+}   // namespace avro
+
+void testLastSync(avro::Codec codec) {
+
+    //
+    // This test validates that getCurrentBlockStart() on the writer agrees with the previousSync() reported by the reader for every record read.
+    //
+
+    avro::ValidSchema writerSchema =
+        avro::compileJsonSchemaFromString(schemaWithIdAndString);
+    
+    const size_t stringLen = 100;
+    char largeString[stringLen + 1];
+    for(size_t i = 0; i < stringLen; i++) {
+        largeString[i] = 'a';
+    }
+
+    largeString[stringLen] = '\0';
+
+    const char* filename = "test_lastSync.df";
+    std::deque<std::pair<uint64_t, int>> syncMetadata;
+    int numberOfRecords = 100;
+    {
+        avro::DataFileWriter<TestRecord> df(filename,
+            writerSchema, 1024, codec);
+
+        uint64_t lastSync = df.getCurrentBlockStart();
+        syncMetadata.push_back(std::pair<uint64_t, int>(lastSync, 0));
+        for(int i = 0; i < numberOfRecords; i++)
+        {
+            df.write(TestRecord(largeString, (int64_t)i));
+            
+            // During the write, gather all the sync boundaries reported by getCurrentBlockStart()
+            if(df.getCurrentBlockStart() != lastSync)
+            {
+                int recordsUptoSync = i;    // 1 less than total number of records written, since the sync block is sealed before a write
+                syncMetadata.push_back(std::pair<uint64_t, int>(lastSync, recordsUptoSync));
+                lastSync = df.getCurrentBlockStart();
+
+                //::printf("\nPast current block start %llu, total rows upto sync %d", lastSync, recordsUptoSync);
+            }
+        }
+
+        //::printf("\nPast current block start %llu, total rows upto sync %d", df.getCurrentBlockStart(), numberOfRecords);
+        syncMetadata.push_back(std::pair<uint64_t, int>(df.getCurrentBlockStart(), numberOfRecords));
+
+        df.flush();
+        df.close();
+    }
+
+    // Validate that the sync points reported by the writer (via getCurrentBlockStart()) and by the reader (via previousSync()) are the same for every record
+    {
+        avro::DataFileReader<TestRecord> df(filename);
+        TestRecord readRecord("", 0);
+
+        for(int index = 0; index < numberOfRecords; index++)
+        {
+            int rowsRead = index;
+            if(rowsRead > syncMetadata.front().second)
+            {
+                syncMetadata.pop_front();
+            }
+
+            BOOST_CHECK_EQUAL(df.previousSync(), syncMetadata.front().first);
+
+            BOOST_CHECK_EQUAL(df.read(readRecord), true);
+
+            int64_t expectedId = index;
+            BOOST_CHECK_EQUAL(expectedId, readRecord.id);
+        }
+
+        if(numberOfRecords > syncMetadata.front().second)
+        {
+            syncMetadata.pop_front();
+        }
+
+        // validate previousSync matches even at the end of the file
+        BOOST_CHECK_EQUAL(df.previousSync(), syncMetadata.front().first);
+        BOOST_CHECK_EQUAL(1, syncMetadata.size());  // only 1 item must be remaining in the syncMetadata queue
+    }
+}
+
+
+void testReadRecordEfficientlyUsingLastSync(avro::Codec codec) {
+
+    //
+    // This test highlights a motivating use case for the getCurrentBlockStart() API on the DataFileWriter.
+    //
+
+    avro::ValidSchema writerSchema =
+        avro::compileJsonSchemaFromString(schemaWithIdAndString);
+
+    const size_t stringLen = 100;
+    char largeString[stringLen + 1];
+    for(size_t i = 0; i < stringLen; i++) {
+        largeString[i] = 'a';
+    }
+
+    largeString[stringLen] = '\0';
+
+    const char* filename = "test_readRecordUsingLastSync.df";
+    
+    int numberOfRecords = 100;
+    int recordToRead = 37;  // pick specific record to read efficiently
+    int syncPointWithRecord = 0;
+    int finalSync = 0;
+    int recordsUptoLastSync = 0;
+    int firstSyncPoint = 0;
+    {
+        avro::DataFileWriter<TestRecord> df(filename,
+            writerSchema, 1024, codec);
+
+        firstSyncPoint = df.getCurrentBlockStart();
+        syncPointWithRecord = firstSyncPoint;
+        for(int i = 0; i < numberOfRecords; i++)
+        {
+            df.write(TestRecord(largeString, (int64_t)i));
+
+            // During the write, track the sync boundaries reported by getCurrentBlockStart() while at most recordToRead records have been written
+            int recordsWritten = i + 1;
+            if((recordsWritten <= recordToRead) && (df.getCurrentBlockStart() != syncPointWithRecord))
+            {
+                recordsUptoLastSync = i;    // 1 less than total number of records written, since the sync block is sealed before a write
+                syncPointWithRecord = df.getCurrentBlockStart();
+
+                //::printf("\nPast current block start %llu, total rows upto sync %d", syncPointWithRecord, recordsUptoLastSync);
+            }
+        }
+
+        finalSync = df.getCurrentBlockStart();
+        df.flush();
+        df.close();
+    }
+
+    // Validate that we're able to stitch together {header block | specific block with record} and read the specific record from the stitched block
+    {
+        std::unique_ptr<avro::SeekableInputStream> seekableInputStream = avro::fileSeekableInputStream(filename, 1000000);
+
+        const uint8_t* pData = nullptr;
+        size_t length = 0;
+        bool hasRead = seekableInputStream->next(&pData, &length);
+
+        // Keep it simple: assume the single next() call above returned all the data we need; the large buffer size requested when opening the stream is meant to ensure this.
+        BOOST_CHECK_GE(length, firstSyncPoint);
+        BOOST_CHECK_GE(length, finalSync);
+
+        std::vector<uint8_t> stitchedData;
+        // reserve space for header and data from specific block
+        stitchedData.reserve(firstSyncPoint + (finalSync - syncPointWithRecord));
+
+        // Copy header of the file
+        std::copy(pData, pData + firstSyncPoint, std::back_inserter(stitchedData));
+
+        // Copy data from the sync block containing the record of interest
+        std::copy(pData + syncPointWithRecord, pData + finalSync, std::back_inserter(stitchedData));
+
+        // Convert to inputStream
+        std::unique_ptr<avro::InputStream> inputStream = avro::memoryInputStream(stitchedData.data(), stitchedData.size());
+
+        int recordsUptoRecordToRead = recordToRead - recordsUptoLastSync;
+
+        // Ensure this is not the first record in the chunk.
+        BOOST_CHECK_GT(recordsUptoRecordToRead, 0);
+
+        avro::DataFileReader<TestRecord> df(std::move(inputStream));
+        TestRecord readRecord("", 0);
+        //::printf("\nReading %d rows until specific record is reached", recordsUptoRecordToRead);
+        for(int index = 0; index < recordsUptoRecordToRead; index++)
+        {
+            BOOST_CHECK_EQUAL(df.read(readRecord), true);
+
+            int64_t expectedId = (recordToRead - recordsUptoRecordToRead + index);
+            BOOST_CHECK_EQUAL(expectedId, readRecord.id);
+        }
+
+        // read specific record
+        BOOST_CHECK_EQUAL(df.read(readRecord), true);
+        BOOST_CHECK_EQUAL(recordToRead, readRecord.id);
+    }
+}
+
+void testLastSyncNullCodec()
+{
+    BOOST_TEST_CHECKPOINT(__func__);
+    testLastSync(avro::NULL_CODEC);
+}
+
+void testLastSyncDeflateCodec()
+{
+    BOOST_TEST_CHECKPOINT(__func__);
+    testLastSync(avro::DEFLATE_CODEC);
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+void testLastSyncSnappyCodec()
+{
+    BOOST_TEST_CHECKPOINT(__func__);
+    testLastSync(avro::SNAPPY_CODEC);
+}
+#endif
+
+void testReadRecordEfficientlyUsingLastSyncNullCodec()
+{
+    BOOST_TEST_CHECKPOINT(__func__);
+    testReadRecordEfficientlyUsingLastSync(avro::NULL_CODEC);
+}
+
+void testReadRecordEfficientlyUsingLastSyncDeflateCodec()
+{
+    BOOST_TEST_CHECKPOINT(__func__);
+    testReadRecordEfficientlyUsingLastSync(avro::DEFLATE_CODEC);
+}
+
+#ifdef SNAPPY_CODEC_AVAILABLE
+void testReadRecordEfficientlyUsingLastSyncSnappyCodec()
+{
+    BOOST_TEST_CHECKPOINT(__func__);
+    testReadRecordEfficientlyUsingLastSync(avro::SNAPPY_CODEC);
+}
+#endif
+
 test_suite*
 init_unit_test_suite(int argc, char *argv[])
 {
@@ -883,6 +1138,7 @@ init_unit_test_suite(int argc, char *argv[])
         ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
         boost::unit_test::framework::master_test_suite().add(ts);
     }
+
     boost::unit_test::framework::master_test_suite().
         add(BOOST_TEST_CASE(&testSkipStringNullCodec));
     boost::unit_test::framework::master_test_suite().
@@ -892,5 +1148,23 @@ init_unit_test_suite(int argc, char *argv[])
         add(BOOST_TEST_CASE(&testSkipStringSnappyCodec));
 #endif
 
+    boost::unit_test::framework::master_test_suite().
+        add(BOOST_TEST_CASE(&testLastSyncNullCodec));
+    boost::unit_test::framework::master_test_suite().
+        add(BOOST_TEST_CASE(&testLastSyncDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().
+        add(BOOST_TEST_CASE(&testLastSyncSnappyCodec));
+#endif
+
+    boost::unit_test::framework::master_test_suite().
+        add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncNullCodec));
+    boost::unit_test::framework::master_test_suite().
+        add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncDeflateCodec));
+#ifdef SNAPPY_CODEC_AVAILABLE
+    boost::unit_test::framework::master_test_suite().
+        add(BOOST_TEST_CASE(&testReadRecordEfficientlyUsingLastSyncSnappyCodec));
+#endif
+
     return 0;
 }