You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/05/16 20:02:56 UTC

orc git commit: ORC-177 OutputStream interface and implementation

Repository: orc
Updated Branches:
  refs/heads/master 2a0ae5370 -> f856ba368


ORC-177 OutputStream interface and implementation

1. Create output stream interface and implementation classes
2. Add memory input and output streams for the convenience of unit tests
3. Add corresponding unit test

Change-Id: I2495345453c3070fa43290edb2f7a012e117069b

Fixes #119

Signed-off-by: Owen O'Malley <om...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/f856ba36
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/f856ba36
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/f856ba36

Branch: refs/heads/master
Commit: f856ba368484bc0e5e0045966cf303ba537452d7
Parents: 2a0ae53
Author: Xiening.Dai <xi...@alibaba-inc.com>
Authored: Wed May 10 22:47:28 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue May 16 12:56:59 2017 -0700

----------------------------------------------------------------------
 c++/include/orc/OrcFile.hh           |  41 +++++++++
 c++/src/CMakeLists.txt               |   1 +
 c++/src/OrcFile.cc                   |  72 +++++++++++++++
 c++/src/io/OutputStream.cc           | 143 ++++++++++++++++++++++++++++++
 c++/src/io/OutputStream.hh           |  96 ++++++++++++++++++++
 c++/test/CMakeLists.txt              |   3 +
 c++/test/MemoryInputStream.cc        |  24 +++++
 c++/test/MemoryInputStream.hh        |  66 ++++++++++++++
 c++/test/MemoryOutputStream.cc       |  31 +++++++
 c++/test/MemoryOutputStream.hh       |  70 +++++++++++++++
 c++/test/TestBufferedOutputStream.cc | 128 ++++++++++++++++++++++++++
 11 files changed, 675 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/include/orc/OrcFile.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index f8f13fb..38e17e6 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -66,6 +66,42 @@ namespace orc {
   };
 
   /**
+   * An abstract interface for providing ORC writer a stream of bytes.
+   */
+  class OutputStream {
+  public:
+    virtual ~OutputStream();
+
+    /**
+     * Get the total length of bytes written.
+     * @return the number of bytes written to the stream so far
+     */
+    virtual uint64_t getLength() const = 0;
+
+    /**
+     * Get the natural size for writes.
+     * @return the number of bytes that should be written at once
+     */
+    virtual uint64_t getNaturalWriteSize() const = 0;
+
+    /**
+     * Write/append length bytes pointed to by buf to the stream.
+     * @param buf the starting position of a buffer.
+     * @param length the number of bytes to write.
+     */
+    virtual void write(const void* buf, size_t length) = 0;
+
+    /**
+     * Get the name of the stream for error messages.
+     * @return a reference to the stream's name
+     */
+    virtual const std::string& getName() const = 0;
+
+    /**
+     * Close the stream and flush any pending data to the disk.
+     */
+    virtual void close() = 0;
+  };
+
+  /**
    * Create a stream to a local file.
    * @param path the name of the file in the local file system
    */
@@ -78,6 +114,11 @@ namespace orc {
    */
   ORC_UNIQUE_PTR<Reader> createReader(ORC_UNIQUE_PTR<InputStream> stream,
                                       const ReaderOptions& options);
+  /**
+   * Create a stream to write to a local file.
+   * The file is created if it does not exist and truncated if it does.
+   * @param path the name of the file in the local file system
+   * @return an OutputStream that writes to the opened file
+   * @throws ParseError if the file cannot be opened
+   */
+  ORC_UNIQUE_PTR<OutputStream> writeLocalFile(const std::string& path);
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index c39437a..1f9f086 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -136,6 +136,7 @@ add_library (orc STATIC
   "${CMAKE_CURRENT_BINARY_DIR}/Adaptor.hh"
   orc_proto.pb.h
   io/InputStream.cc
+  io/OutputStream.cc
   wrap/orc-proto-wrapper.cc
   ByteRLE.cc
   ColumnPrinter.cc

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/OrcFile.cc
----------------------------------------------------------------------
diff --git a/c++/src/OrcFile.cc b/c++/src/OrcFile.cc
index d5d00fd..5d9bc80 100644
--- a/c++/src/OrcFile.cc
+++ b/c++/src/OrcFile.cc
@@ -89,6 +89,78 @@ namespace orc {
   std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
     return std::unique_ptr<InputStream>(new FileInputStream(path));
   }
+
+  // Out-of-line destructor of the abstract OutputStream interface.
+  OutputStream::~OutputStream() {
+    // PASS: the interface declares no data members, so nothing to release.
+  }
+
+  /**
+   * OutputStream implementation that writes to a local file through a
+   * POSIX file descriptor.
+   */
+  class FileOutputStream : public OutputStream {
+  private:
+    std::string filename;
+    int file;
+    uint64_t bytesWritten;
+    bool closed;
+
+    // The descriptor is closed by this object, so a copy would close the
+    // same descriptor a second time.
+    FileOutputStream(const FileOutputStream&) = delete;
+    FileOutputStream& operator=(const FileOutputStream&) = delete;
+
+  public:
+    /**
+     * Open the named file for writing, creating it if needed and
+     * truncating any existing content.
+     * @param _filename path of the file in the local file system
+     * @throws ParseError if the file cannot be opened
+     */
+    explicit FileOutputStream(std::string _filename)
+        : filename(_filename),
+          file(-1),
+          bytesWritten(0),
+          closed(false) {
+      file = open(
+                  filename.c_str(),
+                  O_CREAT | O_WRONLY | O_TRUNC,
+                  S_IRUSR | S_IWUSR);
+      if (file == -1) {
+        throw ParseError("Can't open " + filename);
+      }
+    }
+
+    ~FileOutputStream();
+
+    /**
+     * @return the number of bytes successfully handed to the file so far
+     */
+    uint64_t getLength() const override {
+      return bytesWritten;
+    }
+
+    /**
+     * @return the preferred write size, fixed at 128 KiB
+     */
+    uint64_t getNaturalWriteSize() const override {
+      return 128 * 1024;
+    }
+
+    /**
+     * Write length bytes starting at buf to the file.
+     * A single ::write call may transfer fewer bytes than requested, so
+     * the call is repeated until the whole buffer has been written.
+     * @param buf the starting position of the data
+     * @param length the number of bytes to write
+     * @throws std::logic_error if the stream has already been closed
+     * @throws ParseError if the underlying write fails or makes no progress
+     */
+    void write(const void* buf, size_t length) override {
+      if (closed) {
+        throw std::logic_error("Cannot write to closed stream.");
+      }
+      const char* cursor = static_cast<const char*>(buf);
+      size_t remaining = length;
+      while (remaining > 0) {
+        ssize_t bytesWrite = ::write(file, cursor, remaining);
+        if (bytesWrite == -1) {
+          throw ParseError("Bad write of " + filename);
+        }
+        if (bytesWrite == 0) {
+          // No progress was made; fail instead of looping forever.
+          throw ParseError("Short write of " + filename);
+        }
+        cursor += bytesWrite;
+        remaining -= static_cast<size_t>(bytesWrite);
+        // Count partial transfers too so getLength() matches the file.
+        bytesWritten += static_cast<uint64_t>(bytesWrite);
+      }
+    }
+
+    /**
+     * @return the path this stream was opened with
+     */
+    const std::string& getName() const override {
+      return filename;
+    }
+
+    /**
+     * Close the file descriptor. Calling close() again is a no-op.
+     */
+    void close() override {
+      if (!closed) {
+        ::close(file);
+        closed = true;
+      }
+    }
+  };
+
+  // Closes the descriptor if the owner never called close() explicitly.
+  FileOutputStream::~FileOutputStream() {
+    if (closed) {
+      return;
+    }
+    ::close(file);
+    closed = true;
+  }
+
+  // Factory for an OutputStream backed by a file in the local file system.
+  std::unique_ptr<OutputStream> writeLocalFile(const std::string& path) {
+    std::unique_ptr<OutputStream> stream(new FileOutputStream(path));
+    return stream;
+  }
 }
 
 #ifndef HAS_STOLL

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/io/OutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
new file mode 100644
index 0000000..fa2de30
--- /dev/null
+++ b/c++/src/io/OutputStream.cc
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exceptions.hh"
+#include "OutputStream.hh"
+
+#include <sstream>
+
+namespace orc {
+
+  // Builds the staging buffer from the given pool and reserves the
+  // requested initial capacity; outStream is stored, not owned.
+  BufferedOutputStream::BufferedOutputStream(
+                                    MemoryPool& pool,
+                                    OutputStream * outStream,
+                                    uint64_t capacity_,
+                                    uint64_t blockSize_)
+                                    : outputStream(outStream),
+                                      dataBuffer(new DataBuffer<char>(pool)),
+                                      blockSize(blockSize_) {
+    dataBuffer->reserve(capacity_);
+  }
+
+  // Nothing is released explicitly here; the destination stream pointer is
+  // not deleted by this class.
+  BufferedOutputStream::~BufferedOutputStream() {
+    // PASS
+  }
+
+  /**
+   * Hand out the next writable block at the end of the staging buffer.
+   * The buffer grows by blockSize bytes; its capacity is enlarged in
+   * multiples of the current capacity when it is too small.
+   * @param buffer receives the address of the new block
+   * @param size receives the block length, always blockSize
+   * @return always true
+   */
+  bool BufferedOutputStream::Next(void** buffer, int* size) {
+    *size = static_cast<int>(blockSize);
+    uint64_t oldSize = dataBuffer->size();
+    uint64_t newSize = oldSize + blockSize;
+    uint64_t growthStep = dataBuffer->capacity();
+    uint64_t newCapacity = growthStep;
+    if (growthStep == 0) {
+      // Adding a zero step would never reach newSize, so size the buffer
+      // to exactly what is needed instead of looping forever.
+      newCapacity = newSize;
+    } else {
+      while (newCapacity < newSize) {
+        newCapacity += growthStep;
+      }
+    }
+    dataBuffer->reserve(newCapacity);
+    dataBuffer->resize(newSize);
+    // Take the address after reserve/resize, which may move the storage.
+    *buffer = dataBuffer->data() + oldSize;
+    return true;
+  }
+
+  /**
+   * Give back the last count bytes handed out by Next().
+   * Negative counts are ignored.
+   * @throws std::logic_error if count exceeds the bytes currently buffered
+   */
+  void BufferedOutputStream::BackUp(int count) {
+    if (count < 0) {
+      return;
+    }
+    const uint64_t giveBack = static_cast<uint64_t>(count);
+    const uint64_t buffered = dataBuffer->size();
+    if (giveBack > buffered) {
+      throw std::logic_error("Can't backup that much!");
+    }
+    dataBuffer->resize(buffered - giveBack);
+  }
+
+  // Reports the number of bytes currently held in the staging buffer.
+  google::protobuf::int64 BufferedOutputStream::ByteCount() const {
+    const uint64_t buffered = dataBuffer->size();
+    return static_cast<google::protobuf::int64>(buffered);
+  }
+
+  // Aliased writes are not supported; every call throws NotImplementedYet.
+  bool BufferedOutputStream::WriteAliasedRaw(const void *, int) {
+    throw NotImplementedYet("WriteAliasedRaw is not supported.");
+  }
+
+  // Always false, matching WriteAliasedRaw, which is not implemented.
+  bool BufferedOutputStream::AllowsAliasing() const {
+    return false;
+  }
+
+  // Describes the stream as "BufferedOutputStream <size> of <capacity>".
+  std::string BufferedOutputStream::getName() const {
+    std::ostringstream description;
+    description << "BufferedOutputStream ";
+    description << dataBuffer->size();
+    description << " of ";
+    description << dataBuffer->capacity();
+    return description.str();
+  }
+
+  // Returns the number of bytes currently held in the staging buffer.
+  uint64_t BufferedOutputStream::getSize() const {
+    const uint64_t buffered = dataBuffer->size();
+    return buffered;
+  }
+
+  /**
+   * Write all buffered bytes to the destination stream and empty the
+   * staging buffer.
+   * @return the number of bytes that were written
+   */
+  uint64_t BufferedOutputStream::flush() {
+    const uint64_t pending = dataBuffer->size();
+    outputStream->write(dataBuffer->data(), pending);
+    dataBuffer->resize(0);
+    return pending;
+  }
+
+  /**
+   * Append size bytes from data, fetching a new block from the underlying
+   * stream whenever the current one is full.
+   * @param data the bytes to append
+   * @param size the number of bytes to append
+   * @throws std::logic_error if the underlying stream cannot supply a block
+   */
+  void AppendOnlyBufferedStream::write(const char * data, size_t size) {
+    size_t dataOffset = 0;
+    while (size > 0) {
+      if (bufferOffset == bufferLength) {
+        // Receive the block through a real void* rather than casting the
+        // address of the char* member, which would be type punning.
+        void * block = nullptr;
+        if (!outStream->Next(&block, &bufferLength)) {
+          throw std::logic_error("Failed to allocate buffer.");
+        }
+        buffer = static_cast<char *>(block);
+        bufferOffset = 0;
+      }
+      size_t len = std::min(
+                           static_cast<size_t>(bufferLength - bufferOffset),
+                           size);
+      memcpy(buffer + bufferOffset, data + dataOffset, len);
+      bufferOffset += static_cast<int>(len);
+      dataOffset += len;
+      size -= len;
+    }
+  }
+
+  // Forwards to the underlying BufferedOutputStream's getSize().
+  uint64_t AppendOnlyBufferedStream::getSize() const {
+    const uint64_t underlyingSize = outStream->getSize();
+    return underlyingSize;
+  }
+
+  /**
+   * Return the unused tail of the current block to the underlying stream,
+   * forget the block, and flush the underlying stream.
+   * @return the value returned by the underlying stream's flush()
+   */
+  uint64_t AppendOnlyBufferedStream::flush() {
+    const int unusedTail = bufferLength - bufferOffset;
+    outStream->BackUp(unusedTail);
+    buffer = nullptr;
+    bufferLength = 0;
+    bufferOffset = 0;
+    return outStream->flush();
+  }
+
+  /**
+   * Report the current write position to the recorder.
+   * A compressed stream records two values: the underlying stream size and
+   * the offset inside the current block. An uncompressed stream records a
+   * single byte offset.
+   */
+  void AppendOnlyBufferedStream::recordPosition(PositionRecorder* recorder) const {
+    const uint64_t streamSize = outStream->getSize();
+    const uint64_t consumed = static_cast<uint64_t>(bufferOffset);
+    if (!outStream->isCompressed()) {
+      // The stream size already includes the whole current block, so step
+      // back to the block start and add the bytes used so far.
+      const uint64_t blockStart =
+          streamSize - static_cast<uint64_t>(bufferLength);
+      recorder->add(blockStart + consumed);
+      return;
+    }
+    // start of the compression chunk in the stream
+    recorder->add(streamSize);
+    // number of decompressed bytes that need to be consumed
+    recorder->add(consumed);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/io/OutputStream.hh
----------------------------------------------------------------------
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
new file mode 100644
index 0000000..8ff2061
--- /dev/null
+++ b/c++/src/io/OutputStream.hh
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_OUTPUTSTREAM_HH
+#define ORC_OUTPUTSTREAM_HH
+
+#include "Adaptor.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/zero-copy-stream-wrapper.h"
+
+namespace orc {
+
+  /**
+   * Record write position for creating index stream
+   */
+  class PositionRecorder {
+  public:
+    // NOTE(review): no definition of this destructor appears in the
+    // accompanying OutputStream.cc; confirm one exists before deriving
+    // from this class, otherwise linking will fail.
+    virtual ~PositionRecorder();
+    // Record one position value.
+    virtual void add(uint64_t pos) = 0;
+  };
+
+  /**
+   * A subclass of Google's ZeroCopyOutputStream that supports output to memory
+   * buffer, and flushing to OutputStream.
+   * By extending Google's class, we get the ability to pass it directly
+   * to the protobuf writers.
+   */
+  class BufferedOutputStream: public google::protobuf::io::ZeroCopyOutputStream {
+  private:
+    // Destination that flush() writes to; stored as a raw pointer.
+    OutputStream * outputStream;
+    // In-memory staging area for the blocks handed out by Next().
+    std::unique_ptr<DataBuffer<char> > dataBuffer;
+    // Number of bytes handed out by each call to Next().
+    uint64_t blockSize;
+
+  public:
+    /**
+     * @param pool memory pool used to create the staging buffer
+     * @param outStream destination stream for flush()
+     * @param capacity initial capacity reserved for the staging buffer
+     * @param block_size number of bytes returned by each Next() call
+     */
+    BufferedOutputStream(MemoryPool& pool,
+                      OutputStream * outStream,
+                      uint64_t capacity,
+                      uint64_t block_size);
+    virtual ~BufferedOutputStream();
+
+    // ZeroCopyOutputStream interface.
+    virtual bool Next(void** data, int*size) override;
+    virtual void BackUp(int count) override;
+    virtual google::protobuf::int64 ByteCount() const override;
+    virtual bool WriteAliasedRaw(const void * data, int size) override;
+    virtual bool AllowsAliasing() const override;
+
+    // Human-readable description including buffer size and capacity.
+    virtual std::string getName() const;
+    // Number of bytes currently buffered and not yet flushed.
+    virtual uint64_t getSize() const;
+    // Writes the buffered bytes to the destination and returns their count.
+    virtual uint64_t flush();
+
+    // This base implementation reports no compression.
+    virtual bool isCompressed() const { return false; }
+  };
+
+  /**
+   * An append-only wrapper around a BufferedOutputStream.
+   * It copies caller data into the blocks obtained from the wrapped
+   * stream and keeps track of how much of the current block is in use.
+   */
+  class AppendOnlyBufferedStream {
+  private:
+    // The wrapped stream; owned by this object.
+    std::unique_ptr<BufferedOutputStream> outStream;
+    // Start of the block currently being filled, or nullptr if none.
+    char * buffer;
+    // Bytes used in the current block, and the block's total length.
+    int bufferOffset, bufferLength;
+
+  public:
+    /**
+     * @param _outStream the stream to wrap; ownership is taken
+     */
+    AppendOnlyBufferedStream(std::unique_ptr<BufferedOutputStream> _outStream) :
+                                              outStream(std::move(_outStream)) {
+      buffer = nullptr;
+      bufferOffset = bufferLength = 0;
+    }
+
+    // Append size bytes from data.
+    void write(const char * data, size_t size);
+    // Size reported by the wrapped stream.
+    uint64_t getSize() const;
+    // Return the unused part of the current block and flush the wrapped stream.
+    uint64_t flush();
+
+    // Report the current write position to the recorder.
+    void recordPosition(PositionRecorder* recorder) const;
+  };
+}
+
+#endif // ORC_OUTPUTSTREAM_HH

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 29f5f6c..ad5893c 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -23,6 +23,9 @@ include_directories(
 set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX11_FLAGS} ${WARN_FLAGS}")
 
 add_executable (orc-test
+  MemoryInputStream.cc
+  MemoryOutputStream.cc
+  TestBufferedOutputStream.cc
   TestByteRle.cc
   TestColumnPrinter.cc
   TestColumnReader.cc

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryInputStream.cc
----------------------------------------------------------------------
diff --git a/c++/test/MemoryInputStream.cc b/c++/test/MemoryInputStream.cc
new file mode 100644
index 0000000..851c5ee
--- /dev/null
+++ b/c++/test/MemoryInputStream.cc
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MemoryInputStream.hh"
+
+namespace orc {
+  // Empty on purpose: the stream never frees the buffer it reads from.
+  MemoryInputStream::~MemoryInputStream() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryInputStream.hh
----------------------------------------------------------------------
diff --git a/c++/test/MemoryInputStream.hh b/c++/test/MemoryInputStream.hh
new file mode 100644
index 0000000..302a551
--- /dev/null
+++ b/c++/test/MemoryInputStream.hh
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_MEMORYINPUTSTREAM_HH
+#define ORC_MEMORYINPUTSTREAM_HH
+
+#include "orc/OrcFile.hh"
+#include "io/InputStream.hh"
+
+#include <iostream>
+
+namespace orc {
+  /**
+   * InputStream that reads from a caller-supplied memory buffer.
+   * The buffer is only referenced, not copied.
+   */
+  class MemoryInputStream : public InputStream {
+  public:
+    /**
+     * @param _buffer start of the bytes to expose
+     * @param _size number of bytes available at _buffer
+     */
+    MemoryInputStream(const char * _buffer, size_t _size) :
+                                        buffer(_buffer),
+                                        size(_size),
+                                        naturalReadSize(1024),
+                                        name("MemoryInputStream") {
+    }
+
+    ~MemoryInputStream();
+
+    // Total number of bytes in the buffer.
+    virtual uint64_t getLength() const override {
+      return size;
+    }
+
+    // Fixed at 1024 by the constructor.
+    virtual uint64_t getNaturalReadSize() const override {
+      return naturalReadSize;
+    }
+
+    // Copies length bytes starting at offset into buf. No check is made
+    // that offset + length stays within size.
+    virtual void read(void* buf, uint64_t length, uint64_t offset) override {
+      memcpy(buf, buffer + offset, length);
+    }
+
+    // Always "MemoryInputStream".
+    virtual const std::string& getName() const override {
+      return name;
+    }
+
+    // Direct access to the underlying buffer.
+    const char * getData() const {
+      return buffer;
+    }
+
+  private:
+    const char * buffer;
+    uint64_t size, naturalReadSize;
+    std::string name;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryOutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/test/MemoryOutputStream.cc b/c++/test/MemoryOutputStream.cc
new file mode 100644
index 0000000..972b5af
--- /dev/null
+++ b/c++/test/MemoryOutputStream.cc
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MemoryOutputStream.hh"
+
+namespace orc {
+
+  // Frees the backing array allocated with new[] in the constructor.
+  MemoryOutputStream::~MemoryOutputStream() {
+    delete[] data;
+  }
+
+  // Appends size bytes from buf after the bytes already stored. No check
+  // is made against the capacity passed to the constructor.
+  void MemoryOutputStream::write(const void* buf, size_t size) {
+    char * destination = data + length;
+    memcpy(destination, buf, size);
+    length += size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryOutputStream.hh
----------------------------------------------------------------------
diff --git a/c++/test/MemoryOutputStream.hh b/c++/test/MemoryOutputStream.hh
new file mode 100644
index 0000000..99751e5
--- /dev/null
+++ b/c++/test/MemoryOutputStream.hh
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_MEMORYOUTPUTSTREAM_HH
+#define ORC_MEMORYOUTPUTSTREAM_HH
+
+#include "orc/OrcFile.hh"
+#include "io/OutputStream.hh"
+
+#include <iostream>
+
+namespace orc {
+
+  /**
+   * OutputStream for unit tests that collects everything written to it in
+   * a heap-allocated array of fixed capacity.
+   */
+  class MemoryOutputStream : public OutputStream {
+  public:
+    /**
+     * @param capacity size in bytes of the backing array
+     */
+    MemoryOutputStream(ssize_t capacity)
+        : data(new char[capacity]),
+          name("MemoryOutputStream"),
+          length(0),
+          // Previously left uninitialized; 1024 mirrors the natural read
+          // size used by MemoryInputStream.
+          naturalWriteSize(1024) {
+    }
+
+    virtual ~MemoryOutputStream();
+
+    // Number of bytes written since construction or the last reset().
+    virtual uint64_t getLength() const override {
+      return length;
+    }
+
+    // Preferred write size, set once by the constructor.
+    virtual uint64_t getNaturalWriteSize() const override {
+      return naturalWriteSize;
+    }
+
+    // Appends size bytes from buf to the backing array.
+    virtual void write(const void* buf, size_t size) override;
+
+    // Always "MemoryOutputStream".
+    virtual const std::string& getName() const override {
+      return name;
+    }
+
+    // Start of the backing array; the first getLength() bytes are valid.
+    const char * getData() const {
+      return data;
+    }
+
+    // Nothing to flush for an in-memory stream.
+    void close() override {
+    }
+
+    // Discards the written bytes so the array can be reused from the start.
+    void reset()  {
+      length = 0;
+    }
+
+  private:
+    char * data;
+    std::string name;
+    uint64_t length, naturalWriteSize;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/TestBufferedOutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestBufferedOutputStream.cc b/c++/test/TestBufferedOutputStream.cc
new file mode 100644
index 0000000..d1786a6
--- /dev/null
+++ b/c++/test/TestBufferedOutputStream.cc
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrap/orc-proto-wrapper.hh"
+#include "wrap/gtest-wrapper.h"
+
+#include "MemoryOutputStream.hh"
+
+namespace orc {
+  // Fills 100 whole blocks of 10 bytes, flushes once, and checks that all
+  // 1000 bytes reach the memory stream in order.
+  TEST(BufferedOutputStream, block_aligned) {
+    MemoryOutputStream memStream(1024);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 1000;
+    uint64_t block = 10;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+    for (int i = 0; i < 100; ++i) {
+      char * buf;
+      int len;
+      EXPECT_TRUE(bufStream.Next(reinterpret_cast<void **>(&buf), &len));
+      EXPECT_EQ(10, len);
+      // Each block carries the pattern 'a'..'j'.
+      for (int j = 0; j < 10; ++j) {
+        buf[j] = static_cast<char>('a' + j);
+      }
+    }
+
+    bufStream.flush();
+    EXPECT_EQ(1000, memStream.getLength());
+    for (int i = 0; i < 1000; ++i) {
+      EXPECT_EQ(memStream.getData()[i], 'a' + i % 10);
+    }
+  }
+
+  // Uses only part of each block and returns the rest with BackUp(), then
+  // checks that only the used bytes are flushed and that two flushes
+  // append one after the other.
+  TEST(BufferedOutputStream, block_not_aligned) {
+    MemoryOutputStream memStream(1024);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 20;
+    uint64_t block = 10;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    char * buf;
+    int len;
+    EXPECT_TRUE(bufStream.Next(reinterpret_cast<void **>(&buf), &len));
+    EXPECT_EQ(10, len);
+
+    // Use 7 of the 10 bytes.
+    for (int i = 0; i < 7; ++i) {
+        buf[i] = static_cast<char>('a' + i);
+    }
+
+    // Give back the 3 unused bytes before flushing.
+    bufStream.BackUp(3);
+    bufStream.flush();
+
+    EXPECT_EQ(7, memStream.getLength());
+    for (int i = 0; i < 7; ++i) {
+      EXPECT_EQ(memStream.getData()[i], 'a' + i);
+    }
+
+    EXPECT_TRUE(bufStream.Next(reinterpret_cast<void **>(&buf), &len));
+    EXPECT_EQ(10, len);
+
+    // Use 5 of the 10 bytes in the second block.
+    for (int i = 0; i < 5; ++i) {
+        buf[i] = static_cast<char>('a' + i);
+    }
+
+    bufStream.BackUp(5);
+    bufStream.flush();
+
+    // 7 bytes from the first flush plus 5 from the second.
+    EXPECT_EQ(12, memStream.getLength());
+    for (int i = 0; i < 7; ++i) {
+      EXPECT_EQ(memStream.getData()[i], 'a' + i);
+    }
+
+    for (int i = 0; i < 5; ++i) {
+     EXPECT_EQ(memStream.getData()[i + 7], 'a' + i);
+    }
+  }
+
+  // Serializes a PostScript message through the buffered stream, parses it
+  // back from the flushed bytes, and compares every field.
+  TEST(BufferedOutputStream, protobuff_serialization) {
+    MemoryOutputStream memStream(1024);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 20;
+    uint64_t block = 10;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    proto::PostScript ps;
+    ps.set_footerlength(197934);
+    ps.set_compression(proto::ZLIB);
+    ps.add_version(6);
+    ps.add_version(20);
+    ps.set_metadatalength(100);
+    ps.set_writerversion(789);
+    ps.set_magic("protobuff_serialization");
+
+    EXPECT_TRUE(ps.SerializeToZeroCopyStream(&bufStream));
+    bufStream.flush();
+    EXPECT_EQ(ps.ByteSize(), memStream.getLength());
+
+    // Check the parse result so a corrupt byte stream is reported as a
+    // parse failure rather than as a cascade of field mismatches.
+    proto::PostScript ps2;
+    EXPECT_TRUE(ps2.ParseFromArray(
+                       memStream.getData(),
+                       static_cast<int>(memStream.getLength())));
+
+    EXPECT_EQ(ps.footerlength(), ps2.footerlength());
+    EXPECT_EQ(ps.compression(), ps2.compression());
+    EXPECT_EQ(ps.version(0), ps2.version(0));
+    EXPECT_EQ(ps.version(1), ps2.version(1));
+    EXPECT_EQ(ps.metadatalength(), ps2.metadatalength());
+    EXPECT_EQ(ps.writerversion(), ps2.writerversion());
+    EXPECT_EQ(ps.magic(), ps2.magic());
+  }
+}