You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2015/07/06 23:30:48 UTC

orc git commit: ORC-18. Replaced Buffer with DataBuffer<char> and converted InputStream::read() method to posix style. (asandryh via omalley)

Repository: orc
Updated Branches:
  refs/heads/master 388fb8e90 -> 486433f56


ORC-18. Replaced Buffer with DataBuffer<char> and converted
InputStream::read() method to posix style. (asandryh via omalley)


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/486433f5
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/486433f5
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/486433f5

Branch: refs/heads/master
Commit: 486433f56fa3a63326b2313e0d19396e1be41b6c
Parents: 388fb8e
Author: Aliaksei Sandryhaila <al...@hp.com>
Authored: Mon Jul 6 10:15:29 2015 -0700
Committer: Owen O'Malley <om...@apache.org>
Committed: Mon Jul 6 14:27:52 2015 -0700

----------------------------------------------------------------------
 c++/include/orc/OrcFile.hh      |  34 ++------
 c++/src/orc/Compression.cc      |  41 ++++------
 c++/src/orc/Compression.hh      |  11 +--
 c++/src/orc/OrcFile.cc          | 151 ++---------------------------------
 c++/src/orc/Reader.cc           |  37 +++++----
 c++/test/orc/TestCompression.cc |   8 +-
 tools-c++/test/TestReader.cc    |   2 +-
 7 files changed, 59 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/include/orc/OrcFile.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index 0a4f4ca..a537151 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -31,24 +31,6 @@
 namespace orc {
 
   /**
-   * An abstract interface for a buffer provided by the input stream.
-   */
-  class Buffer {
-  public:
-    virtual ~Buffer();
-
-    /**
-     * Get the start of the buffer.
-     */
-    virtual char *getStart() const = 0;
-
-    /**
-     * Get the length of the buffer in bytes.
-     */
-    virtual uint64_t getLength() const = 0;
-  };
-
-  /**
    * An abstract interface for providing ORC readers a stream of bytes.
    */
   class InputStream {
@@ -62,16 +44,14 @@ namespace orc {
 
     /**
      * Read length bytes from the file starting at offset into
-     * the buffer.
-     * @param offset the position in the file to read from
-     * @param length the number of bytes to read
-     * @param buffer a Buffer to reuse from a previous call to read. Ownership
-     *    of this buffer passes to the InputStream object.
-     * @return the buffer with the requested data. The client owns the Buffer.
+     * the buffer starting at buf.
+     * @param buf the starting position of a buffer.
+     * @param length the number of bytes to read.
+     * @param offset the position in the stream to read from.
      */
-    virtual Buffer* read(uint64_t offset,
-                         uint64_t length,
-                         Buffer* buffer) = 0;
+    virtual void read(void* buf,
+                      uint64_t length,
+                      uint64_t offset) = 0;
 
     /**
      * Get the name of the stream for error messages.

http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.cc b/c++/src/orc/Compression.cc
index bc1c11a..06e10e0 100644
--- a/c++/src/orc/Compression.cc
+++ b/c++/src/orc/Compression.cc
@@ -67,19 +67,6 @@ namespace orc {
     // PASS
   }
 
-  #ifdef ORC_CXX_HAS_INITIALIZER_LIST
-    SeekableArrayInputStream::SeekableArrayInputStream
-       (std::initializer_list<unsigned char> values,
-        int64_t blkSize
-        ):ownedData(new DataBuffer<char>(*getDefaultPool(), values.size())),
-          data(0) {
-      length = values.size();
-      memcpy(ownedData->data(), values.begin(), values.size());
-      position = 0;
-      blockSize = blkSize == -1 ? length : static_cast<uint64_t>(blkSize);
-    }
-  #endif
-
   SeekableArrayInputStream::SeekableArrayInputStream
                (const unsigned char* values,
                 uint64_t size,
@@ -102,7 +89,7 @@ namespace orc {
   bool SeekableArrayInputStream::Next(const void** buffer, int*size) {
     uint64_t currentSize = std::min(length - position, blockSize);
     if (currentSize > 0) {
-      *buffer = (data ? data : ownedData->data()) + position;
+      *buffer = data + position;
       *size = static_cast<int>(currentSize);
       position += currentSize;
       return true;
@@ -158,32 +145,36 @@ namespace orc {
   SeekableFileInputStream::SeekableFileInputStream(InputStream* stream,
                                                    uint64_t offset,
                                                    uint64_t byteCount,
+                                                   MemoryPool& _pool,
                                                    int64_t _blockSize
-                                                   ): input(stream),
-                                                      start(offset),
-                                                      length(byteCount),
-                                                      blockSize(computeBlock
-                                                                (_blockSize,
-                                                                 length)) {
+                                                   ):pool(_pool),
+                                                     input(stream),
+                                                     start(offset),
+                                                     length(byteCount),
+                                                     blockSize(computeBlock
+                                                               (_blockSize,
+                                                                length)) {
+
     position = 0;
-    buffer = nullptr;
+    buffer.reset(new DataBuffer<char>(pool));
     pushBack = 0;
   }
 
   SeekableFileInputStream::~SeekableFileInputStream() {
-    delete buffer;
+    // PASS
   }
 
   bool SeekableFileInputStream::Next(const void** data, int*size) {
     uint64_t bytesRead;
     if (pushBack != 0) {
-      *data = buffer->getStart() + (buffer->getLength() - pushBack);
+      *data = buffer->data() + (buffer->size() - pushBack);
       bytesRead = pushBack;
     } else {
       bytesRead = std::min(length - position, blockSize);
+      buffer->resize(bytesRead);
       if (bytesRead > 0) {
-        buffer = input->read(start + position, bytesRead, buffer);
-        *data = static_cast<void*>(buffer->getStart());
+        input->read(buffer->data(), bytesRead, start+position);
+        *data = static_cast<void*>(buffer->data());
       }
     }
     position += bytesRead;

http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/orc/Compression.hh b/c++/src/orc/Compression.hh
index ac9c094..222dc54 100644
--- a/c++/src/orc/Compression.hh
+++ b/c++/src/orc/Compression.hh
@@ -61,19 +61,12 @@ namespace orc {
    */
   class SeekableArrayInputStream: public SeekableInputStream {
   private:
-    std::unique_ptr<DataBuffer<char> > ownedData;
     const char* data;
     uint64_t length;
     uint64_t position;
     uint64_t blockSize;
 
   public:
-
-    #ifdef ORC_CXX_HAS_INITIALIZER_LIST
-      SeekableArrayInputStream(std::initializer_list<unsigned char> list,
-                               int64_t block_size = -1);
-    #endif
-
     SeekableArrayInputStream(const unsigned char* list,
                              uint64_t length,
                              int64_t block_size = -1);
@@ -94,11 +87,12 @@ namespace orc {
    */
   class SeekableFileInputStream: public SeekableInputStream {
   private:
+    MemoryPool& pool;
     InputStream* const input;
     const uint64_t start;
     const uint64_t length;
     const uint64_t blockSize;
-    Buffer* buffer;
+    std::unique_ptr<DataBuffer<char> > buffer;
     uint64_t position;
     uint64_t pushBack;
 
@@ -106,6 +100,7 @@ namespace orc {
     SeekableFileInputStream(InputStream* input,
                             uint64_t offset,
                             uint64_t byteCount,
+                            MemoryPool& pool,
                             int64_t blockSize = -1);
     virtual ~SeekableFileInputStream();
 

http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/OrcFile.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/OrcFile.cc b/c++/src/orc/OrcFile.cc
index 9e850de..9ae9c56 100644
--- a/c++/src/orc/OrcFile.cc
+++ b/c++/src/orc/OrcFile.cc
@@ -30,36 +30,6 @@
 
 namespace orc {
 
-  Buffer::~Buffer() {
-    // PASS
-  }
-
-  class HeapBuffer: public Buffer {
-  private:
-    char* start;
-    uint64_t length;
-
-  public:
-    HeapBuffer(uint64_t size) {
-      start = new char[size];
-      length = size;
-    }
-
-    virtual ~HeapBuffer();
-
-    virtual char *getStart() const override {
-      return start;
-    }
-
-    virtual uint64_t getLength() const override {
-      return length;
-    }
-  };
-
-  HeapBuffer::~HeapBuffer() {
-    delete[] start;
-  }
-
   class FileInputStream : public InputStream {
   private:
     std::string filename ;
@@ -86,24 +56,20 @@ namespace orc {
       return totalLength;
     }
 
-    Buffer* read(uint64_t offset,
-                 uint64_t length,
-                 Buffer* buffer) override {
-      if (buffer == nullptr) {
-        buffer = new HeapBuffer(length);
-      } else if (buffer->getLength() < length) {
-        delete buffer;
-        buffer = new HeapBuffer(length);
+    void read(void* buf,
+              uint64_t length,
+              uint64_t offset) override {
+      if (!buf) {
+        throw ParseError("Buffer is null");
       }
-      ssize_t bytesRead = pread(file, buffer->getStart(), length,
-                                static_cast<off_t>(offset));
+      ssize_t bytesRead = pread(file, buf, length, static_cast<off_t>(offset));
+
       if (bytesRead == -1) {
         throw ParseError("Bad read of " + filename);
       }
       if (static_cast<uint64_t>(bytesRead) != length) {
         throw ParseError("Short read of " + filename);
       }
-      return buffer;
     }
 
     const std::string& getName() const override {
@@ -115,109 +81,6 @@ namespace orc {
     close(file);
   }
 
-  /**
-   * A buffer for use with an memmapped file where the Buffer doesn't own
-   * the memory that it references.
-   */
-  class MmapBuffer: public Buffer {
-  private:
-    char* start;
-    uint64_t length;
-
-  public:
-    MmapBuffer(): start(nullptr), length(0) {
-      // PASS
-    }
-
-    virtual ~MmapBuffer();
-
-    void reset(char *_start, uint64_t _length) {
-      start = _start;
-      length = _length;
-    }
-
-    virtual char *getStart() const override {
-      return start;
-    }
-
-    virtual uint64_t getLength() const override {
-      return length;
-    }
-  };
-
-  MmapBuffer::~MmapBuffer() {
-    // PASS
-  }
-
-  /**
-   * An InputStream implementation that uses memory mapping to read the
-   * local file.
-   */
-  class MmapInputStream : public InputStream {
-  private:
-    std::string filename ;
-    char* start;
-    uint64_t totalLength;
-
-  public:
-    MmapInputStream(std::string _filename);
-    ~MmapInputStream();
-
-    uint64_t getLength() const override {
-      return totalLength;
-    }
-
-    const std::string& getName() const override {
-      return filename;
-    }
-
-    Buffer* read(uint64_t offset,
-                 uint64_t length,
-                 Buffer* buffer) override;
-  };
-
-  MmapInputStream::MmapInputStream(std::string _filename) {
-    filename = _filename ;
-    int file = open(filename.c_str(), O_RDONLY);
-    if (file == -1) {
-      throw ParseError("Can't open " + filename);
-    }
-    struct stat fileStat;
-    if (fstat(file, &fileStat) == -1) {
-      throw ParseError("Can't stat " + filename);
-    }
-    totalLength = static_cast<uint64_t>(fileStat.st_size);
-    start = static_cast<char*>(mmap(nullptr, totalLength, PROT_READ,
-                                    MAP_FILE|MAP_PRIVATE,
-                                    file, 0LL));
-    if (start == MAP_FAILED) {
-      throw std::runtime_error("mmap failed " + filename + " " +
-                               strerror(errno));
-    }
-    close(file);
-  }
-
-  MmapInputStream::~MmapInputStream() {
-    int64_t result = munmap(reinterpret_cast<void*>(start), totalLength);
-    if (result != 0) {
-      throw std::runtime_error("Failed to unmap " + filename + " - " +
-                               strerror(errno));
-    }
-  }
-
-  Buffer* MmapInputStream::read(uint64_t offset,
-                                uint64_t length,
-                                Buffer* buffer) {
-    if (buffer == nullptr) {
-      buffer = new MmapBuffer();
-    }
-    if (offset + length > totalLength) {
-      throw std::runtime_error("Read past end of file " + filename);
-    }
-    dynamic_cast<MmapBuffer*>(buffer)->reset(start + offset, length);
-    return buffer;
-  }
-
   std::unique_ptr<InputStream> readLocalFile(const std::string& path) {
     return std::unique_ptr<InputStream>(new FileInputStream(path));
   }

http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/src/orc/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/orc/Reader.cc b/c++/src/orc/Reader.cc
index 37dd3e0..2343e91 100644
--- a/c++/src/orc/Reader.cc
+++ b/c++/src/orc/Reader.cc
@@ -1232,7 +1232,8 @@ namespace orc {
                            std::unique_ptr<SeekableInputStream>
                              (new SeekableFileInputStream(stream.get(),
                                                           metadataStart,
-                                                          metadataSize)),
+                                                          metadataSize,
+                                                          memoryPool)),
                            blockSize,
                            memoryPool);
       metadata.reset(new proto::Metadata());
@@ -1314,6 +1315,7 @@ namespace orc {
                          (new SeekableFileInputStream(stream.get(),
                                                       stripeFooterStart,
                                                       stripeFooterLength,
+                                                      memoryPool,
                                                       static_cast<int64_t>
                                                       (blockSize)
                                                       )),
@@ -1406,6 +1408,7 @@ namespace orc {
                                    (&input,
                                     offset,
                                     stream.length(),
+                                    memoryPool,
                                     myBlock)),
                                   reader.getCompressionSize(),
                                   memoryPool);
@@ -1553,13 +1556,13 @@ namespace orc {
   }
 
   void ensureOrcFooter(InputStream* stream,
-                       Buffer *buffer,
+                       DataBuffer<char> *buffer,
                        uint64_t postscriptLength) {
 
     const std::string MAGIC("ORC");
     const uint64_t magicLength = MAGIC.length();
-    const char * const bufferStart = buffer->getStart();
-    const uint64_t bufferLength = buffer->getLength();
+    const char * const bufferStart = buffer->data();
+    const uint64_t bufferLength = buffer->size();
 
     if (postscriptLength < magicLength || bufferLength < magicLength) {
       throw ParseError("Invalid ORC postscript length");
@@ -1570,10 +1573,10 @@ namespace orc {
     if (memcmp(magicStart, MAGIC.c_str(), magicLength) != 0) {
       // If there is no magic string at the end, check the beginning.
       // Only files written by Hive 0.11.0 don't have the tail ORC string.
-      Buffer *frontBuffer = stream->read(0, magicLength, nullptr);
-      bool foundMatch =
-        memcmp(frontBuffer->getStart(), MAGIC.c_str(), magicLength) == 0;
-      delete frontBuffer;
+      char *frontBuffer = new char[magicLength];
+      stream->read(frontBuffer, magicLength, 0);
+      bool foundMatch = memcmp(frontBuffer, MAGIC.c_str(), magicLength) == 0;
+      delete[] frontBuffer;
       if (!foundMatch) {
         throw ParseError("Not an ORC file");
       }
@@ -1587,10 +1590,10 @@ namespace orc {
    * @param postscriptSize the length of postscript in bytes
    */
   std::unique_ptr<proto::PostScript> readPostscript(InputStream *stream,
-                                                    Buffer *buffer,
+                                                    DataBuffer<char> *buffer,
                                                     uint64_t postscriptSize) {
-    char *ptr = buffer->getStart();
-    uint64_t readSize = buffer->getLength();
+    char *ptr = buffer->data();
+    uint64_t readSize = buffer->size();
 
     ensureOrcFooter(stream, buffer, postscriptSize);
 
@@ -1613,11 +1616,11 @@ namespace orc {
    * @param memoryPool the memory pool to use
    */
   std::unique_ptr<proto::Footer> readFooter(InputStream* stream,
-                                            Buffer *&buffer,
+                                            DataBuffer<char> *&buffer,
                                             uint64_t footerOffset,
                                             const proto::PostScript& ps,
                                             MemoryPool& memoryPool) {
-    char *footerPtr = buffer->getStart() + footerOffset;
+    char *footerPtr = buffer->data() + footerOffset;
 
     std::unique_ptr<SeekableInputStream> pbStream =
       createDecompressor(convertCompressionKind(ps),
@@ -1662,9 +1665,10 @@ namespace orc {
       if (readSize < 4) {
         throw ParseError("File size too small");
       }
-      Buffer *buffer = stream->read(size - readSize, readSize, nullptr);
+      DataBuffer<char> *buffer = new DataBuffer<char>(*memoryPool, readSize);
+      stream->read(buffer->data(), readSize, size - readSize);
 
-      uint64_t postscriptSize = buffer->getStart()[readSize - 1] & 0xff;
+      uint64_t postscriptSize = buffer->data()[readSize - 1] & 0xff;
       ps = readPostscript(stream.get(), buffer, postscriptSize);
       uint64_t footerSize = ps->footerlength();
       uint64_t tailSize = 1 + postscriptSize + footerSize;
@@ -1672,7 +1676,8 @@ namespace orc {
       uint64_t footerOffset;
 
       if (tailSize > readSize) {
-        buffer = stream->read(size - tailSize, footerSize, buffer);
+        buffer->resize(footerSize);
+        stream->read(buffer->data(), footerSize, size - tailSize);
         footerOffset = 0;
       } else {
         footerOffset = readSize - tailSize;

http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/c++/test/orc/TestCompression.cc
----------------------------------------------------------------------
diff --git a/c++/test/orc/TestCompression.cc b/c++/test/orc/TestCompression.cc
index 465f741..4b4f13f 100644
--- a/c++/test/orc/TestCompression.cc
+++ b/c++/test/orc/TestCompression.cc
@@ -204,7 +204,7 @@ namespace orc {
   TEST_F(TestCompression, testFileBackup) {
     SCOPED_TRACE("testFileBackup");
     std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
-    SeekableFileInputStream stream(file.get(), 0, 200, 20);
+    SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
     ASSERT_THROW(stream.BackUp(10), std::logic_error);
@@ -235,7 +235,7 @@ namespace orc {
   TEST_F(TestCompression, testFileSkip) {
     SCOPED_TRACE("testFileSkip");
     std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
-    SeekableFileInputStream stream(file.get(), 0, 200, 20);
+    SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
     ASSERT_EQ(true, stream.Next(&ptr, &len));
@@ -255,7 +255,7 @@ namespace orc {
   TEST_F(TestCompression, testFileCombo) {
     SCOPED_TRACE("testFileCombo");
     std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
-    SeekableFileInputStream stream(file.get(), 0, 200, 20);
+    SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
     ASSERT_EQ(true, stream.Next(&ptr, &len));
@@ -275,7 +275,7 @@ namespace orc {
   TEST_F(TestCompression, testFileSeek) {
     SCOPED_TRACE("testFileSeek");
     std::unique_ptr<InputStream> file = readLocalFile(simpleFile);
-    SeekableFileInputStream stream(file.get(), 0, 200, 20);
+    SeekableFileInputStream stream(file.get(), 0, 200, *getDefaultPool(), 20);
     const void *ptr;
     int len;
     EXPECT_EQ(0, stream.ByteCount());

http://git-wip-us.apache.org/repos/asf/orc/blob/486433f5/tools-c++/test/TestReader.cc
----------------------------------------------------------------------
diff --git a/tools-c++/test/TestReader.cc b/tools-c++/test/TestReader.cc
index aae0f1d..4f82d8a 100644
--- a/tools-c++/test/TestReader.cc
+++ b/tools-c++/test/TestReader.cc
@@ -2913,7 +2913,7 @@ public:
   ~MockInputStream();
   MOCK_CONST_METHOD0(getLength, uint64_t());
   MOCK_CONST_METHOD0(getName, const std::string&());
-  MOCK_METHOD3(read, Buffer* (uint64_t, uint64_t, Buffer*));
+  MOCK_METHOD3(read, void (void*, uint64_t, uint64_t));
 };
 
 MockInputStream::~MockInputStream() {