You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by th...@apache.org on 2011/03/30 09:40:54 UTC
svn commit: r1086866 - in /avro/trunk: ./ lang/c++/ lang/c++/api/
lang/c++/impl/ lang/c++/impl/parsing/ lang/c++/test/
Author: thiru
Date: Wed Mar 30 07:40:53 2011
New Revision: 1086866
URL: http://svn.apache.org/viewvc?rev=1086866&view=rev
Log:
AVRO-789. Datafile support in C++
Added:
avro/trunk/lang/c++/api/DataFile.hh
avro/trunk/lang/c++/impl/DataFile.cc
avro/trunk/lang/c++/test/DataFileTests.cc
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/c++/CMakeLists.txt
avro/trunk/lang/c++/api/Stream.hh
avro/trunk/lang/c++/build.sh
avro/trunk/lang/c++/impl/FileStream.cc
avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc
avro/trunk/lang/c++/impl/parsing/Symbol.hh
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Mar 30 07:40:53 2011
@@ -248,6 +248,8 @@ Avro 1.5.0 (10 March 2011)
AVRO-783. Specific object support in C++. (thiru)
+ AVRO-789. Datafile support in C++. (thiru)
+
BUG FIXES
AVRO-764. Java: Bug in BinaryData.compare() with offset comparison.
Modified: avro/trunk/lang/c++/CMakeLists.txt
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/CMakeLists.txt?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/CMakeLists.txt (original)
+++ avro/trunk/lang/c++/CMakeLists.txt Wed Mar 30 07:40:53 2011
@@ -45,6 +45,7 @@ add_library (avrocpp SHARED
impl/BinaryEncoder.cc impl/BinaryDecoder.cc
impl/Stream.cc impl/FileStream.cc
impl/Generic.cc
+ impl/DataFile.cc
impl/parsing/Symbol.cc
impl/parsing/ValidatingCodec.cc
impl/parsing/JsonCodec.cc
@@ -136,6 +137,9 @@ add_dependencies (AvrogencppTests bigrec
union_array_union_hh union_map_union_hh)
target_link_libraries (AvrogencppTests avrocpp ${BOOST_LIBRARIES})
+add_executable (DataFileTests test/DataFileTests.cc)
+target_link_libraries (DataFileTests avrocpp ${Boost_LIBRARIES})
+
include (InstallRequiredSystemLibraries)
set (CPACK_PACKAGE_FILE_NAME "avrocpp-${AVRO_VERSION_MAJOR}")
Added: avro/trunk/lang/c++/api/DataFile.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/api/DataFile.hh?rev=1086866&view=auto
==============================================================================
--- avro/trunk/lang/c++/api/DataFile.hh (added)
+++ avro/trunk/lang/c++/api/DataFile.hh Wed Mar 30 07:40:53 2011
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef avro_DataFile_hh__
+#define avro_DataFile_hh__
+
+#include "Encoder.hh"
+#include "buffer/Buffer.hh"
+#include "ValidSchema.hh"
+#include "Specific.hh"
+#include "Stream.hh"
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "boost/array.hpp"
+#include "boost/utility.hpp"
+
+namespace avro {
+
+typedef boost::array<uint8_t, 16> DataFileSync;
+
+/**
+ * Type-independent portion of DataFileWriter.
+ * At any given point in time, at most one file can be written using
+ * this object.
+ */
+class DataFileWriterBase : boost::noncopyable {
+ const std::string filename_;
+ const ValidSchema schema_;
+ const EncoderPtr encoderPtr_;
+ const size_t syncInterval_;
+
+ std::auto_ptr<OutputStream> stream_;
+ std::auto_ptr<OutputStream> buffer_;
+ const DataFileSync sync_;
+ int64_t objectCount_;
+
+ typedef std::map<std::string, std::vector<uint8_t> > Metadata;
+
+ Metadata metadata_;
+
+ static std::auto_ptr<OutputStream> makeStream(const char* filename);
+ static DataFileSync makeSync();
+
+ void writeHeader();
+ void setMetadata(const std::string& key, const std::string& value);
+
+ /**
+ * Generates a sync marker in the file.
+ */
+ void sync();
+
+protected:
+ Encoder& encoder() const { return *encoderPtr_; }
+
+ void syncIfNeeded();
+
+ void incr() {
+ ++objectCount_;
+ }
+public:
+ /**
+ * Constructs a data file writer with the given sync interval and name.
+ */
+ DataFileWriterBase(const char* filename, const ValidSchema& schema,
+ size_t syncInterval);
+
+ ~DataFileWriterBase();
+ /**
+ * Closes the current file. Once closed this datafile object cannot be
+ * used for writing any more.
+ */
+ void close();
+
+ /**
+ * Returns the schema for this data file.
+ */
+ const ValidSchema& schema() const { return schema_; }
+
+ /**
+ * Flushes any unwritten data into the file.
+ */
+ void flush();
+};
+
+/**
+ * An Avro datafile that can store objects of type T.
+ */
+template <typename T>
+class DataFileWriter : public DataFileWriterBase {
+public:
+ /**
+ * Constructs a new data file.
+ */
+ DataFileWriter(const char* filename, const ValidSchema& schema,
+ size_t syncInterval = 16 * 1024) :
+ DataFileWriterBase(filename, schema, syncInterval) { }
+
+ /**
+ * Writes the given piece of data into the file.
+ */
+ void write(const T& datum) {
+ syncIfNeeded();
+ avro::encode(encoder(), datum);
+ incr();
+ }
+};
+
+class DataFileReaderBase : boost::noncopyable {
+ const std::string filename_;
+ const std::auto_ptr<InputStream> stream_;
+ const DecoderPtr decoder_;
+ int64_t objectCount_;
+
+ ValidSchema readerSchema_;
+ ValidSchema dataSchema_;
+ DecoderPtr dataDecoder_;
+ std::auto_ptr<InputStream> dataStream_;
+ typedef std::map<std::string, std::vector<uint8_t> > Metadata;
+
+ Metadata metadata_;
+ DataFileSync sync_;
+
+ void readHeader();
+
+protected:
+ Decoder& decoder() { return *dataDecoder_; }
+
+ /**
+ * Returns true if and only if there is more to read.
+ */
+ bool hasMore();
+
+ void decr() { --objectCount_; }
+ bool readDataBlock();
+
+public:
+ /**
+ * Constructs the reader for the given file and the reader is
+ * expected to use the given schema.
+ */
+ DataFileReaderBase(const char* filename, const ValidSchema& readerSchema);
+
+ /**
+ * Constructs the reader for the given file and the reader is
+ * expected to use the schema that is used with data.
+ */
+ DataFileReaderBase(const char* filename);
+
+ /**
+ * Returns the schema for this object.
+ */
+ const ValidSchema& readerSchema() { return readerSchema_; }
+
+ /**
+ * Returns the schema stored with the data file.
+ */
+ const ValidSchema& dataSchema() { return dataSchema_; }
+
+ /**
+ * Closes the reader. No further operation is possible on this reader.
+ */
+ void close();
+};
+
+template <typename T>
+class DataFileReader : public DataFileReaderBase {
+public:
+ /**
+ * Constructs the reader for the given file and the reader is
+ * expected to use the given schema.
+ */
+ DataFileReader(const char* filename, const ValidSchema& readerSchema) :
+ DataFileReaderBase(filename, readerSchema) { }
+
+ /**
+ * Constructs the reader for the given file and the reader is
+ * expected to use the schema that is used with data.
+ */
+ DataFileReader(const char* filename) : DataFileReaderBase(filename) { }
+
+ bool read(T& datum) {
+ if (hasMore()) {
+ decr();
+ avro::decode(decoder(), datum);
+ return true;
+ }
+ return false;
+ }
+};
+
+} // namespace avro
+#endif
+
+
Modified: avro/trunk/lang/c++/api/Stream.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/api/Stream.hh?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/api/Stream.hh (original)
+++ avro/trunk/lang/c++/api/Stream.hh Wed Mar 30 07:40:53 2011
@@ -136,6 +136,7 @@ struct StreamReader {
const uint8_t* end_;
StreamReader() : in_(0), next_(0), end_(0) { }
+ StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
void reset(InputStream& is) {
if (in_ != 0) {
@@ -209,6 +210,7 @@ struct StreamWriter {
uint8_t* end_;
StreamWriter() : out_(0), next_(0), end_(0) { }
+ StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
void reset(OutputStream& os) {
if (out_ != 0) {
@@ -260,6 +262,22 @@ struct StreamWriter {
out_->flush();
}
};
+
+/**
+ * A convenience function to copy all the contents of an input stream into
+ * an output stream.
+ */
+inline void copy(InputStream& in, OutputStream& out)
+{
+ const uint8_t *p = 0;
+ size_t n = 0;
+ StreamWriter w(out);
+ while (in.next(&p, &n)) {
+ w.writeBytes(p, n);
+ }
+ w.flush();
+}
+
} // namespace avro
#endif
Modified: avro/trunk/lang/c++/build.sh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/build.sh?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/build.sh (original)
+++ avro/trunk/lang/c++/build.sh Wed Mar 30 07:40:53 2011
@@ -84,6 +84,7 @@ case "$target" in
./build/StreamTests
./build/SpecificTests
./build/AvrogencppTests
+ ./build/DataFileTests
;;
dist)
Added: avro/trunk/lang/c++/impl/DataFile.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/DataFile.cc?rev=1086866&view=auto
==============================================================================
--- avro/trunk/lang/c++/impl/DataFile.cc (added)
+++ avro/trunk/lang/c++/impl/DataFile.cc Wed Mar 30 07:40:53 2011
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DataFile.hh"
+#include "Compiler.hh"
+#include "Exception.hh"
+
+#include <sstream>
+
+#include <boost/random/mersenne_twister.hpp>
+
+namespace avro {
+using std::auto_ptr;
+using std::ostringstream;
+using std::istringstream;
+using std::vector;
+using std::copy;
+using std::string;
+
+using boost::array;
+
// Metadata keys, and the only codec value, used in the file header.
const std::string AVRO_SCHEMA_KEY = "avro.schema";
const std::string AVRO_CODEC_KEY = "avro.codec";
const std::string AVRO_NULL_CODEC = "null";

// Accepted range (inclusive, in bytes) for the writer's sync interval.
const size_t minSyncInterval = 32;
const size_t maxSyncInterval = 1024u * 1024u * 1024u;
const size_t defaultSyncInterval = 16 * 1024;
+
+static string toString(const ValidSchema& schema)
+{
+ ostringstream oss;
+ schema.toJson(oss);
+ return oss.str();
+}
+
+DataFileWriterBase::DataFileWriterBase(const char* filename,
+ const ValidSchema& schema, size_t syncInterval) :
+ filename_(filename), schema_(schema), encoderPtr_(binaryEncoder()),
+ syncInterval_(syncInterval),
+ stream_(fileOutputStream(filename)),
+ buffer_(memoryOutputStream()),
+ sync_(makeSync()), objectCount_(0)
+{
+ if (syncInterval < minSyncInterval || syncInterval > maxSyncInterval) {
+ throw Exception(boost::format("Invalid sync interval: %1%. "
+ "Should be between %2% and %3%") % syncInterval %
+ minSyncInterval % maxSyncInterval);
+ }
+ setMetadata(AVRO_CODEC_KEY, AVRO_NULL_CODEC);
+
+ setMetadata(AVRO_SCHEMA_KEY, toString(schema));
+
+ writeHeader();
+ encoderPtr_->init(*buffer_);
+}
+
+DataFileWriterBase::~DataFileWriterBase()
+{
+ if (stream_.get()) {
+ close();
+ }
+}
+
+void DataFileWriterBase::close()
+{
+ flush();
+ stream_.reset();
+}
+
+void DataFileWriterBase::sync()
+{
+ encoderPtr_->flush();
+
+ encoderPtr_->init(*stream_);
+ avro::encode(*encoderPtr_, objectCount_);
+ int64_t byteCount = buffer_->byteCount();
+ avro::encode(*encoderPtr_, byteCount);
+ encoderPtr_->flush();
+
+ auto_ptr<InputStream> in = memoryInputStream(*buffer_);
+ copy(*in, *stream_);
+
+ encoderPtr_->init(*stream_);
+ avro::encode(*encoderPtr_, sync_);
+ encoderPtr_->flush();
+
+
+ buffer_ = memoryOutputStream();
+ encoderPtr_->init(*buffer_);
+ objectCount_ = 0;
+}
+
+void DataFileWriterBase::syncIfNeeded()
+{
+ encoderPtr_->flush();
+ if (buffer_->byteCount() >= syncInterval_) {
+ sync();
+ }
+}
+
+void DataFileWriterBase::flush()
+{
+ sync();
+}
+
+boost::mt19937 random(time(0));
+
+DataFileSync DataFileWriterBase::makeSync()
+{
+ DataFileSync sync;
+ for (size_t i = 0; i < sync.size(); ++i) {
+ sync[i] = random();
+ }
+ return sync;
+}
+
+typedef array<uint8_t, 4> Magic;
+static Magic magic = { 'O', 'b', 'j', '\x01' };
+
+void DataFileWriterBase::writeHeader()
+{
+ encoderPtr_->init(*stream_);
+ avro::encode(*encoderPtr_, magic);
+ avro::encode(*encoderPtr_, metadata_);
+ avro::encode(*encoderPtr_, sync_);
+ encoderPtr_->flush();
+}
+
+void DataFileWriterBase::setMetadata(const string& key, const string& value)
+{
+ vector<uint8_t> v(value.size());
+ copy(value.begin(), value.end(), v.begin());
+ metadata_[key] = v;
+}
+
+DataFileReaderBase::DataFileReaderBase(const char* filename,
+ const ValidSchema& schema) :
+ filename_(filename), stream_(fileInputStream(filename)),
+ decoder_(binaryDecoder()), objectCount_(0), readerSchema_(schema)
+{
+ readHeader();
+}
+
+DataFileReaderBase::DataFileReaderBase(const char* filename) :
+ filename_(filename), stream_(fileInputStream(filename)),
+ decoder_(binaryDecoder()), objectCount_(0)
+{
+ readHeader();
+}
+
+static void drain(InputStream& in)
+{
+ const uint8_t *p = 0;
+ size_t n = 0;
+ while (in.next(&p, &n));
+}
+
/**
 * Returns the lowercase hexadecimal digit for a nibble value x (0-15).
 *
 * File-local (static): this debug helper is not declared in any header, and
 * a generic external name like avro::hex risks clashing with another
 * definition at link time.
 */
static char hex(unsigned int x)
{
    return static_cast<char>(x < 10 ? '0' + x : 'a' + (x - 10));
}
+
+std::ostream& operator << (std::ostream& os, const DataFileSync& s)
+{
+ for (size_t i = 0; i < s.size(); ++i) {
+ os << hex(s[i] / 16) << hex(s[i] % 16) << ' ';
+ }
+ os << std::endl;
+ return os;
+}
+
+
+bool DataFileReaderBase::hasMore()
+{
+ if (objectCount_ != 0) {
+ return true;
+ }
+ dataDecoder_->init(*dataStream_);
+ drain(*dataStream_);
+ DataFileSync s;
+ decoder_->init(*stream_);
+ avro::decode(*decoder_, s);
+ if (s != sync_) {
+ throw Exception("Sync mismatch");
+ }
+ return readDataBlock();
+}
+
+class BoundedInputStream : public InputStream {
+ InputStream& in_;
+ size_t limit_;
+
+ bool next(const uint8_t** data, size_t* len) {
+ if (limit_ != 0 && in_.next(data, len)) {
+ if (*len > limit_) {
+ in_.backup(*len - limit_);
+ *len = limit_;
+ }
+ limit_ -= *len;
+ return true;
+ }
+ return false;
+ }
+
+ void backup(size_t len) {
+ in_.backup(len);
+ limit_ += len;
+ }
+
+ void skip(size_t len) {
+ if (len > limit_) {
+ len = limit_;
+ }
+ in_.skip(len);
+ limit_ -= len;
+ }
+
+ size_t byteCount() const {
+ return in_.byteCount();
+ }
+
+public:
+ BoundedInputStream(InputStream& in, size_t limit) :
+ in_(in), limit_(limit) { }
+};
+
+auto_ptr<InputStream> boundedInputStream(InputStream& in, size_t limit)
+{
+ return auto_ptr<InputStream>(new BoundedInputStream(in, limit));
+}
+
+bool DataFileReaderBase::readDataBlock()
+{
+ decoder_->init(*stream_);
+ const uint8_t* p = 0;
+ size_t n = 0;
+ if (! stream_->next(&p, &n)) {
+ return false;
+ }
+ stream_->backup(n);
+ avro::decode(*decoder_, objectCount_);
+ int64_t byteCount;
+ avro::decode(*decoder_, byteCount);
+ decoder_->init(*stream_);
+
+ auto_ptr<InputStream> st = boundedInputStream(*stream_, byteCount);
+ dataDecoder_->init(*st);
+ dataStream_ = st;
+ return true;
+}
+
+void DataFileReaderBase::close()
+{
+}
+
/**
 * Interprets a metadata byte vector as a string (byte for byte).
 * Uses the string range constructor instead of resize + std::copy.
 */
static std::string toString(const std::vector<uint8_t>& v)
{
    return std::string(v.begin(), v.end());
}
+
+static ValidSchema makeSchema(const vector<uint8_t>& v)
+{
+ istringstream iss(toString(v));
+ ValidSchema vs;
+ compileJsonSchema(iss, vs);
+ return ValidSchema(vs);
+}
+
+void DataFileReaderBase::readHeader()
+{
+ decoder_->init(*stream_);
+ Magic m;
+ avro::decode(*decoder_, m);
+ if (magic != m) {
+ throw Exception("Invalid data file. Magic does not match: "
+ + filename_);
+ }
+ avro::decode(*decoder_, metadata_);
+ Metadata::const_iterator it = metadata_.find(AVRO_SCHEMA_KEY);
+ if (it == metadata_.end()) {
+ throw Exception("No schema in metadata");
+ }
+
+ dataSchema_ = makeSchema(it->second);
+ if (! readerSchema_.root()) {
+ readerSchema_ = dataSchema();
+ }
+
+ it = metadata_.find(AVRO_CODEC_KEY);
+ if (it != metadata_.end() && toString(it->second) != AVRO_NULL_CODEC) {
+ throw Exception("Unknown codec in data file: " + toString(it->second));
+ }
+
+ dataDecoder_ = (toString(readerSchema_) != toString(dataSchema_)) ?
+ resolvingDecoder(dataSchema_, readerSchema_, binaryDecoder()) :
+ binaryDecoder();
+
+ avro::decode(*decoder_, sync_);
+ readDataBlock();
+}
+
+} // namespace avro
Modified: avro/trunk/lang/c++/impl/FileStream.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/FileStream.cc?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/FileStream.cc (original)
+++ avro/trunk/lang/c++/impl/FileStream.cc Wed Mar 30 07:40:53 2011
@@ -113,16 +113,17 @@ class FileOutputStream : public OutputSt
size_t available_;
size_t byteCount_;
+ // Invariant: byteCount_ == (bytes already written to the file) + bufferSize_ - available_
bool next(uint8_t** data, size_t* len) {
if (available_ == 0) {
flush();
}
*data = next_;
*len = available_;
- byteCount_ += available_;
next_ += available_;
byteCount_ += available_;
available_ = 0;
+ return true;
}
void backup(size_t len) {
@@ -150,7 +151,7 @@ public:
buffer_(new uint8_t[bufferSize]),
out_(::open(filename, O_WRONLY | O_CREAT | O_BINARY, 0644)),
next_(buffer_),
- available_(bufferSize_) { }
+ available_(bufferSize_), byteCount_(0) { }
~FileOutputStream() {
if (out_ >= 0) {
Modified: avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/parsing/ResolvingDecoder.cc?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc (original)
+++ avro/trunk/lang/c++/impl/parsing/ResolvingDecoder.cc Wed Mar 30 07:40:53 2011
@@ -450,6 +450,7 @@ template <typename P>
void ResolvingDecoderImpl<P>::init(InputStream& is)
{
    // Attach the underlying decoder to the new stream, then reset the
    // parser (pops its stack down to the bottom symbol) so decoding starts
    // afresh.
    this->base_->init(is);
    this->parser_.reset();
}
template <typename P>
Modified: avro/trunk/lang/c++/impl/parsing/Symbol.hh
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/impl/parsing/Symbol.hh?rev=1086866&r1=1086865&r2=1086866&view=diff
==============================================================================
--- avro/trunk/lang/c++/impl/parsing/Symbol.hh (original)
+++ avro/trunk/lang/c++/impl/parsing/Symbol.hh Wed Mar 30 07:40:53 2011
@@ -693,6 +693,12 @@ public:
parsingStack.push(s);
}
+ void reset() {
+ while (parsingStack.size() > 1) {
+ parsingStack.pop();
+ }
+ }
+
};
} // namespace parsing
Added: avro/trunk/lang/c++/test/DataFileTests.cc
URL: http://svn.apache.org/viewvc/avro/trunk/lang/c%2B%2B/test/DataFileTests.cc?rev=1086866&view=auto
==============================================================================
--- avro/trunk/lang/c++/test/DataFileTests.cc (added)
+++ avro/trunk/lang/c++/test/DataFileTests.cc Wed Mar 30 07:40:53 2011
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/test/included/unit_test_framework.hpp>
+#include <boost/test/unit_test.hpp>
+#include <boost/filesystem.hpp>
+
+#include <sstream>
+
+#include "DataFile.hh"
+#include "Generic.hh"
+#include "Stream.hh"
+#include "Compiler.hh"
+
+using std::auto_ptr;
+using std::string;
+using std::pair;
+using std::vector;
+using std::map;
+using std::istringstream;
+
+using boost::array;
+using boost::shared_ptr;
+using boost::unit_test::test_suite;
+
+
+using avro::ValidSchema;
+using avro::GenericDatum;
+using avro::GenericRecord;
+
/**
 * Minimal complex number used as a typed datum in the tests; both parts
 * default to zero.
 */
template <typename T>
struct Complex {
    T re;
    T im;

    Complex() : re(0), im(0) { }
    Complex(T real, T imaginary) : re(real), im(imaginary) { }
};
+
/**
 * Single-field datum used to read back only the "re" field of the
 * ComplexInteger records (schema projection).
 */
struct Integer {
    int64_t re;

    Integer() : re(0) { }
    Integer(int64_t value) : re(value) { }

    bool operator==(const Integer& other) const {
        return other.re == re;
    }
};
+
+typedef Complex<int64_t> ComplexInteger;
+typedef Complex<double> ComplexDouble;
+
+namespace avro {
+
+template <typename T> struct codec_traits<Complex<T> > {
+ static void encode(Encoder& e, const Complex<T>& c) {
+ avro::encode(e, c.re);
+ avro::encode(e, c.im);
+ }
+
+ static void decode(Decoder& d, Complex<T>& c) {
+ avro::decode(d, c.re);
+ avro::decode(d, c.im);
+ }
+};
+
+template <> struct codec_traits<Integer> {
+ static void decode(Decoder& d, Integer& c) {
+ avro::decode(d, c.re);
+ }
+};
+
+}
+
+static ValidSchema makeValidSchema(const char* schema)
+{
+ istringstream iss(schema);
+ ValidSchema vs;
+ compileJsonSchema(iss, vs);
+ return ValidSchema(vs);
+}
+
// Writer schema for ComplexInteger: two long fields.
static const char sch[] = "{\"type\": \"record\","
    "\"name\":\"ComplexInteger\", \"fields\": ["
        "{\"name\":\"re\", \"type\":\"long\"},"
        "{\"name\":\"im\", \"type\":\"long\"}"
    "]}";

// Projection of `sch` that keeps only the "re" field.
static const char isch[] = "{\"type\": \"record\","
    "\"name\":\"ComplexInteger\", \"fields\": ["
        "{\"name\":\"re\", \"type\":\"long\"}"
    "]}";

// Writer/reader schema for ComplexDouble: two double fields.
static const char dsch[] = "{\"type\": \"record\","
    "\"name\":\"ComplexDouble\", \"fields\": ["
        "{\"name\":\"re\", \"type\":\"double\"},"
        "{\"name\":\"im\", \"type\":\"double\"}"
    "]}";
+
+class DataFileTest {
+ const char* filename;
+ const ValidSchema writerSchema;
+ const ValidSchema readerSchema;
+
+public:
+ DataFileTest(const char* f, const char* wsch, const char* rsch) :
+ filename(f), writerSchema(makeValidSchema(wsch)),
+ readerSchema(makeValidSchema(rsch)) { }
+
+ typedef pair<ValidSchema, GenericDatum> Pair;
+
+ void testCleanup() {
+ BOOST_CHECK(boost::filesystem::remove(filename));
+ }
+
+ void testWrite() {
+ avro::DataFileWriter<ComplexInteger> df(filename, writerSchema, 100);
+ int64_t re = 3;
+ int64_t im = 5;
+ for (int i = 0; i < 1000; ++i, re *= im, im += 3) {
+ ComplexInteger c(re, im);
+ df.write(c);
+ }
+ df.close();
+ }
+
+ void testWriteGeneric() {
+ avro::DataFileWriter<Pair> df(filename, writerSchema, 100);
+ int64_t re = 3;
+ int64_t im = 5;
+ Pair p(writerSchema, GenericDatum());
+
+ GenericDatum& c = p.second;
+ c = GenericDatum(writerSchema.root());
+ GenericRecord& r = c.value<GenericRecord>();
+
+ for (int i = 0; i < 1000; ++i, re *= im, im += 3) {
+ r.fieldAt(0) = re;
+ r.fieldAt(1) = im;
+ df.write(p);
+ }
+ df.close();
+ }
+
+ void testWriteDouble() {
+ avro::DataFileWriter<ComplexDouble> df(filename, writerSchema, 100);
+ double re = 3.0;
+ double im = 5.0;
+ for (int i = 0; i < 1000; ++i, re += im - 0.7, im += 3.1) {
+ ComplexDouble c(re, im);
+ df.write(c);
+ }
+ df.close();
+ }
+
+ void testReadFull() {
+ avro::DataFileReader<ComplexInteger> df(filename, writerSchema);
+ int i = 0;
+ ComplexInteger ci;
+ int64_t re = 3;
+ int64_t im = 5;
+ while (df.read(ci)) {
+ BOOST_CHECK_EQUAL(ci.re, re);
+ BOOST_CHECK_EQUAL(ci.im, im);
+ re *= im;
+ im += 3;
+ ++i;
+ }
+ BOOST_CHECK_EQUAL(i, 1000);
+ }
+
+ void testReadProjection() {
+ avro::DataFileReader<Integer> df(filename, readerSchema);
+ int i = 0;
+ Integer integer;
+ int64_t re = 3;
+ int64_t im = 5;
+ while (df.read(integer)) {
+ BOOST_CHECK_EQUAL(integer.re, re);
+ re *= im;
+ im += 3;
+ ++i;
+ }
+ BOOST_CHECK_EQUAL(i, 1000);
+ }
+
+ void testReaderGeneric() {
+ avro::DataFileReader<Pair> df(filename, writerSchema);
+ int i = 0;
+ Pair p(writerSchema, GenericDatum());
+ int64_t re = 3;
+ int64_t im = 5;
+
+ const GenericDatum& ci = p.second;
+ while (df.read(p)) {
+ BOOST_REQUIRE_EQUAL(ci.type(), avro::AVRO_RECORD);
+ const GenericRecord& r = ci.value<GenericRecord>();
+ const size_t n = 2;
+ BOOST_REQUIRE_EQUAL(r.fieldCount(), n);
+ const GenericDatum& f0 = r.fieldAt(0);
+ BOOST_REQUIRE_EQUAL(f0.type(), avro::AVRO_LONG);
+ BOOST_CHECK_EQUAL(f0.value<int64_t>(), re);
+
+ const GenericDatum& f1 = r.fieldAt(1);
+ BOOST_REQUIRE_EQUAL(f1.type(), avro::AVRO_LONG);
+ BOOST_CHECK_EQUAL(f1.value<int64_t>(), im);
+ re *= im;
+ im += 3;
+ ++i;
+ }
+ BOOST_CHECK_EQUAL(i, 1000);
+ }
+
+ void testReaderGenericProjection() {
+ avro::DataFileReader<Pair> df(filename, readerSchema);
+ int i = 0;
+ Pair p(readerSchema, GenericDatum());
+ int64_t re = 3;
+ int64_t im = 5;
+
+ const GenericDatum& ci = p.second;
+ while (df.read(p)) {
+ BOOST_REQUIRE_EQUAL(ci.type(), avro::AVRO_RECORD);
+ const GenericRecord& r = ci.value<GenericRecord>();
+ const size_t n = 1;
+ BOOST_REQUIRE_EQUAL(r.fieldCount(), n);
+ const GenericDatum& f0 = r.fieldAt(0);
+ BOOST_REQUIRE_EQUAL(f0.type(), avro::AVRO_LONG);
+ BOOST_CHECK_EQUAL(f0.value<int64_t>(), re);
+
+ re *= im;
+ im += 3;
+ ++i;
+ }
+ BOOST_CHECK_EQUAL(i, 1000);
+ }
+
+ void testReadDouble() {
+ avro::DataFileReader<ComplexDouble> df(filename, writerSchema);
+ int i = 0;
+ ComplexDouble ci;
+ double re = 3.0;
+ double im = 5.0;
+ while (df.read(ci)) {
+ BOOST_CHECK_CLOSE(ci.re, re, 0.0001);
+ BOOST_CHECK_CLOSE(ci.im, im, 0.0001);
+ re += (im - 0.7);
+ im += 3.1;
+ ++i;
+ }
+ BOOST_CHECK_EQUAL(i, 1000);
+ }
+
+};
+
+void addReaderTests(test_suite* ts, const shared_ptr<DataFileTest>& t)
+{
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadFull, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadProjection, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReaderGeneric, t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReaderGenericProjection,
+ t));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t));
+
+}
+
+test_suite*
+init_unit_test_suite( int argc, char* argv[] )
+{
+ test_suite* ts= BOOST_TEST_SUITE("DataFile tests");
+ shared_ptr<DataFileTest> t1(new DataFileTest("test1.df", sch, isch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWrite, t1));
+ addReaderTests(ts, t1);
+
+ shared_ptr<DataFileTest> t2(new DataFileTest("test2.df", sch, isch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteGeneric, t2));
+ addReaderTests(ts, t2);
+
+ shared_ptr<DataFileTest> t3(new DataFileTest("test3.df", dsch, dsch));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testWriteDouble, t3));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testReadDouble, t3));
+ ts->add(BOOST_CLASS_TEST_CASE(&DataFileTest::testCleanup, t3));
+ return ts;
+}