You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by sb...@apache.org on 2010/04/08 18:48:20 UTC
svn commit: r931999 [1/2] - in /hadoop/avro/trunk: ./ lang/c++/
lang/c++/api/ lang/c++/api/buffer/ lang/c++/api/buffer/detail/
lang/c++/impl/ lang/c++/m4/ lang/c++/test/
Author: sbanacho
Date: Thu Apr 8 16:48:19 2010
New Revision: 931999
URL: http://svn.apache.org/viewvc?rev=931999&view=rev
Log:
AVRO-508. Use page-backed buffers for C++ serialization input or output.
Added:
hadoop/avro/trunk/lang/c++/api/buffer/
hadoop/avro/trunk/lang/c++/api/buffer/Buffer.hh
hadoop/avro/trunk/lang/c++/api/buffer/BufferPrint.hh
hadoop/avro/trunk/lang/c++/api/buffer/BufferReader.hh
hadoop/avro/trunk/lang/c++/api/buffer/BufferStream.hh
hadoop/avro/trunk/lang/c++/api/buffer/BufferStreambuf.hh
hadoop/avro/trunk/lang/c++/api/buffer/detail/
hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetail.hh
hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetailIterator.hh
hadoop/avro/trunk/lang/c++/m4/m4_ax_boost_system.m4 (with props)
hadoop/avro/trunk/lang/c++/m4/m4_ax_boost_thread.m4 (with props)
hadoop/avro/trunk/lang/c++/test/buffertest.cc
Removed:
hadoop/avro/trunk/lang/c++/api/InputStreamer.hh
hadoop/avro/trunk/lang/c++/api/OutputStreamer.hh
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/c++/MainPage.dox
hadoop/avro/trunk/lang/c++/Makefile.am
hadoop/avro/trunk/lang/c++/api/Parser.hh
hadoop/avro/trunk/lang/c++/api/Reader.hh
hadoop/avro/trunk/lang/c++/api/ResolvingReader.hh
hadoop/avro/trunk/lang/c++/api/Serializer.hh
hadoop/avro/trunk/lang/c++/api/ValidatingReader.hh
hadoop/avro/trunk/lang/c++/api/ValidatingWriter.hh
hadoop/avro/trunk/lang/c++/api/Validator.hh
hadoop/avro/trunk/lang/c++/api/Writer.hh
hadoop/avro/trunk/lang/c++/configure.in
hadoop/avro/trunk/lang/c++/impl/Compiler.cc
hadoop/avro/trunk/lang/c++/impl/ValidatingReader.cc
hadoop/avro/trunk/lang/c++/impl/ValidatingWriter.cc
hadoop/avro/trunk/lang/c++/test/testgen.cc
hadoop/avro/trunk/lang/c++/test/unittest.cc
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Apr 8 16:48:19 2010
@@ -20,6 +20,8 @@ Avro 1.4.0 (unreleased)
AVRO-450. HTTP IPC for ruby. (jmhodges)
+ AVRO-508. Use page-backed buffers for C++ serialization input or output. (sbanacho)
+
BUG FIXES
AVRO-461. Skipping primitives in the ruby side (jmhodges)
Modified: hadoop/avro/trunk/lang/c++/MainPage.dox
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/MainPage.dox?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/MainPage.dox (original)
+++ hadoop/avro/trunk/lang/c++/MainPage.dox Thu Apr 8 16:48:19 2010
@@ -103,44 +103,28 @@ at an example of serializing this data:<
c.real = 10.0;
c.imaginary = 20.0;
- // Declare the stream to which to serialize the data to
- std::ostringstream os;
-
- // Ostreamer wraps a stream so that Avro serializer can use it
- avro::Ostreamer ostreamer(os);
-
- // Writer is the object that will do the actual I/O
- avro::Writer writer(ostreamer);
+ // Writer is the object that will do the actual I/O and buffer the results
+ avro::Writer writer;
// This will invoke the writer on my object
avro::serialize(writer, c);
- // At this point, the ostringstream 'os' stores the serialized data!
+ // At this point, the writer stores the serialized data in a buffer,
+ // which can be extracted to an immutable buffer
+ InputBuffer buffer = writer.buffer();
}</PRE><P>
Using the generated code, all that is required to serialize the data
-is to call avro::serialize() on the object. There is some setup
-required to tell where to write the data. The Ostreamer object is a
-simple object that understands how to write to STL ostreams. It is
-derived from a virtual base class called OutputStreamer. You can
-derive from OutputStreamer to create an object that can write to any
-kind of buffer you wish.</P>
+is to call avro::serialize() on the object.</P>
+<P>The data may be accessed by requesting an avro::InputBuffer
+object. From there, it can be sent to a file, over the network, etc.</P>
<P>Now let's do the inverse, and read the serialized data into our
object:</P>
-<PRE>void parseMyData(const std::string &myData)
+<PRE>void parseMyData(const avro::InputBuffer &myData)
{
Math::complex c;
- // Assume the serialized data is being passed as the contents of a string
- // (Note: this may not be the best way since the data is binary)
-
- // Declare a stream from which to read the serialized data
- std::istringstream is(myData);
-
- // Istreamer wraps a stream so that Avro parser can use it
- avro::Istreamer istreamer(is);
-
// Reader is the object that will do the actual I/O
- avro::Reader reader(istreamer);
+ avro::Reader reader(myData);
// This will invoke the reader on my object
avro::parse(reader, c);
@@ -205,11 +189,8 @@ function from above, but this time check
c.real = 10.0;
c.imaginary = 20.0;
- std::ostringstream os;
- avro::Ostreamer ostreamer(os);
-
// ValidatingWriter will make sure our serializer is writing the correct types
- avro::ValidatingWriter writer(mySchema, ostreamer);
+ avro::ValidatingWriter writer(mySchema);
try {
avro::serialize(writer, c);
@@ -236,13 +217,10 @@ provides an interface to query the next
field's name if it is a member of a record.</P>
<P>The following code is not very flexible, but it does demonstrate
the API:</P>
-<PRE>void parseMyData(const std::string &myData, const avro::ValidSchema &mySchema)
+<PRE>void parseMyData(const avro::InputBuffer &myData, const avro::ValidSchema &mySchema)
{
- std::istringstream is(myData);
- avro::Istreamer istreamer(is);
-
// Manually parse data, the Parser object binds the data to the schema
- avro::Parser<ValidatingReader> parser(mySchema, istreamer);
+ avro::Parser<ValidatingReader> parser(mySchema, myData);
assert( parser.nextType() == AVRO_READER);
@@ -357,9 +335,7 @@ be created that reads the data to the co
avro::ResolverSchema resolverSchema(writerSchema, readerSchema, layout);
// Setup the reader
- std::istringstream is(data);
- avro::IStreamer istreamer(is);
- avro::ResolvingReader reader(resolverSchema, is);
+ avro::ResolvingReader reader(resolverSchema, data);
Math::complex c;
Modified: hadoop/avro/trunk/lang/c++/Makefile.am
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/Makefile.am?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/Makefile.am (original)
+++ hadoop/avro/trunk/lang/c++/Makefile.am Thu Apr 8 16:48:19 2010
@@ -16,12 +16,10 @@ api/Boost.hh \
api/Compiler.hh \
api/CompilerNode.hh \
api/Exception.hh \
-api/InputStreamer.hh \
api/Layout.hh \
api/Node.hh \
api/NodeConcepts.hh \
api/NodeImpl.hh \
-api/OutputStreamer.hh \
api/Parser.hh \
api/Reader.hh \
api/Resolver.hh \
@@ -37,7 +35,14 @@ api/ValidatingReader.hh \
api/ValidatingWriter.hh \
api/Validator.hh \
api/Writer.hh \
-api/Zigzag.hh
+api/Zigzag.hh \
+api/buffer/BufferStreambuf.hh \
+api/buffer/detail/BufferDetailIterator.hh \
+api/buffer/detail/BufferDetail.hh \
+api/buffer/BufferStream.hh \
+api/buffer/BufferPrint.hh \
+api/buffer/Buffer.hh \
+api/buffer/BufferReader.hh
BUILT_SOURCES = AvroYacc.h testgen.hh testgen2.hh
@@ -56,35 +61,7 @@ testparser_LDADD = $(top_builddir)/libav
lib_LTLIBRARIES = libavrocpp.la
-libavrocpp_la_SOURCES = \
-api/AvroParse.hh \
-api/AvroSerialize.hh \
-api/AvroTraits.hh \
-api/Boost.hh \
-api/Compiler.hh \
-api/CompilerNode.hh \
-api/Exception.hh \
-api/InputStreamer.hh \
-api/Layout.hh \
-api/Node.hh \
-api/NodeConcepts.hh \
-api/NodeImpl.hh \
-api/OutputStreamer.hh \
-api/Parser.hh \
-api/Reader.hh \
-api/Resolver.hh \
-api/ResolverSchema.hh \
-api/ResolvingReader.hh \
-api/Schema.hh \
-api/SchemaResolution.hh \
-api/Serializer.hh \
-api/SymbolMap.hh \
-api/Types.hh \
-api/ValidSchema.hh \
-api/ValidatingReader.hh \
-api/ValidatingWriter.hh \
-api/Validator.hh \
-api/Writer.hh \
+libavrocpp_la_SOURCES = $(library_include_HEADERS) \
api/Zigzag.hh \
impl/Compiler.cc \
impl/CompilerNode.cc \
@@ -107,9 +84,9 @@ parser/AvroLex.ll
AM_LFLAGS= -o$(LEX_OUTPUT_ROOT).c
AM_YFLAGS = -d
-check_PROGRAMS = unittest testgen
+check_PROGRAMS = unittest testgen buffertest
-TESTS=unittest testgen
+TESTS=unittest testgen buffertest
TESTS_ENVIRONMENT = top_srcdir=$(top_srcdir)
unittest_SOURCES = test/unittest.cc
@@ -120,6 +97,10 @@ testgen_SOURCES = test/testgen.cc testge
testgen_LDFLAGS = -static -no-install $(BOOST_LDFLAGS)
testgen_LDADD = $(top_builddir)/libavrocpp.la $(BOOST_REGEX_LIB)
+buffertest_SOURCES = test/buffertest.cc
+buffertest_LDFLAGS = -static -no-install $(BOOST_LDFLAGS)
+buffertest_LDADD = $(top_builddir)/libavrocpp.la $(BOOST_REGEX_LIB) $(BOOST_THREAD_LIB) $(BOOST_SYSTEM_LIB)
+
# Make sure we never package up '.svn' directories
dist-hook:
find $(distdir) -name '.svn' | xargs rm -rf
Modified: hadoop/avro/trunk/lang/c++/api/Parser.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/Parser.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/Parser.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/Parser.hh Thu Apr 8 16:48:19 2010
@@ -36,12 +36,12 @@ class Parser : private boost::noncopyabl
public:
// Constructor only works with Writer
- explicit Parser(InputStreamer &in) :
+ explicit Parser(const InputBuffer &in) :
reader_(in)
{}
/// Constructor only works with ValidatingWriter
- Parser(const ValidSchema &schema, InputStreamer &in) :
+ Parser(const ValidSchema &schema, const InputBuffer &in) :
reader_(schema, in)
{}
Modified: hadoop/avro/trunk/lang/c++/api/Reader.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/Reader.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/Reader.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/Reader.hh Thu Apr 8 16:48:19 2010
@@ -23,9 +23,9 @@
#include <vector>
#include <boost/noncopyable.hpp>
-#include "InputStreamer.hh"
#include "Zigzag.hh"
#include "Types.hh"
+#include "buffer/BufferReader.hh"
namespace avro {
@@ -39,15 +39,15 @@ class Reader : private boost::noncopyabl
public:
- explicit Reader(InputStreamer &in) :
- in_(in)
+ explicit Reader(const InputBuffer &buffer) :
+ reader_(buffer)
{}
void readValue(Null &) {}
void readValue(bool &val) {
uint8_t ival;
- in_.readByte(ival);
+ reader_.read(ival);
val = (ival != 0);
}
@@ -66,7 +66,7 @@ class Reader : private boost::noncopyabl
float f;
uint32_t i;
} v;
- in_.readWord(v.i);
+ reader_.read(v.i);
val = v.f;
}
@@ -75,19 +75,13 @@ class Reader : private boost::noncopyabl
double d;
uint64_t i;
} v;
- in_.readLongWord(v.i);
+ reader_.read(v.i);
val = v.d;
}
void readValue(std::string &val) {
int64_t size = readSize();
- val.clear();
- val.reserve(size);
- uint8_t bval;
- for(size_t bytes = 0; bytes < static_cast<size_t>(size); bytes++) {
- in_.readByte(bval);
- val.push_back(bval);
- }
+ reader_.read(val, size);
}
void readBytes(std::vector<uint8_t> &val) {
@@ -96,15 +90,13 @@ class Reader : private boost::noncopyabl
val.reserve(size);
uint8_t bval;
for(size_t bytes = 0; bytes < static_cast<size_t>(size); bytes++) {
- in_.readByte(bval);
+ reader_.read(bval);
val.push_back(bval);
}
}
void readFixed(uint8_t *val, size_t size) {
- for(size_t bytes = 0; bytes < size; bytes++) {
- in_.readByte(val[bytes]);
- }
+ reader_.read(reinterpret_cast<char *>(val), size);
}
template <size_t N>
@@ -148,7 +140,7 @@ class Reader : private boost::noncopyabl
uint8_t val = 0;
int shift = 0;
do {
- in_.readByte(val);
+ reader_.read(val);
uint64_t newbits = static_cast<uint64_t>(val & 0x7f) << shift;
encoded |= newbits;
shift += 7;
@@ -157,7 +149,7 @@ class Reader : private boost::noncopyabl
return encoded;
}
- InputStreamer &in_;
+ BufferReader reader_;
};
Modified: hadoop/avro/trunk/lang/c++/api/ResolvingReader.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/ResolvingReader.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/ResolvingReader.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/ResolvingReader.hh Thu Apr 8 16:48:19 2010
@@ -28,14 +28,12 @@
namespace avro {
-class InputStreamer;
-
class ResolvingReader : private boost::noncopyable
{
public:
- ResolvingReader(const ResolverSchema &schema, InputStreamer &in) :
+ ResolvingReader(const ResolverSchema &schema, const InputBuffer &in) :
reader_(in),
schema_(schema)
{}
Modified: hadoop/avro/trunk/lang/c++/api/Serializer.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/Serializer.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/Serializer.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/Serializer.hh Thu Apr 8 16:48:19 2010
@@ -36,13 +36,13 @@ class Serializer : private boost::noncop
public:
/// Constructor only works with Writer
- explicit Serializer(OutputStreamer &out) :
- writer_(out)
+ explicit Serializer() :
+ writer_()
{}
/// Constructor only works with ValidatingWriter
- Serializer(const ValidSchema &schema, OutputStreamer &out) :
- writer_(schema, out)
+ Serializer(const ValidSchema &schema) :
+ writer_(schema)
{}
void writeNull() {
@@ -115,6 +115,10 @@ class Serializer : private boost::noncop
writer_.writeEnum(choice);
}
+ InputBuffer buffer() const {
+ return writer_.buffer();
+ }
+
private:
Writer writer_;
Modified: hadoop/avro/trunk/lang/c++/api/ValidatingReader.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/ValidatingReader.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/ValidatingReader.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/ValidatingReader.hh Thu Apr 8 16:48:19 2010
@@ -30,7 +30,6 @@
namespace avro {
class ValidSchema;
-class InputStreamer;
/// As an avro object is being parsed from binary data to its C++
/// representation, this parser will walk the parse tree and ensure that the
@@ -47,7 +46,7 @@ class ValidatingReader : private boost::
public:
- ValidatingReader(const ValidSchema &schema, InputStreamer &in);
+ ValidatingReader(const ValidSchema &schema, const InputBuffer &in);
template<typename T>
void readValue(T &val) {
Modified: hadoop/avro/trunk/lang/c++/api/ValidatingWriter.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/ValidatingWriter.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/ValidatingWriter.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/ValidatingWriter.hh Thu Apr 8 16:48:19 2010
@@ -28,7 +28,6 @@
namespace avro {
class ValidSchema;
-class OutputStreamer;
/// This class walks the parse tree as data is being serialized, and throws if
/// attempt to serialize a data type does not match the type expected in the
@@ -39,7 +38,7 @@ class ValidatingWriter : private boost::
public:
- ValidatingWriter(const ValidSchema &schema, OutputStreamer &out);
+ ValidatingWriter(const ValidSchema &schema);
template<typename T>
void writeValue(T val) {
@@ -84,6 +83,10 @@ class ValidatingWriter : private boost::
void writeEnum(int64_t choice);
+ InputBuffer buffer() const {
+ return writer_.buffer();
+ }
+
private:
void writeCount(int64_t count);
Modified: hadoop/avro/trunk/lang/c++/api/Validator.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/Validator.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/Validator.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/Validator.hh Thu Apr 8 16:48:19 2010
@@ -28,7 +28,6 @@
namespace avro {
-class OutputStreamer;
/// This class is used by both the ValidatingSerializer and ValidationParser
/// objects. It advances the parse tree (containing logic how to advance
Modified: hadoop/avro/trunk/lang/c++/api/Writer.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/Writer.hh?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/Writer.hh (original)
+++ hadoop/avro/trunk/lang/c++/api/Writer.hh Thu Apr 8 16:48:19 2010
@@ -21,7 +21,7 @@
#include <boost/noncopyable.hpp>
-#include "OutputStreamer.hh"
+#include "buffer/Buffer.hh"
#include "Zigzag.hh"
#include "Types.hh"
@@ -34,27 +34,25 @@ class Writer : private boost::noncopyabl
public:
- explicit Writer(OutputStreamer &out) :
- out_(out)
- {}
+ Writer() {}
void writeValue(const Null &) {}
void writeValue(bool val) {
int8_t byte = (val != 0);
- out_.writeByte(byte);
+ buffer_.writeTo(byte);
}
void writeValue(int32_t val) {
boost::array<uint8_t, 5> bytes;
size_t size = encodeInt32(val, bytes);
- out_.writeBytes(bytes.data(), size);
+ buffer_.writeTo(reinterpret_cast<const char *>(bytes.data()), size);
}
void writeValue(int64_t val) {
boost::array<uint8_t, 10> bytes;
size_t size = encodeInt64(val, bytes);
- out_.writeBytes(bytes.data(), size);
+ buffer_.writeTo(reinterpret_cast<const char *>(bytes.data()), size);
}
void writeValue(float val) {
@@ -64,7 +62,7 @@ class Writer : private boost::noncopyabl
} v;
v.f = val;
- out_.writeWord(v.i);
+ buffer_.writeTo(v.i);
}
void writeValue(double val) {
@@ -74,7 +72,7 @@ class Writer : private boost::noncopyabl
} v;
v.d = val;
- out_.writeLongWord(v.i);
+ buffer_.writeTo(v.i);
}
void writeValue(const std::string &val) {
@@ -83,17 +81,17 @@ class Writer : private boost::noncopyabl
void writeBytes(const void *val, size_t size) {
this->writeValue(static_cast<int64_t>(size));
- out_.writeBytes(val, size);
+ buffer_.writeTo(reinterpret_cast<const char *>(val), size);
}
template <size_t N>
void writeFixed(const uint8_t (&val)[N]) {
- out_.writeBytes(val, N);
+ buffer_.writeTo(reinterpret_cast<const char *>(val), N);
}
template <size_t N>
void writeFixed(const boost::array<uint8_t, N> &val) {
- out_.writeBytes(val.data(), val.size());
+ buffer_.writeTo(reinterpret_cast<const char *>(val.data()), val.size());
}
void writeRecord() {}
@@ -103,7 +101,7 @@ class Writer : private boost::noncopyabl
}
void writeArrayEnd() {
- out_.writeByte(0);
+ buffer_.writeTo<uint8_t>(0);
}
void writeMapBlock(int64_t size) {
@@ -111,7 +109,7 @@ class Writer : private boost::noncopyabl
}
void writeMapEnd() {
- out_.writeByte(0);
+ buffer_.writeTo<uint8_t>(0);
}
void writeUnion(int64_t choice) {
@@ -122,9 +120,13 @@ class Writer : private boost::noncopyabl
this->writeValue(static_cast<int64_t>(choice));
}
+ InputBuffer buffer() const {
+ return buffer_;
+ }
+
private:
- OutputStreamer &out_;
+ OutputBuffer buffer_;
};
Added: hadoop/avro/trunk/lang/c++/api/buffer/Buffer.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/Buffer.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/Buffer.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/Buffer.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,503 @@
+#ifndef avro_Buffer_hh__
+#define avro_Buffer_hh__
+
+#include <boost/type_traits.hpp>
+#include <vector>
+
+#include "detail/BufferDetail.hh"
+#include "detail/BufferDetailIterator.hh"
+
+/**
+ * \file Buffer.hh
+ *
+ * \brief Definitions for InputBuffer and OutputBuffer classes
+ *
+ **/
+
+namespace avro {
+
+class OutputBuffer;
+class InputBuffer;
+
+
+/**
+ * The OutputBuffer (write-only buffer)
+ *
+ * Use cases for OutputBuffer
+ *
+ * - write message to buffer using ostream class or directly
+ * - append messages to headers
+ * - building up streams of messages via append
+ * - converting to read-only buffers for sending
+ * - extracting parts of the messages into read-only buffers
+ *
+ * -# ASIO access:
+ * - write to a buffer(s) by asio using iterator
+ * - convert to read buffer for deserializing
+ *
+ * OutputBuffer is assignable and copy-constructable. On copy or assignment,
+ * only a pointer is copied, so the two resulting copies are identical, so
+ * modifying one will modify both.
+ **/
+
+class OutputBuffer
+{
+
+ public:
+
+ typedef detail::size_type size_type;
+ typedef detail::data_type data_type;
+
+ /**
+ * The asio library expects a const_iterator (the const-ness refers to the
+ * fact that the underlying vector of buffers will not be modified, even
+ * though the data in those buffers is being modified). The iterator
+ * provides the list of addresses an operation can write to.
+ **/
+
+ typedef detail::OutputBufferIterator const_iterator;
+
+ /**
+ * Default constructor. Will pre-allocate at least the requested size, but
+ * can grow larger on demand.
+ *
+ * Destructor uses the default, which resets a shared pointer, deleting the
+ * underlying data if no other copies exist.
+ *
+ * Copy and assignment operators are not explicitly provided because the
+ * default ones work fine. The default makes only a shallow copy, so the
+ * copies will refer to the same memory. This is required by asio
+ * functions, which will implicitly make copies for asynchronous
+ * operations. Therefore, the user must be careful that if they create
+ * multiple copies of the same OutputBuffer, only one is being modified
+ * otherwise undefined behavior may occur.
+ *
+ **/
+
+ OutputBuffer(size_type reserveSize = 0) :
+ pimpl_(new detail::BufferImpl)
+ {
+ if(reserveSize) {
+ reserve(reserveSize);
+ }
+ }
+
+ /**
+ * Reserve enough space for a wroteTo() operation. When using writeTo(),
+ * the buffer will grow dynamically as needed. But when using the iterator
+ * to write (followed by wroteTo()), data may only be written to the space
+ * available, so this ensures there is enough room in the buffer before
+ * the write operation.
+ **/
+
+ void reserve(size_type reserveSize)
+ {
+ pimpl_->reserveFreeSpace(reserveSize);
+ }
+
+ /**
+ * Write a block of data to the buffer. The buffer size will automatically
+ * grow if the size is larger than what is currently free.
+ **/
+
+ size_type writeTo(const data_type *data, size_type size) {
+ return pimpl_->writeTo(data, size);
+ }
+
+ /**
+ * Write a single value to the buffer. The buffer size will automatically
+ * grow if there is not room for the byte. The value must be a
+ * "fundamental" type, e.g. int, float, etc. (otherwise use the other
+ * writeTo tests).
+ **/
+
+ template<typename T>
+ void writeTo(T val) {
+ pimpl_->writeTo(val, boost::is_fundamental<T>());
+ }
+
+ /**
+ * Update the state of the buffer after writing through the iterator
+ * interface. This function exists primarily for the boost:asio which
+ * writes directly to the buffer using its iterator. In this case, the
+ * internal state of the buffer does not reflect that the data was written
+ * This informs the buffer how much data was written.
+ *
+ * The buffer does not automatically resize in this case, the bytes written
+ * cannot exceed the amount of free space. Attempting to write more will
+ * throw a std::length_error exception.
+ **/
+
+ size_type wroteTo(size_type size)
+ {
+ int wrote = 0;
+ if(size) {
+ if(size > freeSpace()) {
+ throw std::length_error("Impossible to write more data than free space");
+ }
+ wrote = pimpl_->wroteTo(size);
+ }
+ return wrote;
+ }
+
+ /**
+ * Does the buffer have any data?
+ **/
+
+ bool empty() const {
+ return (pimpl_->size()==0);
+ }
+
+ /**
+ * Returns the size of the buffer, in bytes.
+ */
+
+ size_type size() const {
+ return pimpl_->size();
+ }
+
+ /**
+ * Returns the current free space that is available to write to in the
+ * buffer, in bytes. This is not a strict limit in size, as writeTo() can
+ * automatically increase capacity if necessary.
+ **/
+
+ size_type freeSpace() const {
+ return pimpl_->freeSpace();
+ }
+
+ /**
+ * Appends the data in the argument to the end of this buffer. The
+ * argument can be either an InputBuffer or OutputBuffer.
+ *
+ **/
+
+ template <class BufferType>
+ void append(const BufferType &buf) {
+ // don't append an empty buffer
+ if(buf.size()) {
+ pimpl_->append(*(buf.pimpl_.get()));
+ }
+ }
+
+ /**
+ * Return an iterator pointing to the first data chunk of this buffer
+ * that may be written to.
+ **/
+
+ const_iterator begin() const {
+ return const_iterator(pimpl_->beginWrite());
+ }
+
+ /**
+ * Return the end iterator for writing.
+ **/
+
+ const_iterator end() const {
+ return const_iterator(pimpl_->endWrite());
+ }
+
+ /**
+ * Discard any data in this buffer.
+ **/
+
+ void discardData()
+ {
+ pimpl_->discardData();
+ }
+
+ /**
+ * Discard the specified number of bytes from this data, starting at the beginning.
+ * Throws if the size is greater than the number of bytes.
+ **/
+
+ void discardData(size_t bytes)
+ {
+ if(bytes > 0) {
+ if(bytes < pimpl_->size()) {
+ pimpl_->discardData(bytes);
+ }
+ else if(bytes == pimpl_->size()) {
+ pimpl_->discardData();
+ }
+ else {
+ throw std::out_of_range("trying to discard more data than exists");
+ }
+ }
+ }
+
+ /**
+ * Remove bytes from this buffer, starting from the beginning, and place
+ * them into a new buffer. Throws if the number of requested bytes exceeds
+ * the size of the buffer. Data and freeSpace in the buffer after bytes
+ * remains in this buffer.
+ **/
+
+ InputBuffer extractData(size_type bytes);
+
+ /**
+ * Remove all bytes from this buffer, returning them in a new buffer.
+ * After removing data, some freeSpace may remain in this buffer.
+ **/
+
+ InputBuffer extractData();
+
+ /**
+ * Clone this buffer, creating a copy that contains the same data.
+ **/
+
+ OutputBuffer clone() const
+ {
+ detail::BufferImpl::SharedPtr newImpl(new detail::BufferImpl(*pimpl_));
+ return OutputBuffer(newImpl);
+ }
+
+ /**
+ * Add unmanaged data to the buffer. The buffer will not automatically
+ * free the data, but it will call the supplied function when the data is
+ * no longer referenced by the buffer (or copies of the buffer).
+ **/
+
+ void appendForeignData(const data_type *data, size_type size, const detail::free_func &func) {
+ pimpl_->appendForeignData(data, size, func);
+ }
+
+ /**
+ * Returns the number of chunks that contain free space.
+ **/
+
+ int numChunks() const {
+ return pimpl_->numFreeChunks();
+ }
+
+ /**
+ * Returns the number of chunks that contain data
+ **/
+
+ int numDataChunks() const {
+ return pimpl_->numDataChunks();
+ }
+
+ private:
+
+ friend class InputBuffer;
+ friend class BufferReader;
+
+ explicit OutputBuffer(const detail::BufferImpl::SharedPtr &pimpl) :
+ pimpl_(pimpl)
+ { }
+
+ detail::BufferImpl::SharedPtr pimpl_; ///< Must never be null.
+};
+
+/**
+ * The InputBuffer (read-only buffer)
+ *
+ * InputBuffer is an immutable buffer which may be constructed from an
+ * OutputBuffer, or several of OutputBuffer's methods. Once the data is
+ * transferred to an InputBuffer it cannot be modified, only read (via
+ * BufferReader, istream, or its iterator).
+ *
+ * Assignments and copies are shallow copies.
+ *
+ * -# ASIO access: - iterate using const_iterator for sending messages
+ *
+ **/
+
+class InputBuffer
+{
+
+ public:
+
+ typedef detail::size_type size_type;
+ typedef detail::data_type data_type;
+
+ // needed for asio
+ typedef detail::InputBufferIterator const_iterator;
+
+ /**
+ * Default InputBuffer creates an empty buffer.
+ *
+ * Copy/assignment functions use the default ones. They will do a shallow
+ * copy, and because InputBuffer is immutable, the copies will be
+ * identical.
+ *
+ * Destructor also uses the default, which resets a shared pointer,
+ * deleting the underlying data if no other copies exist.
+ **/
+
+ InputBuffer() :
+ pimpl_(new detail::BufferImpl)
+ { }
+
+ /**
+ * Construct an InputBuffer that contains the contents of an OutputBuffer.
+ * The two buffers will have the same contents, but this copy will be
+ * immutable, while the OutputBuffer may still be written to.
+ *
+ * If you wish to move the data from the OutputBuffer to a new InputBuffer
+ * (leaving only free space in the OutputBuffer),
+ * OutputBuffer::extractData() will do this more efficiently.
+ *
+ * Implicit conversion is allowed.
+ **/
+
+ InputBuffer(const OutputBuffer &src) :
+ pimpl_(new detail::BufferImpl(*src.pimpl_))
+ { }
+
+ /**
+ * Does the buffer have any data?
+ **/
+
+ bool empty() const {
+ return (pimpl_->size() == 0);
+ }
+
+ /**
+ * Returns the size of the buffer, in bytes.
+ **/
+
+ size_type size() const {
+ return pimpl_->size();
+ }
+
+ /**
+ * Return an iterator pointing to the first data chunk of this buffer
+ * that contains data.
+ **/
+
+ const_iterator begin() const {
+ return const_iterator(pimpl_->beginRead());
+ }
+
+ /**
+ * Return the end iterator.
+ **/
+
+ const_iterator end() const {
+ return const_iterator(pimpl_->endRead());
+ }
+
+ /**
+ * Returns the number of chunks containing data.
+ **/
+
+ int numChunks() const {
+ return pimpl_->numDataChunks();
+ }
+
+
+ private:
+
+ friend class OutputBuffer; // for append function
+ friend class istreambuf;
+ friend class BufferReader;
+
+ explicit InputBuffer(const detail::BufferImpl::SharedPtr &pimpl) :
+ pimpl_(pimpl)
+ { }
+
+ /**
+ * Class to indicate that a copy of a OutputBuffer to InputBuffer should be
+ * a shallow copy, used to enable reading of the contents of an
+ * OutputBuffer without need to convert it to InputBuffer using a deep
+ * copy. It is private and only used by BufferReader and istreambuf
+ * classes.
+ *
+ * Writing to an OutputBuffer while it is being read may lead to undefined
+ * behavior.
+ **/
+
+ class ShallowCopy {};
+
+ /**
+ * Make a shallow copy of an OutputBuffer in order to read it without
+ * causing conversion overhead.
+ **/
+ InputBuffer(const OutputBuffer &src, const ShallowCopy &) :
+ pimpl_(src.pimpl_)
+ { }
+
+ /**
+ * Make a shallow copy of an InputBuffer. The default copy constructor
+ * already provides shallow copy, this is just provided for generic
+ * algorithms that wish to treat InputBuffer and OutputBuffer in the same
+ * manner.
+ **/
+
+ InputBuffer(const InputBuffer &src, const ShallowCopy &) :
+ pimpl_(src.pimpl_)
+ { }
+
+
+ detail::BufferImpl::ConstSharedPtr pimpl_; ///< Must never be null.
+};
+
+
+/*
+ * Implementations of some OutputBuffer functions are inlined here
+ * because InputBuffer definition was required before.
+ */
+
+inline InputBuffer OutputBuffer::extractData()
+{
+ detail::BufferImpl::SharedPtr newImpl(new detail::BufferImpl);
+ if(pimpl_->size()) {
+ pimpl_->extractData(*newImpl);
+ }
+ return InputBuffer(newImpl);
+}
+
+inline InputBuffer OutputBuffer::extractData(size_type bytes)
+{
+ if(bytes > pimpl_->size()) {
+ throw std::out_of_range("trying to extract more data than exists");
+ }
+
+ detail::BufferImpl::SharedPtr newImpl(new detail::BufferImpl);
+ if(bytes > 0) {
+ if(bytes < pimpl_->size()) {
+ pimpl_->extractData(*newImpl, bytes);
+ }
+ else {
+ pimpl_->extractData(*newImpl);
+ }
+ }
+
+ return InputBuffer(newImpl);
+}
+
+/**
+ * Create an array of iovec structures from the buffer. This utility is used
+ * to support writev and readv function calls. The caller should ensure the
+ * buffer object is not deleted while using the iovec vector.
+ *
+ * If the BufferType is an InputBuffer, the iovec will point to the data that
+ * already exists in the buffer, for reading.
+ *
+ * If the BufferType is an OutputBuffer, the iovec will point to the free
+ * space, which may be written to. Before writing, the caller should call
+ * OutputBuffer::reserve() to create enough room for the desired write (which
+ * can be verified by calling OutputBuffer::freeSpace()), and after writing,
+ * they MUST call OutputBuffer::wroteTo(), otherwise the buffer will not know
+ * the space is not free anymore.
+ *
+ **/
+
+template<class BufferType>
+inline void toIovec(BufferType &buf, std::vector<struct iovec> &iov)
+{
+ const int chunks = buf.numChunks();
+ iov.resize(chunks);
+ typename BufferType::const_iterator iter = buf.begin();
+ for (int i = 0; i < chunks; ++i) {
+ iov[i].iov_base = const_cast<typename BufferType::data_type *>(iter->data());
+ iov[i].iov_len = iter->size();
+ ++iter;
+ }
+}
+
+} // namespace
+
+#endif
Added: hadoop/avro/trunk/lang/c++/api/buffer/BufferPrint.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/BufferPrint.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/BufferPrint.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/BufferPrint.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,100 @@
+#ifndef avro_BufferPrint_hh__
+#define avro_BufferPrint_hh__
+
+#include <ctype.h>
+#include <iostream>
+#include <iomanip>
+#include "BufferReader.hh"
+
+/**
+ * \file BufferPrint.hh
+ *
+ * \brief Convenience functions for printing buffer contents
+ **/
+
+namespace avro {
+
+namespace detail {
+
+/**
+ * \fn hexPrint
+ *
+ * Prints a buffer to a stream in the canonical hex+ASCII format,
+ * the same used by the program 'hexdump -C'
+ *
+ **/
+
+inline void
+hexPrint(std::ostream &os, BufferReader &reader)
+{
+ std::ios_base::fmtflags savedFlags = os.flags();
+
+ char sixteenBytes[16];
+ int offset = 0;
+
+ os << std::setfill('0');
+ os << std::hex;
+
+ while(reader.bytesRemaining()) {
+
+ os << std::setw(8) << offset << " ";
+
+ size_t inBuffer = reader.read(sixteenBytes, sizeof(sixteenBytes));
+ offset += inBuffer;
+
+ // traverse 8 bytes or inBuffer, whatever is less
+ size_t cnt = std::min(inBuffer, static_cast<size_t>(8));
+
+ size_t i = 0;
+ for (; i < cnt; ++i) {
+ os << std::setw(2);
+ os << (static_cast<int>(sixteenBytes[i]) & 0xff) << ' ';
+ }
+ for (; i < 8; ++i) {
+ os << " ";
+ }
+ os << ' ';
+
+ // traverse 16 bytes or inBuffer, whatever is less
+ cnt = std::min(inBuffer, static_cast<size_t>(16));
+
+ for (; i < cnt; ++i) {
+ os << std::setw(2);
+ os << (static_cast<int>(sixteenBytes[i]) & 0xff) << ' ';
+ }
+ for (; i < 16; ++i) {
+ os << " ";
+ }
+ os << " |";
+ for(i = 0; i < inBuffer; ++i) {
+ os.put(isprint(sixteenBytes[i]) ? sixteenBytes[i] : '.' );
+ }
+ os << "|\n";
+
+ }
+
+ // restore flags
+ os.flags( savedFlags);
+}
+
+} // namespace detail
+
+} // namespace
+
/// Stream-insertion operator: dumps the OutputBuffer's readable contents
/// to `os` in hexdump -C style via detail::hexPrint.
inline
std::ostream& operator<<(std::ostream& os, const avro::OutputBuffer& buffer)
{
    avro::BufferReader reader(buffer);
    avro::detail::hexPrint(os, reader);
    return os;
}
+
/// Stream-insertion operator: dumps the InputBuffer's contents to `os`
/// in hexdump -C style via detail::hexPrint.
inline
std::ostream& operator<<(std::ostream& os, const avro::InputBuffer& buffer)
{
    avro::BufferReader reader(buffer);
    avro::detail::hexPrint(os, reader);
    return os;
}
+
+#endif
Added: hadoop/avro/trunk/lang/c++/api/buffer/BufferReader.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/BufferReader.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/BufferReader.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/BufferReader.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,267 @@
+#ifndef avro_BufferReader_hh__
+#define avro_BufferReader_hh__
+
+#include "Buffer.hh"
+
+/**
+ * \file BufferReader.hh
+ *
+ * \brief Helper class for reading bytes from buffer in a streaming manner,
+ * without the overhead of istreams.
+ *
+ **/
+
+namespace avro {
+
+/**
+ * Helper class for reading bytes from buffer without worrying about
+ * chunk boundaries. May read from an InputBuffer or OutputBuffer.
+ *
+ **/
class BufferReader : private boost::noncopyable
{

  public:

    typedef detail::data_type data_type;
    typedef detail::size_type size_type;

  private:

    /// Bytes left unread in the chunk the cursor currently points at.
    size_type chunkRemaining() const {
        return iter_->dataSize() - chunkPos_;
    }

    /// Advance the read cursor by `howmuch` bytes, stepping to the next
    /// chunk when the current one is exhausted. `howmuch` must not exceed
    /// chunkRemaining().
    void incrementChunk(size_type howmuch) {
        bytesRemaining_ -= howmuch;
        chunkPos_ += howmuch;
        if(chunkPos_ == iter_->dataSize()) {
            chunkPos_ = 0;
            ++iter_;
        }
    }

    /// Reset the cursor to the first byte of the buffer.
    void rewind() {
        iter_ = bufferImpl_->beginRead();
        bytesRemaining_ = bytes_;
        chunkPos_ = 0;
    }

    /// Address of the next unread byte.
    const data_type *addr() const {
        return iter_->tellReadPos() + chunkPos_;
    }

  public:

    /// Construct a reader over an InputBuffer. Shares (does not copy) the
    /// buffer's chunks; the size is snapshotted at construction time.
    // NOTE(review): single-argument and not explicit, so this also acts as
    // an implicit conversion -- confirm that is intended.
    BufferReader(const InputBuffer &buf) :
        bufferImpl_(buf.pimpl_),
        iter_(bufferImpl_->beginRead()),
        bytes_(bufferImpl_->size()),
        bytesRemaining_(bytes_),
        chunkPos_(0)
    { }

    /// Construct a reader over an OutputBuffer's readable data.
    BufferReader(const OutputBuffer &buf) :
        bufferImpl_(buf.pimpl_),
        iter_(bufferImpl_->beginRead()),
        bytes_(bufferImpl_->size()),
        bytesRemaining_(bytes_),
        chunkPos_(0)
    { }

    /**
     * How many bytes are still not read from this buffer.
     **/

    size_type bytesRemaining() const {
        return bytesRemaining_;
    }

    /**
     * How many bytes have been read from this buffer so far.
     **/

    size_type bytesRead() const {
        return bytes_ - bytesRemaining_;
    }

    /**
     * Read a block of data from the buffer. Copies up to `size` bytes into
     * `data` and returns the number of bytes actually copied (which is less
     * than `size` only when the buffer runs out of data).
     **/

    size_type read(data_type *data, size_type size) {

        if(size > bytesRemaining_) {
            size = bytesRemaining_;
        }
        size_type sizeToRead = size;

        while(sizeToRead) {
            const size_type toRead = std::min(sizeToRead, chunkRemaining());
            memcpy(data, addr(), toRead);
            sizeToRead -= toRead;
            data += toRead;
            incrementChunk(toRead);
        }

        return size;
    }

    /**
     * Read exactly `size` bytes from the buffer into a string. Returns false
     * (and reads nothing) if fewer than `size` bytes remain.
     **/

    bool read(std::string &str, size_type size) {
        if(size > bytesRemaining_) {
            return false;
        }

        // single-chunk reads can assign the string in one shot
        if(size <= chunkRemaining()) {
            fastStringRead(str, size);
        }
        else {
            slowStringRead(str, size);
        }

        return true;
    }


    /**
     * Read a single value from the buffer. The value must be a "fundamental"
     * type, e.g. int, float, etc. (otherwise use the other read functions).
     * Returns false if fewer than sizeof(T) bytes remain.
     *
     **/

    template<typename T>
    bool read(T &val) {
        // dispatch on is_fundamental: non-fundamental types hit a
        // compile-time assertion in the false_type overload below
        return read(val, boost::is_fundamental<T>());
    }

    /**
     * Skips a block of data from the buffer. Returns false (and skips
     * nothing) if fewer than `bytes` remain.
     **/

    bool skip(size_type bytes) {
        bool skipped = false;
        if(bytes <= bytesRemaining_) {
            doSkip(bytes);
            skipped = true;
        }
        return skipped;
    }

    /**
     * Seek to an absolute position in the buffer. Returns false if `pos`
     * is past the end.
     **/

    bool seek(size_type pos) {
        if(pos > bytes_) {
            return false;
        }

        size_type toSkip = pos;
        size_type curPos = bytesRead();
        // if the seek position is ahead, we can use skip to get there
        if(pos >= curPos) {
            toSkip -= curPos;
        }
        // if the seek position is ahead of the start of the chunk we can back up to
        // start of the chunk
        else if(pos >= (curPos - chunkPos_)) {
            curPos -= chunkPos_;
            bytesRemaining_ += chunkPos_;
            chunkPos_ = 0;
            toSkip -= curPos;
        }
        else {
            rewind();
        }
        doSkip(toSkip);
        return true;
    }

    /// Peek at the next unread byte without consuming it. Returns false if
    /// no bytes remain.
    bool peek(char &val) {
        bool ret = (bytesRemaining_ > 0);
        if(ret) {
            val = *(addr());
        }
        return ret;
    }

    /// Copy (by sharing chunks, not by memcpy) the next `bytes` bytes into
    /// a new InputBuffer and consume them. Returns an empty buffer if fewer
    /// than `bytes` remain.
    InputBuffer copyData(size_type bytes) {
        if(bytes > bytesRemaining_) {
            // force no copy
            bytes = 0;
        }
        detail::BufferImpl::SharedPtr newImpl(new detail::BufferImpl);
        if(bytes) {
            bufferImpl_->copyData(*newImpl, iter_, chunkPos_, bytes);
            doSkip(bytes);
        }
        return InputBuffer(newImpl);
    }

  private:

    /// Advance the cursor by sizeToSkip bytes; caller has already checked
    /// that this many bytes remain.
    void doSkip(size_type sizeToSkip) {

        while(sizeToSkip) {
            const size_type toSkip = std::min(sizeToSkip, chunkRemaining());
            sizeToSkip -= toSkip;
            incrementChunk(toSkip);
        }
    }

    /// Implementation of read(T&) for fundamental T.
    // NOTE(review): the fast path dereferences a reinterpret_cast'ed
    // pointer, which assumes the platform tolerates unaligned reads --
    // confirm for the targets this is built on.
    template<typename T>
    bool read(T &val, const boost::true_type&)
    {
        if(sizeof(T) > bytesRemaining_) {
            return false;
        }

        if (sizeof(T) <= chunkRemaining()) {
            val = *(reinterpret_cast<const T*> (addr()));
            incrementChunk(sizeof(T));
        }
        else {
            // value straddles a chunk boundary, assemble it byte-wise
            read(reinterpret_cast<data_type *>(&val), sizeof(T));
        }
        return true;
    }

    /// An uninstantiable function, this is if boost::is_fundamental check fails
    template<typename T>
    bool read(T &val, const boost::false_type&)
    {
        BOOST_STATIC_ASSERT(sizeof(T)==0);
        return false;
    }

    /// String read that fits within the current chunk: one assign().
    void fastStringRead(std::string &str, size_type sizeToCopy) {
        str.assign(addr(), sizeToCopy);
        incrementChunk(sizeToCopy);
    }

    /// String read spanning chunks: append chunk by chunk.
    void slowStringRead(std::string &str, size_type sizeToCopy) {
        str.clear();
        str.reserve(sizeToCopy);
        while(sizeToCopy) {
            const size_type toCopy = std::min(sizeToCopy, chunkRemaining());
            str.append(addr(), toCopy);
            sizeToCopy -= toCopy;
            incrementChunk(toCopy);
        }
    }

    detail::BufferImpl::ConstSharedPtr bufferImpl_; ///< shared, read-only view of the buffer
    detail::BufferImpl::ChunkList::const_iterator iter_; ///< chunk the cursor is in
    size_type bytes_;          ///< total bytes in the buffer at construction
    size_type bytesRemaining_; ///< bytes not yet read
    size_type chunkPos_;       ///< offset of the cursor within the current chunk
};
+
+
+} // namespace
+
+#endif
Added: hadoop/avro/trunk/lang/c++/api/buffer/BufferStream.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/BufferStream.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/BufferStream.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/BufferStream.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,83 @@
+#ifndef avro_BufferStream_hh__
+#define avro_BufferStream_hh__
+
+#include "BufferStreambuf.hh"
+
+/**
+ * \file BufferStream.hh
+ *
+ * \brief Custom istream and ostream classes for use with buffers
+ **/
+
+namespace avro {
+
+/**
+ *
+ * \brief Custom ostream class for writing to an OutputBuffer
+ *
+ **/
+
class ostream : public std::ostream {

  public:

    /// Default constructor, creates a new OutputBuffer.
    ostream() :
        std::ostream(&obuf_)
    { }

    /// Output to a specific buffer.
    /// (Passing &obuf_ to the base before obuf_ is constructed is safe:
    /// the base constructor only stores the pointer.)
    ostream(OutputBuffer &buf) :
        std::ostream(&obuf_),
        obuf_(buf)
    { }

    /// Return the output buffer created by the write operations to this ostream.
    const OutputBuffer &getBuffer() const {
        return obuf_.getBuffer();
    }

  protected:

    ostreambuf obuf_; ///< streambuf backed by an OutputBuffer
};
+
+/**
+ * \brief Custom istream class for reading from an InputBuffer.
+ *
+ * If the buffer contains binary data, then it is recommended to only use the
+ * read() and readsome() functions--get() or getline() may be confused if the
+ * binary data happens to contain an EOF character.
+ *
+ * For buffers containing text, the full implementation of istream is safe.
+ *
+ **/
+
class istream : public std::istream {

  public:

    /// Constructor, requires an InputBuffer to read from.
    explicit istream(const InputBuffer &buf) :
        std::istream(&ibuf_), ibuf_(buf)
    { }

    /// Constructor, takes an OutputBuffer to read from (by making a shallow copy to an InputBuffer).
    /// Writing to the OutputBuffer while an istream is using it may lead to undefined behavior.
    explicit istream(const OutputBuffer &buf) :
        std::istream(&ibuf_), ibuf_(buf)
    { }

    /// Return the InputBuffer this stream is reading from.
    const InputBuffer &getBuffer() const {
        return ibuf_.getBuffer();
    }

  protected:

    istreambuf ibuf_; ///< streambuf backed by an InputBuffer
};
+
+} // namespace avro
+
+#endif
Added: hadoop/avro/trunk/lang/c++/api/buffer/BufferStreambuf.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/BufferStreambuf.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/BufferStreambuf.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/BufferStreambuf.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,234 @@
+#ifndef avro_BufferStreambuf_hh__
+#define avro_BufferStreambuf_HH__
+
+#include "Buffer.hh"
+
+/** \file BufferStreambuf.hh
+ \brief streambuf implementation for istream and ostream.
+*/
+
+namespace avro {
+
+/**
+ * \brief Implementation of streambuf for use by the Buffer's ostream.
+ *
+ * This class derives from std::streambuf and implements the virtual functions
+ * needed to operate on OutputBuffer. The override functions are overflow and
+ * xsputn. Typically custom streambufs will also override sync for output,
+ * but we have no need since all writes are immediately stored in the buffer.
+ **/
+
+class ostreambuf : public std::streambuf {
+
+ public:
+
+ /// Default constructor creates a new OutputBuffer.
+ ostreambuf() :
+ std::streambuf(),
+ buffer_()
+ { }
+
+ /// Construct using an existing OutputBuffer.
+ explicit ostreambuf(OutputBuffer &buffer) :
+ std::streambuf(),
+ buffer_( buffer )
+ { }
+
+ /// Return the buffer.
+ const OutputBuffer &getBuffer() const {
+ return buffer_;
+ }
+
+ protected:
+
+ /// Write a single character to the stream.
+ virtual int_type overflow(int_type c)
+ {
+ buffer_.writeTo(static_cast<OutputBuffer::data_type>(c));
+ return c;
+ }
+
+ /// Write a block of characters to the stream.
+ virtual std::streamsize xsputn(const char_type *s, std::streamsize n)
+ {
+ return buffer_.writeTo(s, n);
+ }
+
+ private:
+
+ OutputBuffer buffer_;
+};
+
+/**
+ * \brief Implementation of streambuf for use by the Buffer's istream.
+ *
+ * This class derives from std::streambuf and implements the virtual functions
+ * needed to operate on InputBuffer. The override functions are underflow,
+ * seekoff, seekpos, and showmanyc. This is considered a buffered streambuf,
+ * because it can access a chunk of the InputBuffer at a time, using the
+ * iterator interface. Because the input is already buffered, uflow is not
+ * required. pbackfail is not yet implemented but can be if necessary (the
+ * inherited behavior is to fail, and has yet to be a problem).
+ *
+ **/
+
+class istreambuf : public std::streambuf {
+
+ public:
+
+ /// Default constructor requires an InputBuffer to read from.
+ explicit istreambuf(const InputBuffer &buffer) :
+ std::streambuf(),
+ buffer_( buffer ),
+ basePos_(0),
+ iter_(buffer_.begin())
+ {
+ setBuffer();
+ }
+
+ /// Default constructor converts an OutputBuffer to an InputBuffer
+ explicit istreambuf(const OutputBuffer &buffer) :
+ std::streambuf(),
+ buffer_( buffer, InputBuffer::ShallowCopy()),
+ basePos_(0),
+ iter_(buffer_.begin())
+ {
+ setBuffer();
+ }
+
+ /// Return the buffer.
+ const InputBuffer &getBuffer() const {
+ return buffer_;
+ }
+
+ protected:
+
+ /// The current chunk of data is exhausted, read the next chunk.
+ virtual int_type underflow() {
+ if(iter_ != buffer_.end()) {
+ basePos_ += (egptr()-eback());
+ ++iter_;
+ }
+ return setBuffer();
+ }
+
+ /// Get a block of data from the stream. Overrides default behavior
+ /// to ignore eof characters that may reside in the stream.
+ virtual std::streamsize xsgetn(char_type *c, std::streamsize len)
+ {
+ std::streamsize bytesCopied = 0;
+
+ while (bytesCopied < len) {
+
+ size_t inBuffer = egptr() - gptr();
+
+ if (inBuffer) {
+ size_t remaining = len - bytesCopied;
+ size_t toCopy = std::min(inBuffer, remaining);
+ memcpy(c, gptr(), toCopy);
+ c += toCopy;
+ bytesCopied += toCopy;
+ gbump(toCopy);
+ }
+
+ if(bytesCopied < len) {
+ underflow();
+ if(iter_ == buffer_.end()) {
+ break;
+ }
+ }
+ }
+
+ return bytesCopied;
+ }
+
+ /// Special seek override to navigate InputBuffer chunks.
+ virtual pos_type seekoff(off_type off, std::ios::seekdir dir, std::_Ios_Openmode) {
+
+ off_type curpos = basePos_ + (gptr() - eback());
+ off_type newpos = off;
+
+ if(dir == std::ios::cur) {
+ newpos += curpos;
+ }
+ else if (dir == std::ios::end) {
+ newpos += buffer_.size();
+ }
+ // short circuit for tell()
+ if(newpos == curpos) {
+ return curpos;
+ }
+
+ off_type endpos = basePos_ + (egptr() - eback());
+
+ // if the position is after our current buffer make
+ // sure it's not past the end of the buffer
+ if((newpos > endpos) && (newpos > static_cast<off_type>(buffer_.size()) )) {
+ return pos_type(-1);
+ }
+ // if the new position is before our current iterator
+ // reset the iterator to the beginning
+ else if (newpos < basePos_) {
+ iter_ = buffer_.begin();
+ basePos_ = 0;
+ setBuffer();
+ endpos = (egptr() -eback());
+ }
+
+ // now if the new position is after the end of the buffer
+ // increase the buffer until it is not
+ while (newpos > endpos) {
+ istreambuf::underflow();
+ endpos = basePos_ + (egptr() - eback());
+ }
+
+ setg(eback(), eback() + (newpos - basePos_), egptr());
+ return newpos;
+ }
+
+ /// Calls seekoff for implemention.
+ virtual pos_type seekpos(pos_type pos, std::_Ios_Openmode) {
+ return istreambuf::seekoff(pos, std::ios::beg, std::_Ios_Openmode(0));
+ }
+
+ /// Shows the number of bytes buffered in the current chunk, or next chunk if
+ /// current is exhausted.
+ virtual std::streamsize showmanyc() {
+
+ // this function only gets called when the current buffer has been
+ // completely read, verify this is the case, and if so, underflow to
+ // fetch the next buffer
+
+ if(egptr() - gptr() == 0) {
+ istreambuf::underflow();
+ }
+ return egptr() - gptr();
+ }
+
+ private:
+
+ /// Setup the streambuf buffer pointers after updating
+ /// the value of the iterator. Returns the first character
+ /// in the new buffer, or eof if there is no buffer.
+ int_type setBuffer() {
+ int_type ret = traits_type::eof();
+
+ if(iter_ != buffer_.end()) {
+ char *loc = const_cast <char *> (iter_->data()) ;
+ setg(loc, loc, loc + iter_->size());
+ ret = *gptr();
+ }
+ else {
+ setg(0,0,0);
+ }
+ return ret;
+ }
+
+ const InputBuffer buffer_;
+ off_type basePos_;
+ InputBuffer::const_iterator iter_;
+};
+
+} // namespace
+
+#endif
Added: hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetail.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/detail/BufferDetail.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetail.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetail.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,532 @@
+#ifndef avro_BufferDetail_hh__
+#define avro_BufferDetail_hh__
+
+#include <boost/shared_ptr.hpp>
+#include <boost/shared_array.hpp>
+#include <boost/static_assert.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/function.hpp>
+#include <exception>
+#include <cassert>
+#include <deque>
+
+/**
+ * \file BufferDetail.hh
+ *
+ * \brief The implementation details for the Buffer class.
+ *
+ **/
+
+namespace avro {
+
+namespace detail {
+
+typedef char data_type;
+typedef size_t size_type;
+typedef boost::asio::const_buffer ConstAsioBuffer;
+typedef boost::asio::mutable_buffer MutableAsioBuffer;
+
+/// The size in bytes for blocks backing buffer chunks.
+const size_type kMinBlockSize = 4096;
+const size_type kMaxBlockSize = 16384;
+const size_type kDefaultBlockSize = kMinBlockSize;
+
+typedef boost::function<void(void)> free_func;
+
+/**
+ * Simple class to hold a functor that executes on delete
+ **/
class CallOnDestroy {
  public:
    /// Store the functor to run at destruction; an empty functor is allowed
    /// and simply results in no call.
    CallOnDestroy(const free_func &func) : func_(func)
    { }
    ~CallOnDestroy() {
        // invoke the stored functor exactly once, when the (typically
        // reference-counted) holder is finally destroyed
        if (func_) {
            func_();
        }
    }
  private:
    free_func func_; ///< cleanup callback, may be empty
};
+
+/**
+ * \brief A chunk is the building block for buffers.
+ *
+ * A chunk is backed by a memory block, and internally it maintains information
+ * about which area of the block it may use, and the portion of this area that
+ * contains valid data. More than one chunk may share the same underlying
+ * block, but the areas should never overlap. Chunk holds a shared pointer to
+ * an array of bytes so that shared blocks are reference counted.
+ *
+ * When a chunk is copied, the copy shares the same underlying buffer, but the
+ * copy receives its own copies of the start/cursor/end pointers, so each copy
+ * can be manipulated independently. This allows different buffers to share
+ * the same non-overlapping parts of a chunk, or even overlapping parts of a
+ * chunk if the situation arises.
+ *
+ **/
+
class Chunk
{

  public:

    typedef boost::shared_ptr<Chunk> SharedPtr;

    /// Default constructor, allocates a new underlying block for this chunk.
    /// The chunk starts empty: read and write cursors both at the block start.
    Chunk(size_type size) :
        underlyingBlock_(new data_type[size]),
        readPos_(underlyingBlock_.get()),
        writePos_(readPos_),
        endPos_(readPos_ + size)
    { }

    /// Foreign buffer constructor, uses the supplied data for this chunk, and
    /// only for reading. (writePos_ == endPos_, so freeSize() is 0 and no
    /// writes can land here; `func` runs when the last reference goes away.)
    Chunk(const data_type *data, size_type size, const free_func &func) :
        callOnDestroy_(new CallOnDestroy(func)),
        readPos_(const_cast<data_type *>(data)),
        writePos_(readPos_ + size),
        endPos_(writePos_)
    { }

  private:
    // reference counted object will call a functor when it's destroyed
    boost::shared_ptr<CallOnDestroy> callOnDestroy_;

  public:

    /// Remove readable bytes from the front of the chunk by advancing the
    /// chunk start position.
    void truncateFront(size_type howMuch) {
        readPos_ += howMuch;
        assert(readPos_ <= writePos_);
    }

    /// Remove readable bytes from the back of the chunk by moving the
    /// chunk cursor position.
    void truncateBack(size_type howMuch) {
        writePos_ -= howMuch;
        assert(readPos_ <= writePos_);
    }

    /// Tell the position the next byte may be written to.
    data_type *tellWritePos() const {
        return writePos_;
    }

    /// Tell the position of the first byte containing valid data.
    const data_type *tellReadPos() const {
        return readPos_;
    }

    /// After a write operation, increment the write position.
    void incrementCursor(size_type howMuch) {
        writePos_ += howMuch;
        assert(writePos_ <= endPos_);
    }

    /// Tell how many bytes of data were written to this chunk.
    size_type dataSize() const {
        return (writePos_ - readPos_);
    }

    /// Tell how many bytes this chunk has available to write to.
    size_type freeSize() const {
        return (endPos_ - writePos_);
    }

    /// Tell how many bytes of data this chunk can hold (used and free).
    size_type capacity() const {
        return (endPos_ - readPos_);
    }

  private:

    friend bool operator==(const Chunk &lhs, const Chunk &rhs);
    friend bool operator!=(const Chunk &lhs, const Chunk &rhs);

    // more than one buffer can share an underlying block, so use SharedPtr
    boost::shared_array<data_type> underlyingBlock_;

    data_type *readPos_;  ///< The first readable byte in the block
    data_type *writePos_; ///< The end of written data and start of free space
    data_type *endPos_;   ///< Marks the end of the usable block area
};
+
+/**
+ * Compare underlying buffers and return true if they are equal
+ **/
inline bool operator==(const Chunk &lhs, const Chunk &rhs) {
    // identity of the shared block, not the byte contents, is compared
    return lhs.underlyingBlock_ == rhs.underlyingBlock_;
}
+
+/**
+ * Compare underlying buffers and return true if they are unequal
+ **/
inline bool operator!=(const Chunk &lhs, const Chunk &rhs) {
    // identity of the shared block, not the byte contents, is compared
    return lhs.underlyingBlock_ != rhs.underlyingBlock_;
}
+
+
+/**
+ * \brief Implementation details for Buffer class
+ *
+ * Internally, BufferImpl keeps two lists of chunks, one list consists entirely of
+ * chunks containing data, and one list which contains chunks with free space.
+ *
+ *
+ */
+
class BufferImpl : boost::noncopyable
{

    /// Add a new chunk to the list of chunks for this buffer, growing the
    /// buffer by the default block size.
    void allocChunkChecked(size_type size = kDefaultBlockSize)
    {
        writeChunks_.push_back(Chunk(size));
        freeSpace_ += writeChunks_.back().freeSize();
    }

    /// Add a new chunk to the list of chunks for this buffer, growing the
    /// buffer by the requested size, but within the range of a minimum and
    /// maximum.
    void allocChunk(size_type size)
    {
        if(size < kMinBlockSize) {
            size = kMinBlockSize;
        }
        else if (size > kMaxBlockSize) {
            size = kMaxBlockSize;
        }
        allocChunkChecked(size);
    }

    /// Update the state of the chunks after a write operation. This function
    /// ensures the chunk states are consistent with the write.
    void postWrite(size_type size)
    {

        // precondition to this function is that the writeChunk_.front()
        // contains the data that was just written, so make sure writeChunks_
        // is not empty:

        assert(size <= freeSpace_ && !writeChunks_.empty());

        // This is probably the one tricky part of BufferImpl. The data that
        // was written now exists in writeChunks_.front(). Now we must make
        // sure that same data exists in readChunks_.back().
        //
        // There are two cases:
        //
        // 1. readChunks_.last() and writeChunk_.front() refer to the same
        // underlying block, in which case they both just need their cursor
        // updated to reflect the new state.
        //
        // 2. readChunk_.last() is not the same block as writeChunks_.front(),
        // in which case it should be, since the writeChunk.front() contains
        // the next bit of data that will be appended to readChunks_, and
        // therefore needs to be copied there so we can proceed with updating
        // their state.
        //

        // if readChunks_ is not the same as writeChunks_.front(), make a copy
        // of it there

        if(readChunks_.empty() || (readChunks_.back() != writeChunks_.front())) {
            const Chunk &curChunk = writeChunks_.front();
            readChunks_.push_back(curChunk);

            // Any data that existed in the write chunk previously doesn't
            // belong to this buffer (otherwise it would have already been
            // added to the readChunk_ list). Here, adjust the start of the
            // readChunk to begin after any data already existing in curChunk

            readChunks_.back().truncateFront( curChunk.dataSize());
        }

        assert(readChunks_.back().freeSize() == writeChunks_.front().freeSize());

        // update the states of both readChunks_ and writeChunks_ to indicate that they are
        // holding the new data

        readChunks_.back().incrementCursor(size);
        writeChunks_.front().incrementCursor(size);
        size_ += size;
        freeSpace_ -= size;

        // if there is no more free space in writeChunks_, the next write cannot use
        // it, so dispose of it now

        if(writeChunks_.front().freeSize() == 0) {
            writeChunks_.pop_front();
        }
    }

  public:

    typedef std::deque<Chunk> ChunkList;
    typedef boost::shared_ptr<BufferImpl> SharedPtr;
    typedef boost::shared_ptr<const BufferImpl> ConstSharedPtr;

    /// Default constructor, creates a buffer without any chunks
    BufferImpl() :
        freeSpace_(0),
        size_(0)
    { }

    /// Copy constructor, gets a copy of all the chunks with data.
    /// (Write chunks are deliberately not copied: the copy shares the data
    /// blocks for reading but starts with zero free space of its own.)
    explicit BufferImpl(const BufferImpl &src) :
        readChunks_(src.readChunks_),
        freeSpace_(0),
        size_(src.size_)
    { }

    /// Amount of data held in this buffer.
    size_type size() const {
        return size_;
    }

    /// Capacity that may be written before the buffer must allocate more memory.
    size_type freeSpace() const {
        return freeSpace_;
    }

    /// Add enough free chunks to make the reservation size available.
    /// Actual amount may be more (rounded up to next chunk).
    void reserveFreeSpace(size_type reserveSize) {
        while(freeSpace_ < reserveSize) {
            allocChunk(reserveSize - freeSpace_);
        }
    }

    /// Return the chunk list's begin iterator for reading.
    ChunkList::const_iterator beginRead() const {
        return readChunks_.begin();
    }

    /// Return the chunk list's end iterator for reading.
    ChunkList::const_iterator endRead() const {
        return readChunks_.end();
    }

    /// Return the chunk list's begin iterator for writing.
    ChunkList::const_iterator beginWrite() const {
        return writeChunks_.begin();
    }

    /// Return the chunk list's end iterator for writing.
    ChunkList::const_iterator endWrite() const {
        return writeChunks_.end();
    }

    /// Write a single value to buffer, add a new chunk if necessary.
    // NOTE(review): the fast path stores through a reinterpret_cast'ed
    // pointer, which assumes the platform tolerates unaligned stores --
    // confirm for the targets this is built on.
    template<typename T>
    void writeTo(T val, const boost::true_type&)
    {
        if(freeSpace_ && (sizeof(T) <= writeChunks_.front().freeSize())) {
            // fast path, there's enough room in the writeable chunk to just
            // straight out copy it
            *(reinterpret_cast <T*> ( writeChunks_.front().tellWritePos()) ) = val;
            postWrite(sizeof(T));
        }
        else {
            // need to fixup chunks first, so use the regular memcpy
            // writeTo method
            writeTo(reinterpret_cast<data_type*>(&val), sizeof(T));
        }
    }

    /// An uninstantiable function, this is if boost::is_fundamental check fails,
    /// and will compile-time assert.
    template<typename T>
    void writeTo(T val, const boost::false_type&)
    {
        BOOST_STATIC_ASSERT(sizeof(T)==0);
    }

    /// Write a block of data to the buffer, adding new chunks if necessary.
    size_type writeTo(const data_type *data, size_type size)
    {
        size_type bytesLeft = size;
        while(bytesLeft) {

            if(freeSpace_ == 0) {
                allocChunkChecked();
            }

            Chunk &chunk = writeChunks_.front();
            size_type toCopy = std::min<size_type>(chunk.freeSize(), bytesLeft);
            assert(toCopy);
            memcpy(chunk.tellWritePos(), data, toCopy);
            postWrite(toCopy);
            data += toCopy;
            bytesLeft -= toCopy;
        }
        return size;
    }

    /// Update internal status of chunks after data is written using iterator.
    size_type wroteTo(size_type size)
    {
        assert(size <= freeSpace_);
        size_type bytesLeft = size;
        while (bytesLeft) {

            Chunk &chunk = writeChunks_.front();
            size_type wrote = std::min<size_type>(chunk.freeSize(), bytesLeft);
            assert(wrote);
            postWrite(wrote);
            bytesLeft -= wrote;
        }
        return size;
    }

    /// Append the chunks that have data in src to this buffer
    void append(const BufferImpl &src) {
        std::copy(src.readChunks_.begin(), src.readChunks_.end(), std::back_inserter(readChunks_));
        size_ += src.size_;
    }

    /// Remove all the chunks that contain data from this buffer.
    void discardData() {
        readChunks_.clear();
        size_ = 0;
    }

    /// Remove the specified amount of data from the chunks, starting at the front.
    void discardData(size_type bytes)
    {
        assert(bytes && bytes <= size_);

        size_type bytesToDiscard = bytes;
        while( bytesToDiscard ) {

            size_t currentSize = readChunks_.front().dataSize();

            // see if entire chunk is discarded
            if(currentSize <= bytesToDiscard) {
                readChunks_.pop_front();
                bytesToDiscard -= currentSize;
            }
            else {
                readChunks_.front().truncateFront(bytesToDiscard);
                bytesToDiscard = 0;
            }
        }

        size_ -= bytes;
    }

    /// Remove the specified amount of data from the chunks, moving the
    /// data to dest's chunks
    void extractData(BufferImpl &dest, size_type bytes)
    {
        assert(bytes && bytes <= size_);

        size_type bytesToExtract = bytes;
        while( bytesToExtract ) {

            size_t currentSize = readChunks_.front().dataSize();
            dest.readChunks_.push_back(readChunks_.front());

            // see if entire chunk was extracted
            if(currentSize <= bytesToExtract) {
                readChunks_.pop_front();
                bytesToExtract -= currentSize;
            }
            else {
                // split the chunk: this buffer keeps the tail, dest keeps
                // the head (the two share the same underlying block)
                readChunks_.front().truncateFront(bytesToExtract);
                size_t excess = currentSize - bytesToExtract;
                dest.readChunks_.back().truncateBack(excess);
                bytesToExtract = 0;
            }
        }

        size_ -= bytes;
        dest.size_ += bytes;
    }

    /// Move data from this to the destination, leaving this buffer without data
    void extractData(BufferImpl &dest)
    {
        assert(dest.readChunks_.empty());
        dest.readChunks_.swap(readChunks_);
        dest.size_ = size_;
        size_ = 0;
    }

    /// Copy data to a different buffer by copying the chunks. It's
    /// a bit like extract, but without modifying the source buffer.
    void copyData(BufferImpl &dest,
                  ChunkList::const_iterator iter,
                  size_type offset,
                  size_type bytes) const
    {
        // now we are positioned to start the copying, copy as many
        // chunks as we need, the first chunk may have a non-zero offset
        // if the data to copy is not at the start of the chunk
        size_type copied = 0;
        while(copied < bytes) {

            dest.readChunks_.push_back(*iter);

            // offset only applies in the first chunk,
            // all subsequent chunks are copied from the start
            dest.readChunks_.back().truncateFront(offset);
            offset = 0;

            copied += dest.readChunks_.back().dataSize();
            ++iter;
        }

        // if the last chunk copied has more bytes than we need, truncate it
        size_type excess = copied - bytes;
        dest.readChunks_.back().truncateBack(excess);

        dest.size_ += bytes;
    }

    /// The number of chunks containing data. Used for debugging.
    int numDataChunks() const {
        return readChunks_.size();
    }

    /// The number of chunks containing free space (note that an entire chunk
    /// may not be free). Used for debugging.
    int numFreeChunks() const {
        return writeChunks_.size();
    }

    /// Add unmanaged data to the buffer. The buffer will not automatically
    /// free the data, but it will call the supplied function when the data is
    /// no longer referenced by the buffer (or copies of the buffer).
    void appendForeignData(const data_type *data, size_type size, const free_func &func) {
        readChunks_.push_back(Chunk(data, size, func));
        size_ += size;
    }

  private:

    /// Assignment not allowed
    BufferImpl& operator=(const BufferImpl &src);
    /* {
        readChunks_.assign(src.readChunks_.begin(), src.readChunks_.end());
        size_ = src.size();
        return *this;
    } */

    ChunkList readChunks_;  ///< chunks of this buffer containing data
    ChunkList writeChunks_; ///< chunks of this buffer containing free space

    size_type freeSpace_;   ///< capacity of buffer before allocation required
    size_type size_;        ///< amount of data in buffer

};
+
+} // detail namespace
+
+} // namespace
+
+#endif
Added: hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetailIterator.hh
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/api/buffer/detail/BufferDetailIterator.hh?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetailIterator.hh (added)
+++ hadoop/avro/trunk/lang/c++/api/buffer/detail/BufferDetailIterator.hh Thu Apr 8 16:48:19 2010
@@ -0,0 +1,208 @@
+#ifndef avro_BufferDetailIterator_hh__
+#define avro_BufferDetailIterator_hh__
+
+#include "BufferDetail.hh"
+
+/**
+ * \file BufferDetailIterator.hh
+ *
+ * \brief The implementation details for the Buffer iterators.
+ **/
+
+namespace avro {
+
+namespace detail {
+
+/**
+ * \brief Implements conversion from a chunk to asio::const_buffer
+ *
+ * Iterators for an InputBuffer will iterate over the buffer's list of
+ * chunks, so internally they contain an iterator. But the iterator needs
+ * to be convertible to an asio buffer for use in boost::asio functions.
+ * This class wraps the iterator with a cast operator to do this conversion.
+ **/
+
+struct InputIteratorHelper
+{
+ /// Construct a helper whose iterator is default-constructed (unassigned);
+ /// it must not be dereferenced until a real iterator is assigned.
+ InputIteratorHelper() :
+ iter_()
+ {}
+
+ /// Construct a helper wrapping the given chunk-list iterator.
+ InputIteratorHelper(const BufferImpl::ChunkList::const_iterator &iter) :
+ iter_(iter)
+ {}
+
+ /// The location of valid (readable) data in this chunk.
+ const data_type *data() const {
+ return iter_->tellReadPos();
+ }
+
+ /// The number of valid (readable) bytes in this chunk.
+ size_type size() const {
+ return iter_->dataSize();
+ }
+
+ /// Conversion to asio's const-buffer type. It doesn't check for null,
+ /// because the only time the chunk should be null is when the iterator
+ /// is end(), which should never be dereferenced anyway.
+ operator ConstAsioBuffer() const {
+ return ConstAsioBuffer(data(), size());
+ }
+
+ BufferImpl::ChunkList::const_iterator iter_; ///< the current iterator
+};
+
+/**
+ * \brief Implements conversion from a chunk to asio::buffer
+ *
+ * Iterators for an OutputBuffer will iterate over the buffer's list of
+ * chunks, so internally they contain an iterator. But the iterator needs
+ * to be convertible to an asio buffer for use in boost::asio functions.
+ * This class wraps the iterator with a cast operator to do this conversion.
+ */
+
+struct OutputIteratorHelper
+{
+ /// Construct a helper whose iterator is default-constructed (unassigned);
+ /// it must not be dereferenced until a real iterator is assigned.
+ OutputIteratorHelper() :
+ iter_()
+ {}
+
+ /// Construct a helper wrapping the given chunk-list iterator.
+ OutputIteratorHelper(const BufferImpl::ChunkList::const_iterator &iter) :
+ iter_(iter)
+ {}
+
+ /// The location of the first writable byte in this chunk.
+ data_type *data() const {
+ return iter_->tellWritePos();
+ }
+
+ /// The number of bytes that can still be written into this chunk.
+ size_type size() const {
+ return iter_->freeSize();
+ }
+
+ /// Conversion to asio's mutable-buffer type. It doesn't check for null,
+ /// because the only time the chunk should be null is when the iterator
+ /// is end(), which should never be dereferenced anyway.
+ operator MutableAsioBuffer() const {
+ return MutableAsioBuffer(data(), size());
+ }
+
+ BufferImpl::ChunkList::const_iterator iter_; ///< the current iterator
+};
+
+/**
+ * \brief Implements the iterator for Buffer, that iterates through the
+ * buffer's chunks.
+ **/
+
+template<typename Helper>
+class BufferIterator
+{
+
+ public:
+
+ typedef BufferIterator<Helper> this_type;
+
+ /**
+ * @name Typedefs
+ *
+ * STL iterators define the following declarations. According to
+ * boost::asio documentation, the library expects the iterator to be
+ * bidirectional, however this implements only the forward iterator type.
+ * So far this has not created any problems with asio, but may change if
+ * future versions of the asio require it.
+ **/
+
+ //@{
+ typedef std::forward_iterator_tag iterator_category; // forward only, not bidirectional — see note above
+ typedef Helper value_type;
+ typedef std::ptrdiff_t difference_type;
+ typedef value_type* pointer;
+ typedef value_type& reference;
+ //@}
+
+ /// Construct an uninitialized iterator; not usable until assigned.
+ BufferIterator() :
+ helper_()
+ { }
+
+ /* The default implementations are good here
+ /// Copy constructor.
+ BufferIterator(const BufferIterator &src) :
+ helper_(src.helper_)
+ { }
+ /// Assignment.
+ this_type& operator= (const this_type &rhs) {
+ helper_ = rhs.helper_;
+ return *this;
+ }
+ */
+
+ /// Construct an iterator at the given position in the buffer's chunk list.
+ explicit BufferIterator(BufferImpl::ChunkList::const_iterator iter) :
+ helper_(iter)
+ { }
+
+ /// Dereference iterator, returns the InputIteratorHelper or
+ /// OutputIteratorHelper wrapper for the current chunk.
+ reference operator *() {
+ return helper_;
+ }
+
+ /// Dereference iterator, const overload of operator*.
+ const value_type &operator *() const {
+ return helper_;
+ }
+
+ /// Member access through the helper wrapper (data()/size()).
+ pointer operator->() {
+ return &helper_;
+ }
+
+ /// Member access through the helper wrapper, const overload.
+ const value_type *operator->() const {
+ return &helper_;
+ }
+
+ /// Pre-increment: advance to the next chunk in the list (or end()).
+ this_type& operator++()
+ {
+ ++helper_.iter_;
+ return *this;
+ }
+
+ /// Post-increment: advance to the next chunk, returning the prior position.
+ this_type operator++(int)
+ {
+ this_type ret = *this;
+ ++helper_.iter_;
+ return ret;
+ }
+
+ /// True if both iterators refer to the same chunk.
+ bool operator==(const this_type &rhs) const {
+ return (helper_.iter_ == rhs.helper_.iter_);
+ }
+
+ /// True if the iterators refer to different chunks.
+ bool operator!=(const this_type &rhs) const {
+ return (helper_.iter_ != rhs.helper_.iter_);
+ }
+
+ private:
+
+ Helper helper_;
+};
+
+typedef BufferIterator<InputIteratorHelper> InputBufferIterator;
+typedef BufferIterator<OutputIteratorHelper> OutputBufferIterator;
+
+} // detail namespace
+
+} // namespace
+
+#endif
Modified: hadoop/avro/trunk/lang/c++/configure.in
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/configure.in?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/configure.in (original)
+++ hadoop/avro/trunk/lang/c++/configure.in Thu Apr 8 16:48:19 2010
@@ -18,8 +18,10 @@ AC_CHECK_PROG(DOXYGEN, doxygen, doxygen)
AC_CHECK_PROG(PYTHON, python, python)
# Checks for libraries.
-AX_BOOST_BASE([1.32.0])
+AX_BOOST_BASE([1.35.0])
AX_BOOST_REGEX
+AX_BOOST_THREAD
+AX_BOOST_SYSTEM
# Checks for header files.
AC_FUNC_ALLOCA
Modified: hadoop/avro/trunk/lang/c++/impl/Compiler.cc
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/impl/Compiler.cc?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/impl/Compiler.cc (original)
+++ hadoop/avro/trunk/lang/c++/impl/Compiler.cc Thu Apr 8 16:48:19 2010
@@ -17,7 +17,6 @@
*/
#include "Compiler.hh"
-#include "InputStreamer.hh"
#include "Types.hh"
#include "Schema.hh"
#include "ValidSchema.hh"
Modified: hadoop/avro/trunk/lang/c++/impl/ValidatingReader.cc
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/impl/ValidatingReader.cc?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/impl/ValidatingReader.cc (original)
+++ hadoop/avro/trunk/lang/c++/impl/ValidatingReader.cc Thu Apr 8 16:48:19 2010
@@ -20,11 +20,10 @@
#include "ValidatingReader.hh"
#include "ValidSchema.hh"
-#include "OutputStreamer.hh"
namespace avro {
-ValidatingReader::ValidatingReader(const ValidSchema &schema, InputStreamer &in) :
+ValidatingReader::ValidatingReader(const ValidSchema &schema, const InputBuffer &in) :
validator_(schema),
reader_(in)
{ }
Modified: hadoop/avro/trunk/lang/c++/impl/ValidatingWriter.cc
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/impl/ValidatingWriter.cc?rev=931999&r1=931998&r2=931999&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/c++/impl/ValidatingWriter.cc (original)
+++ hadoop/avro/trunk/lang/c++/impl/ValidatingWriter.cc Thu Apr 8 16:48:19 2010
@@ -20,14 +20,13 @@
#include "ValidatingWriter.hh"
#include "ValidSchema.hh"
-#include "OutputStreamer.hh"
#include "AvroTraits.hh"
namespace avro {
-ValidatingWriter::ValidatingWriter(const ValidSchema &schema, OutputStreamer &out) :
+ValidatingWriter::ValidatingWriter(const ValidSchema &schema) :
validator_(schema),
- writer_(out)
+ writer_()
{ }
void
Added: hadoop/avro/trunk/lang/c++/m4/m4_ax_boost_system.m4
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/c%2B%2B/m4/m4_ax_boost_system.m4?rev=931999&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/c++/m4/m4_ax_boost_system.m4 (added)
+++ hadoop/avro/trunk/lang/c++/m4/m4_ax_boost_system.m4 Thu Apr 8 16:48:19 2010
@@ -0,0 +1,117 @@
+# ===========================================================================
+# http://www.gnu.org/software/autoconf-archive/ax_boost_system.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+# AX_BOOST_SYSTEM
+#
+# DESCRIPTION
+#
+# Test for System library from the Boost C++ libraries. The macro requires
+# a preceding call to AX_BOOST_BASE. Further documentation is available at
+# <http://randspringer.de/boost/index.html>.
+#
+# This macro calls:
+#
+# AC_SUBST(BOOST_SYSTEM_LIB)
+#
+# And sets:
+#
+# HAVE_BOOST_SYSTEM
+#
+# LICENSE
+#
+# Copyright (c) 2008 Thomas Porschberg <th...@randspringer.de>
+# Copyright (c) 2008 Michael Tindal
+# Copyright (c) 2008 Daniel Casimiro <da...@gmail.com>
+#
+# Copying and distribution of this file, with or without modification, are
+# permitted in any medium without royalty provided the copyright notice
+# and this notice are preserved. This file is offered as-is, without any
+# warranty.
+
+#serial 7
+
+AC_DEFUN([AX_BOOST_SYSTEM],
+[
+ # --with-boost-system lets the user disable the probe or name the exact
+ # library to link (vendored from the autoconf-archive; see header above).
+ AC_ARG_WITH([boost-system],
+ AS_HELP_STRING([--with-boost-system@<:@=special-lib@:>@],
+ [use the System library from boost - it is possible to specify a certain library for the linker
+ e.g. --with-boost-system=boost_system-gcc-mt ]),
+ [
+ if test "$withval" = "no"; then
+ want_boost="no"
+ elif test "$withval" = "yes"; then
+ want_boost="yes"
+ ax_boost_user_system_lib=""
+ else
+ want_boost="yes"
+ ax_boost_user_system_lib="$withval"
+ fi
+ ],
+ [want_boost="yes"]
+ )
+
+ if test "x$want_boost" = "xyes"; then
+ AC_REQUIRE([AC_PROG_CC])
+ AC_REQUIRE([AC_CANONICAL_BUILD])
+ # Temporarily extend CPPFLAGS/LDFLAGS with the Boost paths found by
+ # AX_BOOST_BASE; restored at the end of the macro.
+ CPPFLAGS_SAVED="$CPPFLAGS"
+ CPPFLAGS="$CPPFLAGS $BOOST_CPPFLAGS"
+ export CPPFLAGS
+
+ LDFLAGS_SAVED="$LDFLAGS"
+ LDFLAGS="$LDFLAGS $BOOST_LDFLAGS"
+ export LDFLAGS
+
+ # Compile-only probe: is boost/system/error_code.hpp usable?
+ AC_CACHE_CHECK(whether the Boost::System library is available,
+ ax_cv_boost_system,
+ [AC_LANG_PUSH([C++])
+ CXXFLAGS_SAVE=$CXXFLAGS
+
+ AC_COMPILE_IFELSE(AC_LANG_PROGRAM([[@%:@include <boost/system/error_code.hpp>]],
+ [[boost::system::system_category]]),
+ ax_cv_boost_system=yes, ax_cv_boost_system=no)
+ CXXFLAGS=$CXXFLAGS_SAVE
+ AC_LANG_POP([C++])
+ ])
+ if test "x$ax_cv_boost_system" = "xyes"; then
+ AC_SUBST(BOOST_CPPFLAGS)
+
+ AC_DEFINE(HAVE_BOOST_SYSTEM,,[define if the Boost::System library is available])
+ BOOSTLIBDIR=`echo $BOOST_LDFLAGS | sed -e 's/@<:@^\/@:>@*//'`
+
+ LDFLAGS_SAVE=$LDFLAGS
+ # No library name given: scan BOOSTLIBDIR for libboost_system*
+ # variants (Unix naming first, then Windows-style) and try each.
+ if test "x$ax_boost_user_system_lib" = "x"; then
+ for libextension in `ls $BOOSTLIBDIR/libboost_system*.{so,a}* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^lib\(boost_system.*\)\.so.*$;\1;' -e 's;^lib\(boost_system.*\)\.a*$;\1;'` ; do
+ ax_lib=${libextension}
+ AC_CHECK_LIB($ax_lib, exit,
+ [BOOST_SYSTEM_LIB="-l$ax_lib"; AC_SUBST(BOOST_SYSTEM_LIB) link_system="yes"; break],
+ [link_system="no"])
+ done
+ if test "x$link_system" != "xyes"; then
+ for libextension in `ls $BOOSTLIBDIR/boost_system*.{dll,a}* 2>/dev/null | sed 's,.*/,,' | sed -e 's;^\(boost_system.*\)\.dll.*$;\1;' -e 's;^\(boost_system.*\)\.a*$;\1;'` ; do
+ ax_lib=${libextension}
+ AC_CHECK_LIB($ax_lib, exit,
+ [BOOST_SYSTEM_LIB="-l$ax_lib"; AC_SUBST(BOOST_SYSTEM_LIB) link_system="yes"; break],
+ [link_system="no"])
+ done
+ fi
+
+ else
+ # User named a library: try it verbatim and with the boost_system- prefix.
+ for ax_lib in $ax_boost_user_system_lib boost_system-$ax_boost_user_system_lib; do
+ AC_CHECK_LIB($ax_lib, exit,
+ [BOOST_SYSTEM_LIB="-l$ax_lib"; AC_SUBST(BOOST_SYSTEM_LIB) link_system="yes"; break],
+ [link_system="no"])
+ done
+
+ fi
+ if test "x$link_system" = "xno"; then
+ AC_MSG_ERROR(Could not link against $ax_lib !)
+ fi
+ fi
+
+ CPPFLAGS="$CPPFLAGS_SAVED"
+ LDFLAGS="$LDFLAGS_SAVED"
+ fi
+])
Propchange: hadoop/avro/trunk/lang/c++/m4/m4_ax_boost_system.m4
------------------------------------------------------------------------------
svn:eol-style = native