You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2017/05/16 20:02:56 UTC
orc git commit: ORC-177 OutputStream interface and implementation
Repository: orc
Updated Branches:
refs/heads/master 2a0ae5370 -> f856ba368
ORC-177 OutputStream interface and implementation
1. Create output stream interface and implementation classes
2. Add in-memory input and output streams for the convenience of unit tests
3. Add corresponding unit test
Change-Id: I2495345453c3070fa43290edb2f7a012e117069b
Fixes #119
Signed-off-by: Owen O'Malley <om...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/f856ba36
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/f856ba36
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/f856ba36
Branch: refs/heads/master
Commit: f856ba368484bc0e5e0045966cf303ba537452d7
Parents: 2a0ae53
Author: Xiening.Dai <xi...@alibaba-inc.com>
Authored: Wed May 10 22:47:28 2017 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Tue May 16 12:56:59 2017 -0700
----------------------------------------------------------------------
c++/include/orc/OrcFile.hh | 41 +++++++++
c++/src/CMakeLists.txt | 1 +
c++/src/OrcFile.cc | 72 +++++++++++++++
c++/src/io/OutputStream.cc | 143 ++++++++++++++++++++++++++++++
c++/src/io/OutputStream.hh | 96 ++++++++++++++++++++
c++/test/CMakeLists.txt | 3 +
c++/test/MemoryInputStream.cc | 24 +++++
c++/test/MemoryInputStream.hh | 66 ++++++++++++++
c++/test/MemoryOutputStream.cc | 31 +++++++
c++/test/MemoryOutputStream.hh | 70 +++++++++++++++
c++/test/TestBufferedOutputStream.cc | 128 ++++++++++++++++++++++++++
11 files changed, 675 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/include/orc/OrcFile.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index f8f13fb..38e17e6 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -66,6 +66,42 @@ namespace orc {
};
/**
+ * An abstract interface through which the ORC writer emits its stream
+ * of bytes.
+ */
+ class OutputStream {
+ public:
+ virtual ~OutputStream();
+
+ /**
+ * Get the total number of bytes written to the stream so far.
+ */
+ virtual uint64_t getLength() const = 0;
+
+ /**
+ * Get the natural size for writes.
+ * @return the number of bytes that should be written at once
+ */
+ virtual uint64_t getNaturalWriteSize() const = 0;
+
+ /**
+ * Append length bytes pointed to by buf to the end of the stream.
+ * @param buf the starting position of a buffer.
+ * @param length the number of bytes to write.
+ */
+ virtual void write(const void* buf, size_t length) = 0;
+
+ /**
+ * Get the name of the stream for error messages.
+ */
+ virtual const std::string& getName() const = 0;
+
+ /**
+ * Close the stream and flush any pending data to the underlying
+ * destination.
+ */
+ virtual void close() = 0;
+ };
+
+ /**
* Create a stream to a local file.
* @param path the name of the file in the local file system
*/
@@ -78,6 +114,11 @@ namespace orc {
*/
ORC_UNIQUE_PTR<Reader> createReader(ORC_UNIQUE_PTR<InputStream> stream,
const ReaderOptions& options);
+ /**
+ * Create a stream to write to a local file. The file is created if it
+ * does not exist and truncated if it does.
+ * @param path the name of the file in the local file system
+ * @return a stream that appends to the file
+ * @throws ParseError if the file cannot be opened
+ */
+ ORC_UNIQUE_PTR<OutputStream> writeLocalFile(const std::string& path);
}
#endif
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index c39437a..1f9f086 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -136,6 +136,7 @@ add_library (orc STATIC
"${CMAKE_CURRENT_BINARY_DIR}/Adaptor.hh"
orc_proto.pb.h
io/InputStream.cc
+ io/OutputStream.cc
wrap/orc-proto-wrapper.cc
ByteRLE.cc
ColumnPrinter.cc
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/OrcFile.cc
----------------------------------------------------------------------
diff --git a/c++/src/OrcFile.cc b/c++/src/OrcFile.cc
index d5d00fd..5d9bc80 100644
--- a/c++/src/OrcFile.cc
+++ b/c++/src/OrcFile.cc
@@ -89,6 +89,78 @@ namespace orc {
std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
return std::unique_ptr<InputStream>(new FileInputStream(path));
}
+
+  OutputStream::~OutputStream() {
+    // Nothing to release: the interface itself owns no resources.
+    // No ';' after the body -- it would be a stray empty declaration.
+  }
+
+  /**
+   * OutputStream backed by a POSIX file descriptor. The file is created if
+   * it does not exist and truncated if it does.
+   */
+  class FileOutputStream : public OutputStream {
+  private:
+    std::string filename;
+    int file;
+    uint64_t bytesWritten;
+    bool closed;
+
+  public:
+    /**
+     * Open (create or truncate) the file for writing.
+     * @param _filename path of the file in the local file system
+     * @throws ParseError if the file cannot be opened
+     */
+    FileOutputStream(const std::string& _filename)
+      : filename(_filename),
+        file(-1),
+        bytesWritten(0),
+        closed(false) {
+      file = open(filename.c_str(),
+                  O_CREAT | O_WRONLY | O_TRUNC,
+                  S_IRUSR | S_IWUSR);
+      if (file == -1) {
+        throw ParseError("Can't open " + filename);
+      }
+    }
+
+    // The descriptor is owned exclusively; a copy would close it twice.
+    FileOutputStream(const FileOutputStream&) = delete;
+    FileOutputStream& operator=(const FileOutputStream&) = delete;
+
+    ~FileOutputStream() override;
+
+    uint64_t getLength() const override {
+      return bytesWritten;
+    }
+
+    uint64_t getNaturalWriteSize() const override {
+      return 128 * 1024;
+    }
+
+    /**
+     * Append length bytes from buf to the file.
+     * POSIX write() may transfer fewer bytes than requested (e.g. when a
+     * signal arrives after some data was written), so keep writing until
+     * everything has been handed to the kernel.
+     * @throws std::logic_error if the stream has been closed
+     * @throws ParseError if the underlying write fails
+     */
+    void write(const void* buf, size_t length) override {
+      if (closed) {
+        throw std::logic_error("Cannot write to closed stream.");
+      }
+      const char* cursor = static_cast<const char*>(buf);
+      size_t remaining = length;
+      while (remaining > 0) {
+        ssize_t bytesWrite = ::write(file, cursor, remaining);
+        if (bytesWrite == -1) {
+          throw ParseError("Bad write of " + filename);
+        }
+        if (bytesWrite == 0) {
+          // No progress and no error reported: bail out rather than spin.
+          throw ParseError("Short write of " + filename);
+        }
+        size_t chunk = static_cast<size_t>(bytesWrite);
+        cursor += chunk;
+        remaining -= chunk;
+        bytesWritten += static_cast<uint64_t>(chunk);
+      }
+    }
+
+    const std::string& getName() const override {
+      return filename;
+    }
+
+    /**
+     * Close the file descriptor. Calling it again is a no-op.
+     * @throws ParseError if ::close reports an error; errors from earlier
+     *         writes may only surface here.
+     */
+    void close() override {
+      if (!closed) {
+        // Mark closed first: the descriptor must not be closed again (e.g. by
+        // the destructor) even if ::close fails.
+        closed = true;
+        if (::close(file) == -1) {
+          throw ParseError("Failed to close " + filename);
+        }
+      }
+    }
+  };
+
+  FileOutputStream::~FileOutputStream() {
+    // Best-effort release of the descriptor when close() was never called.
+    // Any ::close error is deliberately ignored: destructors must not throw.
+    if (closed) {
+      return;
+    }
+    closed = true;
+    ::close(file);
+  }
+
+  std::unique_ptr<OutputStream> writeLocalFile(const std::string& path) {
+    // FileOutputStream's constructor throws when the file cannot be opened,
+    // so a stream is only handed out once the descriptor is valid.
+    std::unique_ptr<OutputStream> stream(new FileOutputStream(path));
+    return stream;
+  }
}
#ifndef HAS_STOLL
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/io/OutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
new file mode 100644
index 0000000..fa2de30
--- /dev/null
+++ b/c++/src/io/OutputStream.cc
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Exceptions.hh"
+#include "OutputStream.hh"
+
+#include <sstream>
+
+namespace orc {
+
+  /**
+   * @param pool memory pool backing the internal buffer
+   * @param outStream destination written by flush(); not owned
+   * @param capacity_ number of bytes to reserve up front
+   * @param blockSize_ number of bytes handed out by each Next() call
+   */
+  BufferedOutputStream::BufferedOutputStream(MemoryPool& pool,
+                                             OutputStream * outStream,
+                                             uint64_t capacity_,
+                                             uint64_t blockSize_)
+      : outputStream(outStream),
+        dataBuffer(new DataBuffer<char>(pool)),
+        blockSize(blockSize_) {
+    // Reserve the requested capacity so early Next() calls need not grow.
+    dataBuffer->reserve(capacity_);
+  }
+
+  // PositionRecorder's virtual destructor is declared in OutputStream.hh but
+  // was defined nowhere, so destroying any implementation would fail to link.
+  // Define it out of line, matching the other destructors in this file.
+  PositionRecorder::~PositionRecorder() {
+    // PASS
+  }
+
+  BufferedOutputStream::~BufferedOutputStream() {
+    // Nothing to do explicitly: dataBuffer is released by its unique_ptr and
+    // outputStream is not owned.
+  }
+
+  /**
+   * ZeroCopyOutputStream::Next: append blockSize bytes to the in-memory
+   * buffer and hand them to the caller to fill. The buffer grows as needed,
+   * so this always returns true.
+   */
+  bool BufferedOutputStream::Next(void** buffer, int* size) {
+    *size = static_cast<int>(blockSize);
+    uint64_t oldSize = dataBuffer->size();
+    uint64_t newSize = oldSize + blockSize;
+    uint64_t growthStep = dataBuffer->capacity();
+    uint64_t newCapacity = growthStep;
+    if (growthStep == 0) {
+      // Nothing reserved yet (capacity 0): growing by the current capacity
+      // would never make progress, so reserve exactly what is needed.
+      newCapacity = newSize;
+    } else {
+      // Grow in multiples of the current capacity.
+      while (newCapacity < newSize) {
+        newCapacity += growthStep;
+      }
+    }
+    dataBuffer->reserve(newCapacity);
+    dataBuffer->resize(newSize);
+    *buffer = dataBuffer->data() + oldSize;
+    return true;
+  }
+
+  /**
+   * ZeroCopyOutputStream::BackUp: return the last count bytes handed out by
+   * Next() as unused. Negative counts are ignored.
+   * @throws std::logic_error if count exceeds the buffered size
+   */
+  void BufferedOutputStream::BackUp(int count) {
+    if (count < 0) {
+      return;
+    }
+    const uint64_t backUpBy = static_cast<uint64_t>(count);
+    const uint64_t currentSize = dataBuffer->size();
+    if (backUpBy > currentSize) {
+      throw std::logic_error("Can't backup that much!");
+    }
+    dataBuffer->resize(currentSize - backUpBy);
+  }
+
+  google::protobuf::int64 BufferedOutputStream::ByteCount() const {
+    // Bytes currently held in the buffer; flush() resets this to zero.
+    // NOTE(review): protobuf documents ByteCount as the total since the
+    // stream was created -- confirm no caller relies on that across flushes.
+    const uint64_t buffered = dataBuffer->size();
+    return static_cast<google::protobuf::int64>(buffered);
+  }
+
+  bool BufferedOutputStream::WriteAliasedRaw(const void * /* data */,
+                                             int /* size */) {
+    // AllowsAliasing() reports false, so protobuf should never call this;
+    // fail loudly if it does.
+    throw NotImplementedYet("WriteAliasedRaw is not supported.");
+  }
+
+  bool BufferedOutputStream::AllowsAliasing() const {
+    // Bytes are always copied into dataBuffer; caller memory is never aliased.
+    const bool aliasingSupported = false;
+    return aliasingSupported;
+  }
+
+  std::string BufferedOutputStream::getName() const {
+    // Describes the buffer as "<used> of <reserved>" bytes for error messages.
+    std::string description = "BufferedOutputStream ";
+    description += std::to_string(dataBuffer->size());
+    description += " of ";
+    description += std::to_string(dataBuffer->capacity());
+    return description;
+  }
+
+  uint64_t BufferedOutputStream::getSize() const {
+    // Bytes currently held in the buffer, including any handed out by Next()
+    // that have not been backed up.
+    const uint64_t buffered = dataBuffer->size();
+    return buffered;
+  }
+
+  uint64_t BufferedOutputStream::flush() {
+    // Write every buffered byte to the underlying stream, empty the buffer,
+    // and report how many bytes were written.
+    const uint64_t flushedBytes = dataBuffer->size();
+    outputStream->write(dataBuffer->data(), flushedBytes);
+    dataBuffer->resize(0);
+    return flushedBytes;
+  }
+
+  /**
+   * Append size bytes from data, requesting new blocks from the wrapped
+   * BufferedOutputStream whenever the current one is full.
+   * @throws std::logic_error if a block cannot be obtained or is empty
+   */
+  void AppendOnlyBufferedStream::write(const char * data, size_t size) {
+    size_t dataOffset = 0;
+    while (size > 0) {
+      if (bufferOffset == bufferLength) {
+        // Current block exhausted (or none yet): ask for a fresh one.
+        if (!outStream->Next(
+              reinterpret_cast<void **>(&buffer),
+              &bufferLength)) {
+          throw std::logic_error("Failed to allocate buffer.");
+        }
+        bufferOffset = 0;
+        if (bufferLength <= 0) {
+          // An empty block would make this loop spin forever, and a negative
+          // length (block size above INT_MAX) would wrap in the size_t
+          // arithmetic below and overrun the buffer.
+          throw std::logic_error("Next() returned an empty buffer.");
+        }
+      }
+      size_t len = std::min(
+        static_cast<size_t>(bufferLength - bufferOffset),
+        size);
+      memcpy(buffer + bufferOffset, data + dataOffset, len);
+      bufferOffset += static_cast<int>(len);
+      dataOffset += len;
+      size -= len;
+    }
+  }
+
+  uint64_t AppendOnlyBufferedStream::getSize() const {
+    // Delegates to the wrapped stream; the figure includes the unused tail of
+    // the block currently being filled.
+    const BufferedOutputStream& target = *outStream;
+    return target.getSize();
+  }
+
+  uint64_t AppendOnlyBufferedStream::flush() {
+    // Give back the unused tail of the current block so only bytes actually
+    // passed to write() reach the output, then start over with no block.
+    const int unusedBytes = bufferLength - bufferOffset;
+    outStream->BackUp(unusedBytes);
+    buffer = nullptr;
+    bufferLength = 0;
+    bufferOffset = 0;
+    return outStream->flush();
+  }
+
+  void AppendOnlyBufferedStream::recordPosition(PositionRecorder* recorder) const {
+    const uint64_t bufferedBytes = outStream->getSize();
+    const uint64_t usedInBlock = static_cast<uint64_t>(bufferOffset);
+    if (!outStream->isCompressed()) {
+      // Uncompressed: a single byte offset. The buffered size counts the whole
+      // current block, so drop its length and add back the part already used.
+      recorder->add(bufferedBytes - static_cast<uint64_t>(bufferLength) + usedInBlock);
+      return;
+    }
+    // Compressed: start of the compression chunk in the stream, followed by
+    // the number of decompressed bytes that need to be consumed.
+    recorder->add(bufferedBytes);
+    recorder->add(usedInBlock);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/src/io/OutputStream.hh
----------------------------------------------------------------------
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
new file mode 100644
index 0000000..8ff2061
--- /dev/null
+++ b/c++/src/io/OutputStream.hh
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_OUTPUTSTREAM_HH
+#define ORC_OUTPUTSTREAM_HH
+
+#include "Adaptor.hh"
+#include "orc/OrcFile.hh"
+#include "wrap/zero-copy-stream-wrapper.h"
+
+namespace orc {
+
+ /**
+ * Receives write positions used to build the index stream.
+ * AppendOnlyBufferedStream::recordPosition() reports one value for an
+ * uncompressed stream (byte offset) and two for a compressed one (start of
+ * the compression chunk, then decompressed bytes to consume).
+ */
+ class PositionRecorder {
+ public:
+ virtual ~PositionRecorder();
+ // Accept one component of a position.
+ virtual void add(uint64_t pos) = 0;
+ };
+
+ /**
+ * A subclass of Google's ZeroCopyOutputStream that buffers output in
+ * memory and writes it to an OutputStream when flush() is called.
+ * By extending Google's class, we get the ability to pass it directly
+ * to the protobuf writers.
+ */
+ class BufferedOutputStream: public google::protobuf::io::ZeroCopyOutputStream {
+ private:
+ // Destination for flush(). Not deleted by this class, so the caller must
+ // keep it alive while this stream is in use.
+ OutputStream * outputStream;
+ // In-memory staging area; its size is the number of buffered bytes.
+ std::unique_ptr<DataBuffer<char> > dataBuffer;
+ // Number of bytes handed out by each call to Next().
+ uint64_t blockSize;
+
+ public:
+ /**
+ * @param pool memory pool backing the internal buffer
+ * @param outStream destination for flush(); not owned
+ * @param capacity number of bytes to reserve up front
+ * @param block_size number of bytes returned by each Next() call
+ */
+ BufferedOutputStream(MemoryPool& pool,
+ OutputStream * outStream,
+ uint64_t capacity,
+ uint64_t block_size);
+ virtual ~BufferedOutputStream();
+
+ // google::protobuf::io::ZeroCopyOutputStream implementation.
+ // Next() always returns true, growing the buffer by block_size bytes.
+ virtual bool Next(void** data, int*size) override;
+ virtual void BackUp(int count) override;
+ // Bytes currently buffered (reset to zero by flush()).
+ virtual google::protobuf::int64 ByteCount() const override;
+ // Aliasing is unsupported: WriteAliasedRaw() throws NotImplementedYet.
+ virtual bool WriteAliasedRaw(const void * data, int size) override;
+ virtual bool AllowsAliasing() const override;
+
+ // Human-readable description for error messages.
+ virtual std::string getName() const;
+ // Number of bytes currently held in the buffer.
+ virtual uint64_t getSize() const;
+ // Write all buffered bytes to the OutputStream, empty the buffer and
+ // return the number of bytes written.
+ virtual uint64_t flush();
+
+ // Always false here. NOTE(review): presumably overridden by a compressing
+ // subclass -- confirm.
+ virtual bool isCompressed() const { return false; }
+ };
+
+  /**
+   * A byte-oriented, append-only writer layered on a BufferedOutputStream,
+   * which it owns. write() fills blocks obtained from Next(); flush() gives
+   * back the unused tail of the current block and flushes the wrapped
+   * stream; recordPosition() reports the logical write position.
+   */
+  class AppendOnlyBufferedStream {
+  private:
+    std::unique_ptr<BufferedOutputStream> outStream;
+    // Block most recently obtained from outStream->Next(); nullptr before the
+    // first write and after flush().
+    char * buffer;
+    // Bytes of the current block already filled, and the block's length.
+    int bufferOffset, bufferLength;
+
+  public:
+    AppendOnlyBufferedStream(std::unique_ptr<BufferedOutputStream> _outStream)
+      : outStream(std::move(_outStream)),
+        buffer(nullptr),
+        bufferOffset(0),
+        bufferLength(0) {
+    }
+
+    /** Append size bytes starting at data. */
+    void write(const char * data, size_t size);
+    /** Bytes held by the wrapped stream, including the unused block tail. */
+    uint64_t getSize() const;
+    /** Give back the unused tail, flush the wrapped stream, return bytes written. */
+    uint64_t flush();
+
+    /** Report the current logical write position to recorder. */
+    void recordPosition(PositionRecorder* recorder) const;
+  };
+}
+
+#endif // ORC_OUTPUTSTREAM_HH
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/test/CMakeLists.txt b/c++/test/CMakeLists.txt
index 29f5f6c..ad5893c 100644
--- a/c++/test/CMakeLists.txt
+++ b/c++/test/CMakeLists.txt
@@ -23,6 +23,9 @@ include_directories(
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX11_FLAGS} ${WARN_FLAGS}")
add_executable (orc-test
+ MemoryInputStream.cc
+ MemoryOutputStream.cc
+ TestBufferedOutputStream.cc
TestByteRle.cc
TestColumnPrinter.cc
TestColumnReader.cc
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryInputStream.cc
----------------------------------------------------------------------
diff --git a/c++/test/MemoryInputStream.cc b/c++/test/MemoryInputStream.cc
new file mode 100644
index 0000000..851c5ee
--- /dev/null
+++ b/c++/test/MemoryInputStream.cc
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MemoryInputStream.hh"
+
+namespace orc {
+  // Nothing to free: the buffer is borrowed from the caller, not owned.
+  MemoryInputStream::~MemoryInputStream() = default;
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryInputStream.hh
----------------------------------------------------------------------
diff --git a/c++/test/MemoryInputStream.hh b/c++/test/MemoryInputStream.hh
new file mode 100644
index 0000000..302a551
--- /dev/null
+++ b/c++/test/MemoryInputStream.hh
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_MEMORYINPUTSTREAM_HH
+#define ORC_MEMORYINPUTSTREAM_HH
+
+#include "orc/OrcFile.hh"
+#include "io/InputStream.hh"
+
+#include <cstring>
+#include <iostream>
+#include <stdexcept>
+
+namespace orc {
+  /**
+   * InputStream over a caller-owned in-memory buffer, for unit tests.
+   * The buffer is not copied, so it must outlive the stream.
+   */
+  class MemoryInputStream : public InputStream {
+  public:
+    MemoryInputStream(const char * _buffer, size_t _size) :
+      buffer(_buffer),
+      size(_size),
+      naturalReadSize(1024),
+      name("MemoryInputStream") {
+    }
+
+    ~MemoryInputStream() override;
+
+    virtual uint64_t getLength() const override {
+      return size;
+    }
+
+    virtual uint64_t getNaturalReadSize() const override {
+      return naturalReadSize;
+    }
+
+    /**
+     * Copy length bytes starting at offset into buf.
+     * @throws std::out_of_range if the range runs past the end of the buffer
+     */
+    virtual void read(void* buf, uint64_t length, uint64_t offset) override {
+      // Reject out-of-range reads instead of reading past the borrowed buffer;
+      // the comparison is arranged so offset + length cannot overflow.
+      if (offset > size || length > size - offset) {
+        throw std::out_of_range("MemoryInputStream: read past end of buffer");
+      }
+      memcpy(buf, buffer + offset, length);
+    }
+
+    virtual const std::string& getName() const override {
+      return name;
+    }
+
+    const char * getData() const {
+      return buffer;
+    }
+
+  private:
+    const char * buffer;
+    uint64_t size, naturalReadSize;
+    std::string name;
+  };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryOutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/test/MemoryOutputStream.cc b/c++/test/MemoryOutputStream.cc
new file mode 100644
index 0000000..972b5af
--- /dev/null
+++ b/c++/test/MemoryOutputStream.cc
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MemoryOutputStream.hh"
+
+namespace orc {
+
+  MemoryOutputStream::~MemoryOutputStream() {
+    // Release the buffer the constructor allocated with new[]; it leaked before.
+    delete[] data;
+  }
+
+  void MemoryOutputStream::write(const void* buf, size_t size) {
+    // Append at the current end of the buffer. There is no bounds check:
+    // callers must not write more in total than the constructor's capacity.
+    char* destination = data + length;
+    memcpy(destination, buf, size);
+    length += size;
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/MemoryOutputStream.hh
----------------------------------------------------------------------
diff --git a/c++/test/MemoryOutputStream.hh b/c++/test/MemoryOutputStream.hh
new file mode 100644
index 0000000..99751e5
--- /dev/null
+++ b/c++/test/MemoryOutputStream.hh
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_MEMORYOUTPUTSTREAM_HH
+#define ORC_MEMORYOUTPUTSTREAM_HH
+
+#include "orc/OrcFile.hh"
+#include "io/OutputStream.hh"
+
+#include <iostream>
+
+namespace orc {
+
+  /**
+   * OutputStream that appends into a fixed-size heap buffer, for unit tests.
+   * The buffer never grows, and write() does not check it against capacity.
+   */
+  class MemoryOutputStream : public OutputStream {
+  public:
+    MemoryOutputStream(ssize_t capacity)
+      : data(new char[capacity]),
+        name("MemoryOutputStream"),
+        length(0),
+        // Previously left uninitialized, so getNaturalWriteSize() returned
+        // garbage. 1024 mirrors MemoryInputStream's natural read size.
+        naturalWriteSize(1024) {
+    }
+
+    // data is allocated with new[] by this object; a copy would alias it.
+    MemoryOutputStream(const MemoryOutputStream&) = delete;
+    MemoryOutputStream& operator=(const MemoryOutputStream&) = delete;
+
+    ~MemoryOutputStream() override;
+
+    virtual uint64_t getLength() const override {
+      return length;
+    }
+
+    virtual uint64_t getNaturalWriteSize() const override {
+      return naturalWriteSize;
+    }
+
+    virtual void write(const void* buf, size_t size) override;
+
+    virtual const std::string& getName() const override {
+      return name;
+    }
+
+    const char * getData() const {
+      return data;
+    }
+
+    void close() override {
+    }
+
+    // Discard the written bytes so the buffer can be reused.
+    void reset() {
+      length = 0;
+    }
+
+  private:
+    char * data;
+    std::string name;
+    uint64_t length, naturalWriteSize;
+  };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/orc/blob/f856ba36/c++/test/TestBufferedOutputStream.cc
----------------------------------------------------------------------
diff --git a/c++/test/TestBufferedOutputStream.cc b/c++/test/TestBufferedOutputStream.cc
new file mode 100644
index 0000000..d1786a6
--- /dev/null
+++ b/c++/test/TestBufferedOutputStream.cc
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "wrap/orc-proto-wrapper.hh"
+#include "wrap/gtest-wrapper.h"
+
+#include "MemoryOutputStream.hh"
+
+namespace orc {
+  // Writes exactly capacity bytes in whole blocks and checks they are flushed
+  // unchanged.
+  TEST(BufferedOutputStream, block_aligned) {
+    MemoryOutputStream memStream(1024);
+    MemoryPool* pool = getDefaultPool();
+
+    const uint64_t capacity = 1000;
+    const uint64_t block = 10;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    // 100 blocks of 10 bytes, each holding "abcdefghij".
+    for (int blockIndex = 0; blockIndex < 100; ++blockIndex) {
+      char* chunk = nullptr;
+      int chunkLen = 0;
+      EXPECT_TRUE(bufStream.Next(reinterpret_cast<void**>(&chunk), &chunkLen));
+      EXPECT_EQ(10, chunkLen);
+      for (int pos = 0; pos < 10; ++pos) {
+        chunk[pos] = static_cast<char>('a' + pos);
+      }
+    }
+
+    bufStream.flush();
+    EXPECT_EQ(1000, memStream.getLength());
+    const char* written = memStream.getData();
+    for (int pos = 0; pos < 1000; ++pos) {
+      EXPECT_EQ('a' + pos % 10, written[pos]);
+    }
+  }
+
+  // Partially fills blocks, backs up the unused tail, and checks that only
+  // the written bytes reach the output across two flushes.
+  TEST(BufferedOutputStream, block_not_aligned) {
+    MemoryOutputStream memStream(1024);
+    MemoryPool* pool = getDefaultPool();
+
+    const uint64_t capacity = 20;
+    const uint64_t block = 10;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    // First block: fill 7 of 10 bytes, give back 3, flush.
+    char* chunk = nullptr;
+    int chunkLen = 0;
+    EXPECT_TRUE(bufStream.Next(reinterpret_cast<void**>(&chunk), &chunkLen));
+    EXPECT_EQ(10, chunkLen);
+    for (int pos = 0; pos < 7; ++pos) {
+      chunk[pos] = static_cast<char>('a' + pos);
+    }
+    bufStream.BackUp(3);
+    bufStream.flush();
+
+    EXPECT_EQ(7, memStream.getLength());
+    for (int pos = 0; pos < 7; ++pos) {
+      EXPECT_EQ('a' + pos, memStream.getData()[pos]);
+    }
+
+    // Second block: fill 5 of 10 bytes, give back 5, flush; output appends.
+    EXPECT_TRUE(bufStream.Next(reinterpret_cast<void**>(&chunk), &chunkLen));
+    EXPECT_EQ(10, chunkLen);
+    for (int pos = 0; pos < 5; ++pos) {
+      chunk[pos] = static_cast<char>('a' + pos);
+    }
+    bufStream.BackUp(5);
+    bufStream.flush();
+
+    EXPECT_EQ(12, memStream.getLength());
+    const char* written = memStream.getData();
+    for (int pos = 0; pos < 7; ++pos) {
+      EXPECT_EQ('a' + pos, written[pos]);
+    }
+    for (int pos = 0; pos < 5; ++pos) {
+      EXPECT_EQ('a' + pos, written[7 + pos]);
+    }
+  }
+
+  // Round-trips a PostScript message through BufferedOutputStream. The
+  // capacity (20) is smaller than the serialized message, so this also
+  // exercises buffer growth inside Next().
+  TEST(BufferedOutputStream, protobuff_serialization) {
+    MemoryOutputStream memStream(1024);
+    MemoryPool * pool = getDefaultPool();
+
+    uint64_t capacity = 20;
+    uint64_t block = 10;
+    BufferedOutputStream bufStream(*pool, &memStream, capacity, block);
+
+    proto::PostScript ps;
+    ps.set_footerlength(197934);
+    ps.set_compression(proto::ZLIB);
+    ps.add_version(6);
+    ps.add_version(20);
+    ps.set_metadatalength(100);
+    ps.set_writerversion(789);
+    ps.set_magic("protobuff_serialization");
+
+    EXPECT_TRUE(ps.SerializeToZeroCopyStream(&bufStream));
+    bufStream.flush();
+    // ByteSize() is signed; compare in the unsigned type of getLength().
+    EXPECT_EQ(static_cast<uint64_t>(ps.ByteSize()), memStream.getLength());
+
+    proto::PostScript ps2;
+    // Stop here if parsing fails rather than comparing default-valued fields.
+    ASSERT_TRUE(ps2.ParseFromArray(
+      memStream.getData(),
+      static_cast<int>(memStream.getLength())));
+
+    EXPECT_EQ(ps.footerlength(), ps2.footerlength());
+    EXPECT_EQ(ps.compression(), ps2.compression());
+    EXPECT_EQ(ps.version(0), ps2.version(0));
+    EXPECT_EQ(ps.version(1), ps2.version(1));
+    EXPECT_EQ(ps.metadatalength(), ps2.metadatalength());
+    EXPECT_EQ(ps.writerversion(), ps2.writerversion());
+    EXPECT_EQ(ps.magic(), ps2.magic());
+  }
+}