You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2015/07/06 23:30:48 UTC
orc git commit: ORC-18. Replaced Buffer with DataBuffer<char> and
converted InputStream::read() method to posix style. (asandryh via omalley)
Repository: orc
Updated Branches:
refs/heads/master 388fb8e90 -> 486433f56
ORC-18. Replaced Buffer with DataBuffer<char> and converted
InputStream::read() method to posix style. (asandryh via omalley)
Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/486433f5
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/486433f5
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/486433f5
Branch: refs/heads/master
Commit: 486433f56fa3a63326b2313e0d19396e1be41b6c
Parents: 388fb8e
Author: Aliaksei Sandryhaila <al...@hp.com>
Authored: Mon Jul 6 10:15:29 2015 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jul 6 14:27:52 2015 -0700
----------------------------------------------------------------------
c++/include/orc/OrcFile.hh | 34 ++------
c++/src/orc/Compression.cc | 41 ++++------
c++/src/orc/Compression.hh | 11 +--
c++/src/orc/OrcFile.cc | 151 ++---------------------------------
c++/src/orc/Reader.cc | 37 +++++----
c++/test/orc/TestCompression.cc | 8 +-
tools-c++/test/TestReader.cc | 2 +-
7 files changed, 59 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/include/orc/OrcFile.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index 0a4f4ca..a537151 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -31,24 +31,6 @@
namespace orc {
/**
- * An abstract interface for a buffer provided by the input stream.
- */
- class Buffer {
- public:
- virtual ~Buffer();
-
- /**
- * Get the start of the buffer.
- */
- virtual char *getStart() const = 0;
-
- /**
- * Get the length of the buffer in bytes.
- */
- virtual uint64_t getLength() const = 0;
- };
-
- /**
* An abstract interface for providing ORC readers a stream of bytes.
*/
class InputStream {
@@ -62,16 +44,14 @@ namespace orc {
/**
* Read length bytes from the file starting at offset into
- * the buffer.
- * @param offset the position in the file to read from
- * @param length the number of bytes to read
- * @param buffer a Buffer to reuse from a previous call to read. Ownership
- * of this buffer passes to the InputStream object.
- * @return the buffer with the requested data. The client owns the Buffer.
+ * the buffer starting at buf.
+ * @param buf the starting position of a buffer.
+ * @param length the number of bytes to read.
+ * @param offset the position in the stream to read from.
*/
- virtual Buffer* read(uint64_t offset,
- uint64_t length,
- Buffer* buffer) = 0;
+ virtual void read(void* buf,
+ uint64_t length,
+ uint64_t offset) = 0;
/**
* Get the name of the stream for error messages.
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.cc b/c++/src/orc/Compression.cc
index bc1c11a..06e10e0 100644
--- a/c++/src/orc/Compression.cc
+++ b/c++/src/orc/Compression.cc
@@ -67,19 +67,6 @@ namespace orc {
// PASS
}
- #ifdef ORC_CXX_HAS_INITIALIZER_LIST
- SeekableArrayInputStream::SeekableArrayInputStream
- (std::initializer_list<unsigned char> values,
- int64_t blkSize
- ):ownedData(new DataBuffer<char>(*getDefaultPool(), values.size())),
- data(0) {
- length = values.size();
- memcpy(ownedData->data(), values.begin(), values.size());
- position = 0;
- blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize);
- }
- #endif
-
SeekableArrayInputStream::SeekableArrayInputStream
(const unsigned char* values,
uint64_t size,
@@ -102,7 +89,7 @@ namespace orc {
bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
uint64_t currentSize = std::min(length - position, blockSize);
if (currentSize > 0) {
- *buffer = (data ? data : ownedData->data()) + position;
+ *buffer = data + position;
*size = static_cast<int>(currentSize);
position += currentSize;
return true;
@@ -158,32 +145,36 @@ namespace orc {
SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
uint64_t offset,
uint64_t byteCount,
+ MemoryPool& _pool,
int64_t _blockSize
- ): input(stream),
- start(offset),
- length(byteCount),
- blockSize(computeBlock
- (_blockSize,
- length)) {
+ ):pool(_pool),
+ input(stream),
+ start(offset),
+ length(byteCount),
+ blockSize(computeBlock
+ (_blockSize,
+ length)) {
+
position = 0;
- buffer = nullptr;
+ buffer.reset(new DataBuffer<char>(pool));
pushBack = 0;
}
SeekableFileInputStream::~SeekableFileInputStream() {
- delete buffer;
+ // PASS
}
bool SeekableFileInputStream::Next(const void** data, int*size) {
uint64_t bytesRead;
if (pushBack != 0) {
- *data = buffer->getStart() + (buffer->getLength() - pushBack);
+ *data = buffer->data() + (buffer->size() - pushBack);
bytesRead = pushBack;
} else {
bytesRead = std::min(length - position, blockSize);
+ buffer->resize(bytesRead);
if (bytesRead > 0) {
- buffer = input->read(start + position, bytesRead, buffer);
- *data = static_cast<void*>(buffer->getStart());
+ input->read(buffer->data(), bytesRead, start+position);
+ *data = static_cast<void*>(buffer->data());
}
}
position += bytesRead;
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.hh b/c++/src/orc/Compression.hh
index ac9c094..222dc54 100644
--- a/c++/src/orc/Compression.hh
+++ b/c++/src/orc/Compression.hh
@@ -61,19 +61,12 @@ namespace orc {
*/
class SeekableArrayInputStream: public SeekableInputStream {
private:
- std::unique_ptr<DataBuffer<char> > ownedData;
const char* data;
uint64_t length;
uint64_t position;
uint64_t blockSize;
public:
-
- #ifdef ORC_CXX_HAS_INITIALIZER_LIST
- SeekableArrayInputStream(std::initializer_list<unsigned char> list,
- int64_t block_size = -1);
- #endif
-
SeekableArrayInputStream(const unsigned char* list,
uint64_t length,
int64_t block_size = -1);
@@ -94,11 +87,12 @@ namespace orc {
*/
class SeekableFileInputStream: public SeekableInputStream {
private:
+ MemoryPool& pool;
InputStream* const input;
const uint64_t start;
const uint64_t length;
const uint64_t blockSize;
- Buffer* buffer;
+ std::unique_ptr<DataBuffer<char> > buffer;
uint64_t position;
uint64_t pushBack;
@@ -106,6 +100,7 @@ namespace orc {
SeekableFileInputStream(InputStream* input,
uint64_t offset,
uint64_t byteCount,
+ MemoryPool& pool,
int64_t blockSize = -1);
virtual ~SeekableFileInputStream();
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/OrcFile.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/OrcFile.cc b/c++/src/orc/OrcFile.cc
index 9e850de..9ae9c56 100644
--- a/c++/src/orc/OrcFile.cc
+++ b/c++/src/orc/OrcFile.cc
@@ -30,36 +30,6 @@
namespace orc {
- Buffer::~Buffer() {
- // PASS
- }
-
- class HeapBuffer: public Buffer {
- private:
- char* start;
- uint64_t length;
-
- public:
- HeapBuffer(uint64_t size) {
- start = new char[size];
- length = size;
- }
-
- virtual ~HeapBuffer();
-
- virtual char *getStart() const override {
- return start;
- }
-
- virtual uint64_t getLength() const override {
- return length;
- }
- };
-
- HeapBuffer::~HeapBuffer() {
- delete[] start;
- }
-
class FileInputStream : public InputStream {
private:
std::string filename ;
@@ -86,24 +56,20 @@ namespace orc {
return totalLength;
}
- Buffer* read(uint64_t offset,
- uint64_t length,
- Buffer* buffer) override {
- if (buffer == nullptr) {
- buffer = new HeapBuffer(length);
- } else if (buffer->getLength() < length) {
- delete buffer;
- buffer = new HeapBuffer(length);
+ void read(void* buf,
+ uint64_t length,
+ uint64_t offset) override {
+ if (!buf) {
+ throw ParseError("Buffer is null");
}
- ssize_t bytesRead = pread(file, buffer->getStart(), length,
- static_cast<off_t>(offset));
+ ssize_t bytesRead = pread(file, buf, length, static_cast<off_t>(offset));
+
if (bytesRead == -1) {
throw ParseError("Bad read of " + filename);
}
if (static_cast<uint64_t>(bytesRead) != length) {
throw ParseError("Short read of " + filename);
}
- return buffer;
}
const std::string& getName() const override {
@@ -115,109 +81,6 @@ namespace orc {
close(file);
}
- /**
- * A buffer for use with an memmapped file where the Buffer doesn't own
- * the memory that it references.
- */
- class MmapBuffer: public Buffer {
- private:
- char* start;
- uint64_t length;
-
- public:
- MmapBuffer(): start(nullptr), length(0) {
- // PASS
- }
-
- virtual ~MmapBuffer();
-
- void reset(char *_start, uint64_t _length) {
- start = _start;
- length = _length;
- }
-
- virtual char *getStart() const override {
- return start;
- }
-
- virtual uint64_t getLength() const override {
- return length;
- }
- };
-
- MmapBuffer::~MmapBuffer() {
- // PASS
- }
-
- /**
- * An InputStream implementation that uses memory mapping to read the
- * local file.
- */
- class MmapInputStream : public InputStream {
- private:
- std::string filename ;
- char* start;
- uint64_t totalLength;
-
- public:
- MmapInputStream(std::string _filename);
- ~MmapInputStream();
-
- uint64_t getLength() const override {
- return totalLength;
- }
-
- const std::string& getName() const override {
- return filename;
- }
-
- Buffer* read(uint64_t offset,
- uint64_t length,
- Buffer* buffer) override;
- };
-
- MmapInputStream::MmapInputStream(std::string _filename) {
- filename = _filename ;
- int file = open(filename.c_str(), O_RDONLY);
- if (file == -1) {
- throw ParseError("Can't open " + filename);
- }
- struct stat fileStat;
- if (fstat(file, &fileStat) == -1) {
- throw ParseError("Can't stat " + filename);
- }
- totalLength = static_cast<uint64_t>(fileStat.st_size);
- start = static_cast<char*>(mmap(nullptr, totalLength, PROT_READ,
- MAP_FILE|MAP_PRIVATE,
- file, 0LL));
- if (start == MAP_FAILED) {
- throw std::runtime_error("mmap failed " + filename + " " +
- strerror(errno));
- }
- close(file);
- }
-
- MmapInputStream::~MmapInputStream() {
- int64_t result = munmap(reinterpret_cast<void*>(start), totalLength);
- if (result != 0) {
- throw std::runtime_error("Failed to unmap " + filename + " - " +
- strerror(errno));
- }
- }
-
- Buffer* MmapInputStream::read(uint64_t offset,
- uint64_t length,
- Buffer* buffer) {
- if (buffer == nullptr) {
- buffer = new MmapBuffer();
- }
- if (offset + length > totalLength) {
- throw std::runtime_error("Read past end of file " + filename);
- }
- dynamic_cast<MmapBuffer*>(buffer)->reset(start + offset, length);
- return buffer;
- }
-
std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
return std::unique_ptr<InputStream>(new FileInputStream(path));
}
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/Reader.cc b/c++/src/orc/Reader.cc
index 37dd3e0..2343e91 100644
--- a/c++/src/orc/Reader.cc
+++ b/c++/src/orc/Reader.cc
@@ -1232,7 +1232,8 @@ namespace orc {
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream(stream.get(),
metadataStart,
- metadataSize)),
+ metadataSize,
+ memoryPool)),
blockSize,
memoryPool);
metadata.reset(new proto::Metadata());
@@ -1314,6 +1315,7 @@ namespace orc {
(new SeekableFileInputStream(stream.get(),
stripeFooterStart,
stripeFooterLength,
+ memoryPool,
static_cast<int64_t>
(blockSize)
)),
@@ -1406,6 +1408,7 @@ namespace orc {
(&input,
offset,
stream.length(),
+ memoryPool,
myBlock)),
reader.getCompressionSize(),
memoryPool);
@@ -1553,13 +1556,13 @@ namespace orc {
}
void ensureOrcFooter(InputStream* stream,
- Buffer *buffer,
+ DataBuffer<char> *buffer,
uint64_t postscriptLength) {
const std::string MAGIC("ORC");
const uint64_t magicLength = MAGIC.length();
- const char * const bufferStart = buffer->getStart();
- const uint64_t bufferLength = buffer->getLength();
+ const char * const bufferStart = buffer->data();
+ const uint64_t bufferLength = buffer->size();
if (postscriptLength < magicLength || bufferLength < magicLength) {
throw ParseError("Invalid ORC postscript length");
@@ -1570,10 +1573,10 @@ namespace orc {
if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
// If there is no magic string at the end, check the beginning.
// Only files written by Hive 0.11.0 don't have the tail ORC string.
- Buffer *frontBuffer = stream->read(0, magicLength, nullptr);
- bool foundMatch =
- memcmp(frontBuffer->getStart(), MAGIC.c_str(), magicLength) == 0;
- delete frontBuffer;
+ char *frontBuffer = new char[magicLength];
+ stream->read(frontBuffer, magicLength, 0);
+ bool foundMatch = memcmp(frontBuffer, MAGIC.c_str(), magicLength) == 0;
+ delete[] frontBuffer;
if (!foundMatch) {
throw ParseError("Not an ORC file");
}
@@ -1587,10 +1590,10 @@ namespace orc {
* @param postscriptSize the length of postscript in bytes
*/
std::unique_ptr<proto::PostScript> readPostscript(InputStream *stream,
- Buffer *buffer,
+ DataBuffer<char> *buffer,
uint64_t postscriptSize) {
- char *ptr = buffer->getStart();
- uint64_t readSize = buffer->getLength();
+ char *ptr = buffer->data();
+ uint64_t readSize = buffer->size();
ensureOrcFooter(stream, buffer, postscriptSize);
@@ -1613,11 +1616,11 @@ namespace orc {
* @param memoryPool the memory pool to use
*/
std::unique_ptr<proto::Footer> readFooter(InputStream* stream,
- Buffer *&buffer,
+ DataBuffer<char> *&buffer,
uint64_t footerOffset,
const proto::PostScript& ps,
MemoryPool& memoryPool) {
- char *footerPtr = buffer->getStart() + footerOffset;
+ char *footerPtr = buffer->data() + footerOffset;
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(convertCompressionKind(ps),
@@ -1662,9 +1665,10 @@ namespace orc {
if (readSize < 4) {
throw ParseError("File size too small");
}
- Buffer *buffer = stream->read(size - readSize, readSize, nullptr);
+ DataBuffer<char> *buffer = new DataBuffer<char>(*memoryPool, readSize);
+ stream->read(buffer->data(), readSize, size - readSize);
- uint64_t postscriptSize = buffer->getStart()[readSize - 1] & 0xff;
+ uint64_t postscriptSize = buffer->data()[readSize - 1] & 0xff;
ps = readPostscript(stream.get(), buffer, postscriptSize);
uint64_t footerSize = ps->footerlength();
uint64_t tailSize = 1 + postscriptSize + footerSize;
@@ -1672,7 +1676,8 @@ namespace orc {
uint64_t footerOffset;
if (tailSize > readSize) {
- buffer = stream->read(size - tailSize, footerSize, buffer);
+ buffer->resize(footerSize);
+ stream->read(buffer->data(), footerSize, size - tailSize);
footerOffset = 0;
} else {
footerOffset = readSize - tailSize;
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/test/orc/TestCompression.cc
----------------------------------------------------------------------
diff --git a/c++/test/orc/TestCompression.cc b/c++/test/orc/TestCompression.cc
index 465f741..4b4f13f 100644
--- a/c++/test/orc/TestCompression.cc
+++ b/c++/test/orc/TestCompression.cc
@@ -204,7 +204,7 @@ namespace orc {
TEST_F(TestCompression, testFileBackup) {
SCOPED_TRACE("testFileBackup");
std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
- SeekableFileInputStream stream(file.get(), 0, 200, 20);
+ SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
const void *ptr;
int len;
ASSERT_THROW(stream.BackUp(10), std::logic_error);
@@ -235,7 +235,7 @@ namespace orc {
TEST_F(TestCompression, testFileSkip) {
SCOPED_TRACE("testFileSkip");
std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
- SeekableFileInputStream stream(file.get(), 0, 200, 20);
+ SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
const void *ptr;
int len;
ASSERT_EQ(true, stream.Next(&ptr, &len));
@@ -255,7 +255,7 @@ namespace orc {
TEST_F(TestCompression, testFileCombo) {
SCOPED_TRACE("testFileCombo");
std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
- SeekableFileInputStream stream(file.get(), 0, 200, 20);
+ SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
const void *ptr;
int len;
ASSERT_EQ(true, stream.Next(&ptr, &len));
@@ -275,7 +275,7 @@ namespace orc {
TEST_F(TestCompression, testFileSeek) {
SCOPED_TRACE("testFileSeek");
std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
- SeekableFileInputStream stream(file.get(), 0, 200, 20);
+ SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
const void *ptr;
int len;
EXPECT_EQ(0, stream.ByteCount());
http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/tools-c++/test/TestReader.cc
----------------------------------------------------------------------
diff --git a/tools-c++/test/TestReader.cc b/tools-c++/test/TestReader.cc
index aae0f1d..4f82d8a 100644
--- a/tools-c++/test/TestReader.cc
+++ b/tools-c++/test/TestReader.cc
@@ -2913,7 +2913,7 @@ public:
~MockInputStream();
MOCK_CONST_METHOD0(getLength, uint64_t());
MOCK_CONST_METHOD0(getName, const std::string&());
- MOCK_METHOD3(read, Buffer* (uint64_t, uint64_t, Buffer*));
+ MOCK_METHOD3(read, void (void*, uint64_t, uint64_t));
};
MockInputStream::~MockInputStream() {