You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2011/03/30 09:40:54 UTC

svn commit: r1086866 - in /avro/trunk: ./ lang/c++/ lang/c++/api/ lang/c++/impl/ lang/c++/impl/parsing/ lang/c++/test/

Author: thiru
Date: Wed Mar 30 07:40:53 2011
New Revision: 1086866

URL: http://svn.apache.org/viewvc?rev=1086866&view=rev
Log:
AVRO-789. Datafile support in C++

Added:
    avro/trunk/lang/c++/api/DataFile.hh
    avro/trunk/lang/c++/impl/DataFile.cc
    avro/trunk/lang/c++/test/DataFileTests.cc
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/c++/CMakeLists.txt
    avro/trunk/lang/c++/api/Stream.hh
    avro/trunk/lang/c++/build.sh
    avro/trunk/lang/c++/impl/FileStream.cc
    avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc
    avro/trunk/lang/c++/impl/parsing/Symbol.hh

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Mar 30 07:40:53 2011
@@ -248,6 +248,8 @@ Avro 1.5.0 (10 March 2011)
 
     AVRO-783. Specifc object support in C++. (thiru)
 
+    AVRO-789. Datafile support in C++. (thiru)
+
   BUG FIXES
 
     AVRO-764. Java: Bug in BinaryData.compare() with offset comparison.

Modified: avro/trunk/lang/c++/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/CMakeLists.txt?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/CMakeLists.txt (original)
+++ avro/trunk/lang/c++/CMakeLists.txt Wed Mar 30 07:40:53 2011
@@ -45,6 +45,7 @@ add_library (avrocpp SHARED
         impl/BinaryEncoder.cc impl/BinaryDecoder.cc
         impl/Stream.cc impl/FileStream.cc
         impl/Generic.cc
+        impl/DataFile.cc
         impl/parsing/Symbol.cc
         impl/parsing/ValidatingCodec.cc
         impl/parsing/JsonCodec.cc
@@ -136,6 +137,9 @@ add_dependencies (AvrogencppTests bigrec
     union_array_union_hh union_map_union_hh)
 target_link_libraries (AvrogencppTests avrocpp ${BOOST_LIBRARIES})
 
+add_executable (DataFileTests test/DataFileTests.cc)
+target_link_libraries (DataFileTests avrocpp ${Boost_LIBRARIES})
+
 include (InstallRequiredSystemLibraries)
 
 set (CPACK_PACKAGE_FILE_NAME "avrocpp-${AVRO_VERSION_MAJOR}")

Added: avro/trunk/lang/c++/api/DataFile.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/api/DataFile.hh?rev=1086866&view=auto
==============================================================================
--- avro/trunk/lang/c++/api/DataFile.hh (added)
+++ avro/trunk/lang/c++/api/DataFile.hh Wed Mar 30 07:40:53 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_DataFile_hh__
+#define avro_DataFile_hh__
+
+#include "Encoder.hh"
+#include "buffer/Buffer.hh"
+#include "ValidSchema.hh"
+#include "Specific.hh"
+#include "Stream.hh"
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "boost/array.hpp"
+#include "boost/utility.hpp"
+
+namespace avro {
+
+typedef boost::array<uint8_t, 16> DataFileSync;
+
+/**
+ * The part of a data file writer that does not depend on the type of the
+ * objects being stored. One instance writes at most one file and cannot
+ * be copied.
+ */
+class DataFileWriterBase : boost::noncopyable {
+    typedef std::map<std::string, std::vector<uint8_t> > Metadata;
+
+    // NOTE: the constructor's initializer list follows this declaration
+    // order; keep it when adding members.
+    const std::string filename_;
+    const ValidSchema schema_;
+    const EncoderPtr encoderPtr_;
+    const size_t syncInterval_;
+
+    std::auto_ptr<OutputStream> stream_;    // destination; reset by close()
+    std::auto_ptr<OutputStream> buffer_;    // collects the current block
+    const DataFileSync sync_;               // marker written after a block
+    int64_t objectCount_;                   // objects in the current block
+
+    Metadata metadata_;
+
+    static std::auto_ptr<OutputStream> makeStream(const char* filename);
+    static DataFileSync makeSync();
+
+    void writeHeader();
+    void setMetadata(const std::string& key, const std::string& value);
+
+    /**
+     * Writes the buffered objects out as one block, followed by the
+     * sync marker.
+     */
+    void sync();
+
+protected:
+    /** The encoder with which derived classes serialize their objects. */
+    Encoder& encoder() const { return *encoderPtr_; }
+
+    /** Writes out the current block once it has reached the sync interval. */
+    void syncIfNeeded();
+
+    /** Records that one more object was added to the current block. */
+    void incr() {
+        objectCount_ += 1;
+    }
+
+public:
+    /**
+     * Creates a writer for the file with the given name, storing objects
+     * that conform to the given schema. syncInterval is the number of
+     * buffered bytes at which a block is written out.
+     */
+    DataFileWriterBase(const char* filename, const ValidSchema& schema,
+        size_t syncInterval);
+
+    ~DataFileWriterBase();
+
+    /**
+     * Closes the file. A closed writer must not be used for writing
+     * any more.
+     */
+    void close();
+
+    /**
+     * The schema of the objects in this data file.
+     */
+    const ValidSchema& schema() const { return schema_; }
+
+    /**
+     * Writes any data still held in memory into the file.
+     */
+    void flush();
+};
+
+/**
+ *  A data file writer for objects of type T. Objects are serialized
+ *  with avro::encode().
+ */
+template <typename T>
+class DataFileWriter : public DataFileWriterBase {
+public:
+    /**
+     * Creates a new data file with the given name and schema.
+     * syncInterval is handed to DataFileWriterBase unchanged.
+     */
+    DataFileWriter(const char* filename, const ValidSchema& schema,
+        size_t syncInterval = 16 * 1024)
+        : DataFileWriterBase(filename, schema, syncInterval)
+    {
+    }
+
+    /**
+     * Appends one object to the file.
+     */
+    void write(const T& datum) {
+        // Give the base class the chance to finish the current block first.
+        this->syncIfNeeded();
+        Encoder& out = this->encoder();
+        avro::encode(out, datum);
+        this->incr();
+    }
+};
+
+/**
+ * Type-independent portion of DataFileReader. Reads the header of a data
+ * file on construction and then walks through its data blocks. Cannot be
+ * copied.
+ */
+class DataFileReaderBase : boost::noncopyable {
+    typedef std::map<std::string, std::vector<uint8_t> > Metadata;
+
+    // NOTE: the constructors initialize the members in this order.
+    const std::string filename_;
+    const std::auto_ptr<InputStream> stream_;   // the file being read
+    const DecoderPtr decoder_;                  // for header and block prefix
+    int64_t objectCount_;                       // objects left in the block
+
+    ValidSchema readerSchema_;
+    ValidSchema dataSchema_;
+    DecoderPtr dataDecoder_;                    // for the objects themselves
+    std::auto_ptr<InputStream> dataStream_;     // bytes of the current block
+
+    Metadata metadata_;
+    DataFileSync sync_;                         // marker expected after a block
+
+    void readHeader();
+
+protected:
+    /** The decoder from which derived classes read their objects. */
+    Decoder& decoder() { return *dataDecoder_; }
+
+    /**
+     * Returns true if and only if there is more to read.
+     */
+    bool hasMore();
+
+    /** Records that one object of the current block has been consumed. */
+    void decr() { --objectCount_; }
+
+    /**
+     * Moves on to the next data block. Returns false if the file has no
+     * further data.
+     */
+    bool readDataBlock();
+
+public:
+    /**
+     * Constructs the reader for the given file and the reader is
+     * expected to use the given schema.
+     */
+    DataFileReaderBase(const char* filename, const ValidSchema& readerSchema);
+
+    /**
+     * Constructs the reader for the given file and the reader is
+     * expected to use the schema that is used with data.
+     */
+    DataFileReaderBase(const char* filename);
+
+    /**
+     * Returns the schema for this object. Usable on const readers, like
+     * DataFileWriterBase::schema().
+     */
+    const ValidSchema& readerSchema() const { return readerSchema_; }
+
+    /**
+     * Returns the schema stored with the data file. Usable on const
+     * readers as well.
+     */
+    const ValidSchema& dataSchema() const { return dataSchema_; }
+
+    /**
+     * Closes the reader. No further operation is possible on this reader.
+     */
+    void close();
+};
+
+/**
+ * A data file reader that produces objects of type T. Objects are
+ * deserialized with avro::decode().
+ */
+template <typename T>
+class DataFileReader : public DataFileReaderBase {
+public:
+    /**
+     * Opens the given file; objects are read according to the given
+     * reader schema.
+     */
+    DataFileReader(const char* filename, const ValidSchema& readerSchema)
+        : DataFileReaderBase(filename, readerSchema)
+    {
+    }
+
+    /**
+     * Opens the given file; objects are read according to the schema
+     * stored in the file itself.
+     */
+    DataFileReader(const char* filename)
+        : DataFileReaderBase(filename)
+    {
+    }
+
+    /**
+     * Reads the next object into datum. Returns false, leaving datum
+     * untouched, when there is nothing more to read.
+     */
+    bool read(T& datum) {
+        if (! hasMore()) {
+            return false;
+        }
+        decr();
+        avro::decode(decoder(), datum);
+        return true;
+    }
+};
+
+}   // namespace avro
+#endif
+
+

Modified: avro/trunk/lang/c++/api/Stream.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/api/Stream.hh?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/api/Stream.hh (original)
+++ avro/trunk/lang/c++/api/Stream.hh Wed Mar 30 07:40:53 2011
@@ -136,6 +136,7 @@ struct StreamReader {
     const uint8_t* end_;
 
     StreamReader() : in_(0), next_(0), end_(0) { }
+    StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
 
     void reset(InputStream& is) {
         if (in_ != 0) {
@@ -209,6 +210,7 @@ struct StreamWriter {
     uint8_t* end_;
 
     StreamWriter() : out_(0), next_(0), end_(0) { }
+    StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
 
     void reset(OutputStream& os) {
         if (out_ != 0) {
@@ -260,6 +262,22 @@ struct StreamWriter {
         out_->flush();
     }
 };
+
+/**
+ * Transfers everything that can still be read from the input stream into
+ * the output stream and then flushes the output stream.
+ */
+inline void copy(InputStream& in, OutputStream& out)
+{
+    StreamWriter writer(out);
+    const uint8_t* chunk = 0;
+    size_t chunkSize = 0;
+    for (;;) {
+        if (! in.next(&chunk, &chunkSize)) {
+            break;
+        }
+        writer.writeBytes(chunk, chunkSize);
+    }
+    writer.flush();
+}
+
 }   // namespace avro
 #endif
 

Modified: avro/trunk/lang/c++/build.sh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/build.sh?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/build.sh (original)
+++ avro/trunk/lang/c++/build.sh Wed Mar 30 07:40:53 2011
@@ -84,6 +84,7 @@ case "$target" in
     ./build/StreamTests
     ./build/SpecificTests
     ./build/AvrogencppTests
+    ./build/DataFileTests
 	;;
 
     dist)

Added: avro/trunk/lang/c++/impl/DataFile.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/DataFile.cc?rev=1086866&view=auto
==============================================================================
--- avro/trunk/lang/c++/impl/DataFile.cc (added)
+++ avro/trunk/lang/c++/impl/DataFile.cc Wed Mar 30 07:40:53 2011
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DataFile.hh"
+#include "Compiler.hh"
+#include "Exception.hh"
+
+#include <sstream>
+
+#include <boost/random/mersenne_twister.hpp>
+
+namespace avro {
+using std::auto_ptr;
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
+using std::copy;
+using std::string;
+
+using boost::array;
+
+const string AVRO_SCHEMA_KEY("avro.schema");
+const string AVRO_CODEC_KEY("avro.codec");
+const string AVRO_NULL_CODEC("null");
+
+const size_t minSyncInterval = 32;
+const size_t maxSyncInterval = 1u << 30;
+const size_t defaultSyncInterval = 16 * 1024;
+
+// Renders the schema as JSON text.
+static string toString(const ValidSchema& schema)
+{
+    ostringstream json;
+    schema.toJson(json);
+    return json.str();
+}
+
+// Returns syncInterval unchanged if it lies within the supported range;
+// throws Exception otherwise.
+static size_t validateSyncInterval(size_t syncInterval)
+{
+    if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
+        throw Exception(boost::format("Invalid sync interval: %1%. "
+            "Should be between %2% and %3%") % syncInterval %
+            minSyncInterval % maxSyncInterval);
+    }
+    return syncInterval;
+}
+
+/**
+ * Opens the output file, records the codec and the schema as metadata and
+ * writes the file header. Afterwards the encoder is directed at the
+ * in-memory buffer that collects the first data block.
+ *
+ * Throws Exception if syncInterval is outside the supported range.
+ */
+DataFileWriterBase::DataFileWriterBase(const char* filename,
+    const ValidSchema& schema, size_t syncInterval) :
+    filename_(filename), schema_(schema), encoderPtr_(binaryEncoder()),
+    // syncInterval_ is declared ahead of stream_, so validating here
+    // rejects a bad interval before the output file is opened.
+    syncInterval_(validateSyncInterval(syncInterval)),
+    stream_(fileOutputStream(filename)),
+    buffer_(memoryOutputStream()),
+    sync_(makeSync()), objectCount_(0)
+{
+    setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+
+    setMetadata(AVRO_SCHEMA_KEY, toString(schema));
+
+    writeHeader();
+    encoderPtr_->init(*buffer_);
+}
+
+// Closes the file unless close() has already released the stream.
+DataFileWriterBase::~DataFileWriterBase()
+{
+    const bool stillOpen = (stream_.get() != 0);
+    if (stillOpen) {
+        close();
+    }
+}
+
+/**
+ * Writes out the objects that are still buffered and releases the output
+ * stream. Calling close() on a writer that is already closed does nothing.
+ */
+void DataFileWriterBase::close()
+{
+    if (stream_.get() == 0) {
+        // Already closed: flush() would dereference the released stream.
+        return;
+    }
+    flush();
+    stream_.reset();
+}
+
+/**
+ * Writes the objects collected so far to the file as one block: the number
+ * of objects, the number of buffered bytes, the buffered bytes themselves
+ * and finally the sync marker. A new, empty buffer is then installed for
+ * the next block.
+ */
+void DataFileWriterBase::sync()
+{
+    // Complete the buffered block before measuring it.
+    encoderPtr_->flush();
+
+    // The block prefix is encoded straight into the file.
+    encoderPtr_->init(*stream_);
+    avro::encode(*encoderPtr_, objectCount_);
+    const int64_t blockBytes = buffer_->byteCount();
+    avro::encode(*encoderPtr_, blockBytes);
+    encoderPtr_->flush();
+
+    // The block body is the content of the buffer, copied as is.
+    auto_ptr<InputStream> buffered = memoryInputStream(*buffer_);
+    copy(*buffered, *stream_);
+
+    encoderPtr_->init(*stream_);
+    avro::encode(*encoderPtr_, sync_);
+    encoderPtr_->flush();
+
+    // Start over with an empty block.
+    buffer_ = memoryOutputStream();
+    encoderPtr_->init(*buffer_);
+    objectCount_ = 0;
+}
+
+// Writes out the current block once the buffered data has reached the
+// configured sync interval.
+void DataFileWriterBase::syncIfNeeded()
+{
+    encoderPtr_->flush();
+    const bool blockFull = buffer_->byteCount() >= syncInterval_;
+    if (blockFull) {
+        sync();
+    }
+}
+
+// Writes the objects buffered so far to the file; same as sync().
+void DataFileWriterBase::flush()
+{
+    this->sync();
+}
+
+// Generator for sync markers, seeded from the clock during static
+// initialization. File-local so that the generic name "random" is not
+// exported from the library; DataFile.hh does not declare it.
+static boost::mt19937 random(static_cast<uint32_t>(time(0)));
+
+/**
+ * Produces a new 16-byte sync marker from the pseudo-random generator.
+ */
+DataFileSync DataFileWriterBase::makeSync()
+{
+    DataFileSync sync;
+    for (size_t i = 0; i < sync.size(); ++i) {
+        // Each draw is 32 bits wide; only its low byte is kept.
+        sync[i] = static_cast<uint8_t>(random());
+    }
+    return sync;
+}
+
+typedef array<uint8_t, 4> Magic;
+// The four bytes at the start of a data file: "Obj" followed by 1.
+// Constant, since it is only ever written out or compared against.
+static const Magic magic = { 'O', 'b', 'j', '\x01' };
+
+// Writes the file header directly to the output stream: the magic bytes,
+// the metadata and the sync marker.
+void DataFileWriterBase::writeHeader()
+{
+    Encoder& header = *encoderPtr_;
+    header.init(*stream_);
+    avro::encode(header, magic);
+    avro::encode(header, metadata_);
+    avro::encode(header, sync_);
+    header.flush();
+}
+
+// Stores the bytes of value under key, replacing any earlier entry.
+void DataFileWriterBase::setMetadata(const string& key, const string& value)
+{
+    metadata_[key] = vector<uint8_t>(value.begin(), value.end());
+}
+
+// Opens the file for reading with an explicit reader schema and reads
+// the file header.
+DataFileReaderBase::DataFileReaderBase(const char* filename,
+    const ValidSchema& schema)
+    : filename_(filename),
+      stream_(fileInputStream(filename)),
+      decoder_(binaryDecoder()),
+      objectCount_(0),
+      readerSchema_(schema)
+{
+    readHeader();
+}
+
+// Opens the file for reading without a reader schema and reads the file
+// header; readHeader() then adopts the schema stored in the file.
+DataFileReaderBase::DataFileReaderBase(const char* filename)
+    : filename_(filename),
+      stream_(fileInputStream(filename)),
+      decoder_(binaryDecoder()),
+      objectCount_(0)
+{
+    readHeader();
+}
+
+// Reads the stream until next() reports that nothing is left, discarding
+// the data.
+static void drain(InputStream& in)
+{
+    const uint8_t* chunk = 0;
+    size_t chunkSize = 0;
+    while (in.next(&chunk, &chunkSize)) {
+        // Nothing to do: the data is simply dropped.
+    }
+}
+
+// Maps 0..9 to '0'..'9' and 10..15 to 'a'..'f'.
+char hex(unsigned int x)
+{
+    if (x < 10) {
+        return '0' + x;
+    }
+    return 'a' + (x - 10);
+}
+
+// Prints the sync marker as space-separated pairs of hexadecimal digits,
+// followed by a line end.
+std::ostream& operator << (std::ostream& os, const DataFileSync& s)
+{
+    for (DataFileSync::const_iterator it = s.begin(); it != s.end(); ++it) {
+        const unsigned int byte = *it;
+        os << hex(byte >> 4) << hex(byte & 0x0f) << ' ';
+    }
+    os << std::endl;
+    return os;
+}
+
+
+/**
+ * Returns true if and only if another object can be read.
+ *
+ * When the current block is used up, the rest of the block is skipped, the
+ * sync marker behind it is verified and the next block is opened. Blocks
+ * that hold no objects are skipped, so a true result always means that an
+ * object is available. The writer's flush() can produce such blocks.
+ *
+ * Throws Exception if the marker behind a block is not the one from the
+ * file header.
+ */
+bool DataFileReaderBase::hasMore()
+{
+    while (objectCount_ == 0) {
+        if (dataStream_.get() == 0) {
+            // No block was ever opened: the file has a header only.
+            return false;
+        }
+        // Skip whatever is left of the current block.
+        dataDecoder_->init(*dataStream_);
+        drain(*dataStream_);
+
+        DataFileSync s;
+        decoder_->init(*stream_);
+        avro::decode(*decoder_, s);
+        if (s != sync_) {
+            throw Exception("Sync mismatch");
+        }
+        if (! readDataBlock()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+/**
+ * An input stream that hands out at most a given number of bytes of
+ * another input stream. The other stream is held by reference.
+ */
+class BoundedInputStream : public InputStream {
+    InputStream& in_;
+    size_t limit_;      // bytes that may still be handed out
+
+    bool next(const uint8_t** data, size_t* len) {
+        if (limit_ == 0) {
+            return false;
+        }
+        if (! in_.next(data, len)) {
+            return false;
+        }
+        const size_t received = *len;
+        if (received > limit_) {
+            // Give the part beyond the limit back to the wrapped stream.
+            in_.backup(received - limit_);
+            *len = limit_;
+        }
+        limit_ -= *len;
+        return true;
+    }
+
+    void backup(size_t len) {
+        in_.backup(len);
+        limit_ += len;
+    }
+
+    void skip(size_t len) {
+        // Never skip past the limit.
+        const size_t count = (len > limit_) ? limit_ : len;
+        in_.skip(count);
+        limit_ -= count;
+    }
+
+    size_t byteCount() const {
+        return in_.byteCount();
+    }
+
+public:
+    BoundedInputStream(InputStream& in, size_t limit) :
+        in_(in), limit_(limit) { }
+};
+
+// Creates a stream that exposes at most limit bytes of in.
+auto_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit)
+{
+    auto_ptr<InputStream> bounded(new BoundedInputStream(in, limit));
+    return bounded;
+}
+
+/**
+ * Reads the prefix of the next data block (object count and size in
+ * bytes) and directs the data decoder at the bytes of that block.
+ *
+ * Returns false if the file has no further data. Throws Exception if the
+ * prefix holds a negative count or size.
+ */
+bool DataFileReaderBase::readDataBlock()
+{
+    decoder_->init(*stream_);
+
+    // Probe for the end of the file, then return the probed bytes.
+    const uint8_t* p = 0;
+    size_t n = 0;
+    if (! stream_->next(&p, &n)) {
+        return false;
+    }
+    stream_->backup(n);
+
+    int64_t objectCount = 0;
+    avro::decode(*decoder_, objectCount);
+    int64_t byteCount = 0;
+    avro::decode(*decoder_, byteCount);
+    if (objectCount < 0 || byteCount < 0) {
+        // Converting a negative size to size_t would give a huge limit.
+        throw Exception("Invalid data block header in data file: "
+            + filename_);
+    }
+    objectCount_ = objectCount;
+
+    // NOTE(review): re-initializing presumably makes the decoder return
+    // bytes it read ahead, so the block starts at the right position.
+    decoder_->init(*stream_);
+
+    auto_ptr<InputStream> st =
+        boundedInputStream(*stream_, static_cast<size_t>(byteCount));
+    dataDecoder_->init(*st);
+    dataStream_ = st;
+    return true;
+}
+
+void DataFileReaderBase::close()
+{   // Currently does nothing: no member is released or reset here.
+}
+
+// Builds a string from the given bytes, one character per byte.
+static string toString(const vector<uint8_t>& v)
+{
+    return string(v.begin(), v.end());
+}
+
+// Compiles the JSON schema text held in the given bytes.
+static ValidSchema makeSchema(const vector<uint8_t>& v)
+{
+    const string json = toString(v);
+    istringstream input(json);
+    ValidSchema compiled;
+    compileJsonSchema(input, compiled);
+    return ValidSchema(compiled);
+}
+
+/**
+ * Reads the file header: checks the magic bytes, loads the metadata,
+ * compiles the schema stored in the file, rejects codecs other than
+ * "null", chooses the decoder for the objects and reads the sync marker.
+ * Finally the first data block is opened.
+ *
+ * Throws Exception on a wrong magic, a missing schema or an unknown codec.
+ */
+void DataFileReaderBase::readHeader()
+{
+    decoder_->init(*stream_);
+
+    Magic fileMagic;
+    avro::decode(*decoder_, fileMagic);
+    if (magic != fileMagic) {
+        throw Exception("Invalid data file. Magic does not match: "
+            + filename_);
+    }
+
+    avro::decode(*decoder_, metadata_);
+    Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY);
+    if (it == metadata_.end()) {
+        throw Exception("No schema in metadata");
+    }
+    dataSchema_ = makeSchema(it->second);
+
+    // Without a reader schema, read with the schema stored in the file.
+    if (! readerSchema_.root()) {
+        readerSchema_ = dataSchema();
+    }
+
+    it = metadata_.find(AVRO_CODEC_KEY);
+    if (it != metadata_.end()) {
+        const string codec = toString(it->second);
+        if (codec != AVRO_NULL_CODEC) {
+            throw Exception("Unknown codec in data file: " + codec);
+        }
+    }
+
+    // Schemas whose JSON text differs are read through a resolving decoder.
+    if (toString(readerSchema_) != toString(dataSchema_)) {
+        dataDecoder_ = resolvingDecoder(dataSchema_, readerSchema_,
+            binaryDecoder());
+    } else {
+        dataDecoder_ = binaryDecoder();
+    }
+
+    avro::decode(*decoder_, sync_);
+    readDataBlock();
+}
+
+}   // namespace avro

Modified: avro/trunk/lang/c++/impl/FileStream.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/FileStream.cc?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/FileStream.cc (original)
+++ avro/trunk/lang/c++/impl/FileStream.cc Wed Mar 30 07:40:53 2011
@@ -113,16 +113,17 @@ class FileOutputStream : public OutputSt
     size_t available_;
     size_t byteCount_;
 
+    // Invariant: byteCount_ == byteswritten + bufferSize_ - available_;
     bool next(uint8_t** data, size_t* len) {
         if (available_ == 0) {
             flush();
         }
         *data = next_;
         *len = available_;
-        byteCount_ += available_;
         next_ += available_;
         byteCount_ += available_;
         available_ = 0;
+        return true;
     }
 
     void backup(size_t len) {
@@ -150,7 +151,7 @@ public:
         buffer_(new uint8_t[bufferSize]),
         out_(::open(filename, O_WRONLY | O_CREAT | O_BINARY, 0644)),
         next_(buffer_),
-        available_(bufferSize_) { }
+        available_(bufferSize_), byteCount_(0) { }
 
     ~FileOutputStream() {
         if (out_ >= 0) {

Modified: avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/parsing/ResolvingDecoder.cc?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc (original)
+++ avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc Wed Mar 30 07:40:53 2011
@@ -450,6 +450,7 @@ template <typename P>
 void ResolvingDecoderImpl<P>::init(InputStream& is)
 {
     base_->init(is);
+    parser_.reset();
 }
 
 template <typename P>

Modified: avro/trunk/lang/c++/impl/parsing/Symbol.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/parsing/Symbol.hh?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/parsing/Symbol.hh (original)
+++ avro/trunk/lang/c++/impl/parsing/Symbol.hh Wed Mar 30 07:40:53 2011
@@ -693,6 +693,12 @@ public:
         parsingStack.push(s);
     }
 
+    /** Pops entries off the parsing stack until at most one is left. */
+    void reset() {
+        for (; parsingStack.size() > 1; parsingStack.pop()) {
+        }
+    }
+
 };
 
 }   // namespace parsing

Added: avro/trunk/lang/c++/test/DataFileTests.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/test/DataFileTests.cc?rev=1086866&view=auto
==============================================================================
--- avro/trunk/lang/c++/test/DataFileTests.cc (added)
+++ avro/trunk/lang/c++/test/DataFileTests.cc Wed Mar 30 07:40:53 2011
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/test/included/unit_test_framework.hpp>
+#include <boost/test/unit_test.hpp>
+#include <boost/filesystem.hpp>
+
+#include <sstream>
+
+#include "DataFile.hh"
+#include "Generic.hh"
+#include "Stream.hh"
+#include "Compiler.hh"
+
+using std::auto_ptr;
+using std::string;
+using std::pair;
+using std::vector;
+using std::map;
+using std::istringstream;
+
+using boost::array;
+using boost::shared_ptr;
+using boost::unit_test::test_suite;
+
+
+using avro::ValidSchema;
+using avro::GenericDatum;
+using avro::GenericRecord;
+
+/** A complex number with real part re and imaginary part im. */
+template <typename T>
+struct Complex {
+    T re;
+    T im;
+
+    /** Creates the number zero. */
+    Complex() : re(0), im(0) { }
+
+    /** Creates the number with the given real and imaginary parts. */
+    Complex(T real, T imaginary) : re(real), im(imaginary) { }
+};
+
+/** Holds only the real part; used for reading with a projecting schema. */
+struct Integer {
+    int64_t re;
+
+    Integer() : re(0) { }
+    Integer(int64_t value) : re(value) { }
+
+    /** Two values are equal if their real parts are. */
+    bool operator==(const Integer& oth) const {
+        return oth.re == re;
+    }
+};
+
+typedef Complex<int64_t> ComplexInteger;
+typedef Complex<double> ComplexDouble;
+
+namespace avro {
+
+/** Complex<T> is stored as its real part followed by its imaginary part. */
+template <typename T> struct codec_traits<Complex<T> > {
+    static void encode(Encoder& e, const Complex<T>& value) {
+        avro::encode(e, value.re);
+        avro::encode(e, value.im);
+    }
+
+    static void decode(Decoder& d, Complex<T>& value) {
+        avro::decode(d, value.re);
+        avro::decode(d, value.im);
+    }
+};
+
+/** Integer is read as a single value; it is never written. */
+template <> struct codec_traits<Integer> {
+    static void decode(Decoder& d, Integer& value) {
+        avro::decode(d, value.re);
+    }
+};
+
+}
+
+// Compiles the given JSON schema text.
+static ValidSchema makeValidSchema(const char* schema)
+{
+    istringstream input(schema);
+    ValidSchema compiled;
+    compileJsonSchema(input, compiled);
+    return ValidSchema(compiled);
+}
+
+static const char sch[] = "{\"type\": \"record\","
+    "\"name\":\"ComplexInteger\", \"fields\": ["
+        "{\"name\":\"re\", \"type\":\"long\"},"
+        "{\"name\":\"im\", \"type\":\"long\"}"
+    "]}";
+static const char isch[] = "{\"type\": \"record\","
+    "\"name\":\"ComplexInteger\", \"fields\": ["
+        "{\"name\":\"re\", \"type\":\"long\"}"
+    "]}";
+static const char dsch[] = "{\"type\": \"record\","
+    "\"name\":\"ComplexDouble\", \"fields\": ["
+        "{\"name\":\"re\", \"type\":\"double\"},"
+        "{\"name\":\"im\", \"type\":\"double\"}"
+    "]}";
+
+/**
+ * Tests that write a data file with one schema and read it back, either
+ * in full or through a reader schema. All tests of one instance work on
+ * the same file.
+ */
+class DataFileTest {
+    const char* filename;
+    const ValidSchema writerSchema;
+    const ValidSchema readerSchema;
+
+    /**
+     * Multiplies with wrap-around. The generated values leave the range of
+     * int64_t after a few steps, and overflow of a signed multiplication is
+     * undefined behaviour, so the product is formed on unsigned values.
+     */
+    static int64_t mulWrap(int64_t a, int64_t b) {
+        return static_cast<int64_t>(
+            static_cast<uint64_t>(a) * static_cast<uint64_t>(b));
+    }
+
+public:
+    DataFileTest(const char* f, const char* wsch, const char* rsch) :
+        filename(f), writerSchema(makeValidSchema(wsch)),
+        readerSchema(makeValidSchema(rsch)) { }
+
+    typedef pair<ValidSchema, GenericDatum> Pair;
+
+    /** Removes the test file; fails if it did not exist. */
+    void testCleanup() {
+        BOOST_CHECK(boost::filesystem::remove(filename));
+    }
+    
+    /** Writes 1000 objects of a statically known type. */
+    void testWrite() {
+        avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100);
+        int64_t re = 3;
+        int64_t im = 5;
+        for (int i = 0; i < 1000; ++i, re = mulWrap(re, im), im += 3) {
+            ComplexInteger c(re, im);
+            df.write(c);
+        }
+        df.close();
+    }
+
+    /** Writes the same 1000 objects as testWrite(), as generic data. */
+    void testWriteGeneric() {
+        avro::DataFileWriter<Pair> df(filename, writerSchema, 100);
+        int64_t re = 3;
+        int64_t im = 5;
+        Pair p(writerSchema, GenericDatum());
+
+        GenericDatum& c = p.second;
+        c = GenericDatum(writerSchema.root());
+        GenericRecord& r = c.value<GenericRecord>();
+
+        for (int i = 0; i < 1000; ++i, re = mulWrap(re, im), im += 3) {
+            r.fieldAt(0) = re;
+            r.fieldAt(1) = im;
+            df.write(p);
+        }
+        df.close();
+    }
+
+    /** Writes 1000 objects with floating-point fields. */
+    void testWriteDouble() {
+        avro::DataFileWriter<ComplexDouble> df(filename, writerSchema, 100);
+        double re = 3.0;
+        double im = 5.0;
+        for (int i = 0; i < 1000; ++i, re += im - 0.7, im += 3.1) {
+            ComplexDouble c(re, im);
+            df.write(c);
+        }
+        df.close();
+    }
+
+    /** Reads everything back with the writer's schema. */
+    void testReadFull() {
+        avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+        int i = 0;
+        ComplexInteger ci;
+        int64_t re = 3;
+        int64_t im = 5;
+        while (df.read(ci)) {
+            BOOST_CHECK_EQUAL(ci.re, re);
+            BOOST_CHECK_EQUAL(ci.im, im);
+            re = mulWrap(re, im);
+            im += 3;
+            ++i;
+        }
+        BOOST_CHECK_EQUAL(i, 1000);
+    }
+
+    /** Reads only the first field, through the reader schema. */
+    void testReadProjection() {
+        avro::DataFileReader<Integer> df(filename, readerSchema);
+        int i = 0;
+        Integer integer;
+        int64_t re = 3;
+        int64_t im = 5;
+        while (df.read(integer)) {
+            BOOST_CHECK_EQUAL(integer.re, re);
+            re = mulWrap(re, im);
+            im += 3;
+            ++i;
+        }
+        BOOST_CHECK_EQUAL(i, 1000);
+    }
+
+    /** Reads everything back as generic data. */
+    void testReaderGeneric() {
+        avro::DataFileReader<Pair> df(filename, writerSchema);
+        int i = 0;
+        Pair p(writerSchema, GenericDatum());
+        int64_t re = 3;
+        int64_t im = 5;
+
+        const GenericDatum& ci = p.second;
+        while (df.read(p)) {
+            BOOST_REQUIRE_EQUAL(ci.type(), avro::AVRO_RECORD);
+            const GenericRecord& r = ci.value<GenericRecord>();
+            const size_t n = 2;
+            BOOST_REQUIRE_EQUAL(r.fieldCount(), n);
+            const GenericDatum& f0 = r.fieldAt(0);
+            BOOST_REQUIRE_EQUAL(f0.type(), avro::AVRO_LONG);
+            BOOST_CHECK_EQUAL(f0.value<int64_t>(), re);
+
+            const GenericDatum& f1 = r.fieldAt(1);
+            BOOST_REQUIRE_EQUAL(f1.type(), avro::AVRO_LONG);
+            BOOST_CHECK_EQUAL(f1.value<int64_t>(), im);
+            re = mulWrap(re, im);
+            im += 3;
+            ++i;
+        }
+        BOOST_CHECK_EQUAL(i, 1000);
+    }
+
+    /** Reads only the first field as generic data. */
+    void testReaderGenericProjection() {
+        avro::DataFileReader<Pair> df(filename, readerSchema);
+        int i = 0;
+        Pair p(readerSchema, GenericDatum());
+        int64_t re = 3;
+        int64_t im = 5;
+
+        const GenericDatum& ci = p.second;
+        while (df.read(p)) {
+            BOOST_REQUIRE_EQUAL(ci.type(), avro::AVRO_RECORD);
+            const GenericRecord& r = ci.value<GenericRecord>();
+            const size_t n = 1;
+            BOOST_REQUIRE_EQUAL(r.fieldCount(), n);
+            const GenericDatum& f0 = r.fieldAt(0);
+            BOOST_REQUIRE_EQUAL(f0.type(), avro::AVRO_LONG);
+            BOOST_CHECK_EQUAL(f0.value<int64_t>(), re);
+
+            re = mulWrap(re, im);
+            im += 3;
+            ++i;
+        }
+        BOOST_CHECK_EQUAL(i, 1000);
+    }
+
+    /** Reads back the objects written by testWriteDouble(). */
+    void testReadDouble() {
+        avro::DataFileReader<ComplexDouble> df(filename, writerSchema);
+        int i = 0;
+        ComplexDouble ci;
+        double re = 3.0;
+        double im = 5.0;
+        while (df.read(ci)) {
+            BOOST_CHECK_CLOSE(ci.re, re, 0.0001);
+            BOOST_CHECK_CLOSE(ci.im, im, 0.0001);
+            re += (im - 0.7);
+            im += 3.1;
+            ++i;
+        }
+        BOOST_CHECK_EQUAL(i, 1000);
+    }
+
+};
+
+// Registers the tests that read the integer data file in its various
+// ways, followed by the removal of the file.
+void addReaderTests(test_suite* ts, const shared_ptr<DataFileTest>& t)
+{
+    typedef DataFileTest Test;
+
+    ts->add(BOOST_CLASS_TEST_CASE(&Test::testReadFull, t));
+    ts->add(BOOST_CLASS_TEST_CASE(&Test::testReadProjection, t));
+    ts->add(BOOST_CLASS_TEST_CASE(&Test::testReaderGeneric, t));
+    ts->add(BOOST_CLASS_TEST_CASE(&Test::testReaderGenericProjection, t));
+    ts->add(BOOST_CLASS_TEST_CASE(&Test::testCleanup, t));
+}
+
+// Builds the test suite: for each data file the test that writes it is
+// registered first, then the tests that read it.
+test_suite*
+init_unit_test_suite( int argc, char* argv[] ) 
+{
+    test_suite* suite = BOOST_TEST_SUITE("DataFile tests");
+
+    // Objects of a statically known type.
+    shared_ptr<DataFileTest> specific(
+        new DataFileTest("test1.df", sch, isch));
+    suite->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, specific));
+    addReaderTests(suite, specific);
+
+    // The same data, written as generic data.
+    shared_ptr<DataFileTest> generic(
+        new DataFileTest("test2.df", sch, isch));
+    suite->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGeneric,
+        generic));
+    addReaderTests(suite, generic);
+
+    // Floating-point data.
+    shared_ptr<DataFileTest> doubles(
+        new DataFileTest("test3.df", dsch, dsch));
+    suite->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteDouble,
+        doubles));
+    suite->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDouble,
+        doubles));
+    suite->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, doubles));
+    return suite;
+}