You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by ga...@apache.org on 2020/03/31 09:37:24 UTC

[orc] branch master updated: ORC-615: [C++] Refactor decompression streams into common base class

This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/master by this push:
     new 7654764  ORC-615: [C++] Refactor decompression streams into common base class
7654764 is described below

commit 76547648fe36b7d93638dc2712057eb511248094
Author: Norbert Luksa <no...@cloudera.com>
AuthorDate: Tue Mar 31 11:37:14 2020 +0200

    ORC-615: [C++] Refactor decompression streams into common base class
    
    This commit is not intended to change any current behaviour.
    Its only purpose is to reduce the large amount of code duplication
    in Compression.cc by moving the functionality shared by
    ZlibDecompressionStream and BlockDecompressionStream into a
    common base class.
---
 c++/src/Compression.cc | 637 ++++++++++++++++++++-----------------------------
 1 file changed, 259 insertions(+), 378 deletions(-)

diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 91cf2f7..4f55821 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -311,67 +311,34 @@ DIAGNOSTIC_PUSH
                          DECOMPRESS_ORIGINAL,
                          DECOMPRESS_EOF};
 
-  class ZlibDecompressionStream: public SeekableInputStream {
+  class DecompressionStream : public SeekableInputStream {
   public:
-    ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
-                            size_t blockSize,
-                            MemoryPool& pool);
-    virtual ~ZlibDecompressionStream() override;
+    DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
+                        size_t bufferSize,
+                        MemoryPool& pool);
+    virtual ~DecompressionStream() override {}
     virtual bool Next(const void** data, int*size) override;
     virtual void BackUp(int count) override;
     virtual bool Skip(int count) override;
     virtual int64_t ByteCount() const override;
     virtual void seek(PositionProvider& position) override;
-    virtual std::string getName() const override;
-
-  private:
-    void readBuffer(bool failOnEof) {
-      int length;
-      if (!input->Next(reinterpret_cast<const void**>(&inputBuffer),
-                       &length)) {
-        if (failOnEof) {
-          throw ParseError("Read past EOF in "
-                           "ZlibDecompressionStream::readBuffer");
-        }
-        state = DECOMPRESS_EOF;
-        inputBuffer = nullptr;
-        inputBufferEnd = nullptr;
-      } else {
-        inputBufferEnd = inputBuffer + length;
-      }
-    }
+    virtual std::string getName() const override = 0;
 
-    uint32_t readByte(bool failOnEof) {
-      if (inputBuffer == inputBufferEnd) {
-        readBuffer(failOnEof);
-        if (state == DECOMPRESS_EOF) {
-          return 0;
-        }
-      }
-      return static_cast<unsigned char>(*(inputBuffer++));
-    }
+  protected:
+    virtual void NextDecompress(const void** data,
+                                int*size,
+                                size_t availableSize) = 0;
 
-    void readHeader() {
-      uint32_t header = readByte(false);
-      if (state != DECOMPRESS_EOF) {
-        header |= readByte(true) << 8;
-        header |= readByte(true) << 16;
-        if (header & 1) {
-          state = DECOMPRESS_ORIGINAL;
-        } else {
-          state = DECOMPRESS_START;
-        }
-        remainingLength = header >> 1;
-      } else {
-        remainingLength = 0;
-      }
-    }
+    std::string getStreamName() const;
+    void readBuffer(bool failOnEof);
+    uint32_t readByte(bool failOnEof);
+    void readHeader();
 
     MemoryPool& pool;
-    const size_t blockSize;
     std::unique_ptr<SeekableInputStream> input;
-    z_stream zstream;
-    DataBuffer<char> buffer;
+
+    // uncompressed output
+    DataBuffer<char> outputDataBuffer;
 
     // the current state
     DecompressState state;
@@ -393,65 +360,74 @@ DIAGNOSTIC_PUSH
     off_t bytesReturned;
   };
 
-DIAGNOSTIC_PUSH
+  DecompressionStream::DecompressionStream(
+      std::unique_ptr<SeekableInputStream> inStream,
+      size_t bufferSize,
+      MemoryPool& _pool
+      ) : pool(_pool),
+          input(std::move(inStream)),
+          outputDataBuffer(pool, bufferSize),
+          state(DECOMPRESS_HEADER),
+          outputBuffer(nullptr),
+          outputBufferLength(0),
+          remainingLength(0),
+          inputBuffer(nullptr),
+          inputBufferEnd(nullptr),
+          bytesReturned(0)  {
+  }
 
-#if defined(__GNUC__) || defined(__clang__)
-  DIAGNOSTIC_IGNORE("-Wold-style-cast")
-#endif
+  std::string DecompressionStream::getStreamName() const {
+    return input->getName();
+  }
 
-  ZlibDecompressionStream::ZlibDecompressionStream
-                   (std::unique_ptr<SeekableInputStream> inStream,
-                    size_t _blockSize,
-                    MemoryPool& _pool
-                    ): pool(_pool),
-                       blockSize(_blockSize),
-                       input(std::move(inStream)),
-                       buffer(pool, _blockSize) {
-    zstream.next_in = nullptr;
-    zstream.avail_in = 0;
-    zstream.zalloc = nullptr;
-    zstream.zfree = nullptr;
-    zstream.opaque = nullptr;
-    zstream.next_out = reinterpret_cast<Bytef*>(buffer.data());
-    zstream.avail_out = static_cast<uInt>(blockSize);
-    int64_t result = inflateInit2(&zstream, -15);
-    switch (result) {
-    case Z_OK:
-      break;
-    case Z_MEM_ERROR:
-      throw std::logic_error("Memory error from inflateInit2");
-    case Z_VERSION_ERROR:
-      throw std::logic_error("Version error from inflateInit2");
-    case Z_STREAM_ERROR:
-      throw std::logic_error("Stream error from inflateInit2");
-    default:
-      throw std::logic_error("Unknown error from inflateInit2");
+  void DecompressionStream::readBuffer(bool failOnEof) {
+    int length;
+    if (!input->Next(reinterpret_cast<const void**>(&inputBuffer),
+                      &length)) {
+      if (failOnEof) {
+        throw ParseError("Read past EOF in DecompressionStream::readBuffer");
+      }
+      state = DECOMPRESS_EOF;
+      inputBuffer = nullptr;
+      inputBufferEnd = nullptr;
+    } else {
+      inputBufferEnd = inputBuffer + length;
     }
-    outputBuffer = nullptr;
-    outputBufferLength = 0;
-    remainingLength = 0;
-    state = DECOMPRESS_HEADER;
-    inputBuffer = nullptr;
-    inputBufferEnd = nullptr;
-    bytesReturned = 0;
   }
 
-DIAGNOSTIC_POP
+  uint32_t DecompressionStream::readByte(bool failOnEof) {
+    if (inputBuffer == inputBufferEnd) {
+      readBuffer(failOnEof);
+      if (state == DECOMPRESS_EOF) {
+        return 0;
+      }
+    }
+    return static_cast<unsigned char>(*(inputBuffer++));
+  }
 
-  ZlibDecompressionStream::~ZlibDecompressionStream() {
-    int64_t result = inflateEnd(&zstream);
-    if (result != Z_OK) {
-      // really can't throw in destructors
-      std::cout << "Error in ~ZlibDecompressionStream() " << result << "\n";
+  void DecompressionStream::readHeader() {
+    uint32_t header = readByte(false);
+    if (state != DECOMPRESS_EOF) {
+      header |= readByte(true) << 8;
+      header |= readByte(true) << 16;
+      if (header & 1) {
+        state = DECOMPRESS_ORIGINAL;
+      } else {
+        state = DECOMPRESS_START;
+      }
+      remainingLength = header >> 1;
+    } else {
+      remainingLength = 0;
     }
   }
 
-  bool ZlibDecompressionStream::Next(const void** data, int*size) {
+  bool DecompressionStream::Next(const void** data, int*size) {
     // if the user pushed back, return them the partial buffer
     if (outputBufferLength) {
       *data = outputBuffer;
       *size = static_cast<int>(outputBufferLength);
       outputBuffer += outputBufferLength;
+      bytesReturned += static_cast<off_t>(outputBufferLength);
       outputBufferLength = 0;
       return true;
     }
@@ -464,83 +440,40 @@ DIAGNOSTIC_POP
     if (inputBuffer == inputBufferEnd) {
       readBuffer(true);
     }
-    size_t availSize =
+    size_t availableSize =
       std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
                remainingLength);
     if (state == DECOMPRESS_ORIGINAL) {
       *data = inputBuffer;
-      *size = static_cast<int>(availSize);
-      outputBuffer = inputBuffer + availSize;
+      *size = static_cast<int>(availableSize);
+      outputBuffer = inputBuffer + availableSize;
       outputBufferLength = 0;
+      inputBuffer += availableSize;
+      remainingLength -= availableSize;
     } else if (state == DECOMPRESS_START) {
-      zstream.next_in =
-        reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
-      zstream.avail_in = static_cast<uInt>(availSize);
-      outputBuffer = buffer.data();
-      zstream.next_out =
-        reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer));
-      zstream.avail_out = static_cast<uInt>(blockSize);
-      if (inflateReset(&zstream) != Z_OK) {
-        throw std::logic_error("Bad inflateReset in "
-                               "ZlibDecompressionStream::Next");
-      }
-      int64_t result;
-      do {
-        result = inflate(&zstream, availSize == remainingLength ? Z_FINISH :
-                         Z_SYNC_FLUSH);
-        switch (result) {
-        case Z_OK:
-          remainingLength -= availSize;
-          inputBuffer += availSize;
-          readBuffer(true);
-          availSize =
-            std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
-                     remainingLength);
-          zstream.next_in =
-            reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
-          zstream.avail_in = static_cast<uInt>(availSize);
-          break;
-        case Z_STREAM_END:
-          break;
-        case Z_BUF_ERROR:
-          throw std::logic_error("Buffer error in "
-                                 "ZlibDecompressionStream::Next");
-        case Z_DATA_ERROR:
-          throw std::logic_error("Data error in "
-                                 "ZlibDecompressionStream::Next");
-        case Z_STREAM_ERROR:
-          throw std::logic_error("Stream error in "
-                                 "ZlibDecompressionStream::Next");
-        default:
-          throw std::logic_error("Unknown error in "
-                                 "ZlibDecompressionStream::Next");
-        }
-      } while (result != Z_STREAM_END);
-      *size = static_cast<int>(blockSize - zstream.avail_out);
-      *data = outputBuffer;
-      outputBufferLength = 0;
-      outputBuffer += *size;
+      NextDecompress(data, size, availableSize);
     } else {
       throw std::logic_error("Unknown compression state in "
-                             "ZlibDecompressionStream::Next");
+                             "DecompressionStream::Next");
     }
-    inputBuffer += availSize;
-    remainingLength -= availSize;
     bytesReturned += *size;
     return true;
   }
 
-  void ZlibDecompressionStream::BackUp(int count) {
+  void DecompressionStream::BackUp(int count) {
     if (outputBuffer == nullptr || outputBufferLength != 0) {
-      throw std::logic_error("Backup without previous Next in "
-                             "ZlibDecompressionStream");
+      throw std::logic_error("Backup without previous Next in " + getName());
     }
     outputBuffer -= static_cast<size_t>(count);
     outputBufferLength = static_cast<size_t>(count);
     bytesReturned -= count;
   }
 
-  bool ZlibDecompressionStream::Skip(int count) {
+  int64_t DecompressionStream::ByteCount() const {
+    return bytesReturned;
+  }
+
+  bool DecompressionStream::Skip(int count) {
     bytesReturned += count;
     // this is a stupid implementation for now.
     // should skip entire blocks without decompressing
@@ -560,11 +493,7 @@ DIAGNOSTIC_POP
     return true;
   }
 
-  int64_t ZlibDecompressionStream::ByteCount() const {
-    return bytesReturned;
-  }
-
-  void ZlibDecompressionStream::seek(PositionProvider& position) {
+  void DecompressionStream::seek(PositionProvider& position) {
     // clear state to force seek to read from the right position
     state = DECOMPRESS_HEADER;
     outputBuffer = nullptr;
@@ -576,255 +505,207 @@ DIAGNOSTIC_POP
     input->seek(position);
     bytesReturned = static_cast<off_t>(input->ByteCount());
     if (!Skip(static_cast<int>(position.next()))) {
-      throw ParseError("Bad skip in ZlibDecompressionStream::seek");
+      throw ParseError("Bad skip in " + getName());
     }
   }
 
+  class ZlibDecompressionStream : public DecompressionStream {
+  public:
+    ZlibDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
+                            size_t blockSize,
+                            MemoryPool& pool);
+    virtual ~ZlibDecompressionStream() override;
+    virtual std::string getName() const override;
+
+  protected:
+    virtual void NextDecompress(const void** data,
+                                int* size,
+                                size_t availableSize) override;
+  private:
+    z_stream zstream;
+  };
+
+DIAGNOSTIC_PUSH
+
+#if defined(__GNUC__) || defined(__clang__)
+  DIAGNOSTIC_IGNORE("-Wold-style-cast")
+#endif
+
+  ZlibDecompressionStream::ZlibDecompressionStream
+                   (std::unique_ptr<SeekableInputStream> inStream,
+                    size_t bufferSize,
+                    MemoryPool& _pool
+                    ): DecompressionStream
+                          (std::move(inStream), bufferSize, _pool) {
+    zstream.next_in = nullptr;
+    zstream.avail_in = 0;
+    zstream.zalloc = nullptr;
+    zstream.zfree = nullptr;
+    zstream.opaque = nullptr;
+    zstream.next_out = reinterpret_cast<Bytef*>(outputDataBuffer.data());
+    zstream.avail_out = static_cast<uInt>(outputDataBuffer.capacity());
+    int64_t result = inflateInit2(&zstream, -15);
+    switch (result) {
+    case Z_OK:
+      break;
+    case Z_MEM_ERROR:
+      throw std::logic_error("Memory error from inflateInit2");
+    case Z_VERSION_ERROR:
+      throw std::logic_error("Version error from inflateInit2");
+    case Z_STREAM_ERROR:
+      throw std::logic_error("Stream error from inflateInit2");
+    default:
+      throw std::logic_error("Unknown error from inflateInit2");
+    }
+  }
+
+DIAGNOSTIC_POP
+
+  ZlibDecompressionStream::~ZlibDecompressionStream() {
+    int64_t result = inflateEnd(&zstream);
+    if (result != Z_OK) {
+      // really can't throw in destructors
+      std::cout << "Error in ~ZlibDecompressionStream() " << result << "\n";
+    }
+  }
+
+  void ZlibDecompressionStream::NextDecompress(const void** data, int* size,
+      size_t availableSize) {
+    zstream.next_in =
+      reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
+    zstream.avail_in = static_cast<uInt>(availableSize);
+    outputBuffer = outputDataBuffer.data();
+    zstream.next_out =
+      reinterpret_cast<Bytef*>(const_cast<char*>(outputBuffer));
+    zstream.avail_out = static_cast<uInt>(outputDataBuffer.capacity());
+    if (inflateReset(&zstream) != Z_OK) {
+      throw std::logic_error("Bad inflateReset in "
+                              "ZlibDecompressionStream::NextDecompress");
+    }
+    int64_t result;
+    do {
+      result = inflate(&zstream, availableSize == remainingLength ? Z_FINISH :
+                        Z_SYNC_FLUSH);
+      switch (result) {
+      case Z_OK:
+        remainingLength -= availableSize;
+        inputBuffer += availableSize;
+        readBuffer(true);
+        availableSize =
+          std::min(static_cast<size_t>(inputBufferEnd - inputBuffer),
+                    remainingLength);
+        zstream.next_in =
+          reinterpret_cast<Bytef*>(const_cast<char*>(inputBuffer));
+        zstream.avail_in = static_cast<uInt>(availableSize);
+        break;
+      case Z_STREAM_END:
+        break;
+      case Z_BUF_ERROR:
+        throw std::logic_error("Buffer error in "
+                                "ZlibDecompressionStream::NextDecompress");
+      case Z_DATA_ERROR:
+        throw std::logic_error("Data error in "
+                                "ZlibDecompressionStream::NextDecompress");
+      case Z_STREAM_ERROR:
+        throw std::logic_error("Stream error in "
+                                "ZlibDecompressionStream::NextDecompress");
+      default:
+        throw std::logic_error("Unknown error in "
+                                "ZlibDecompressionStream::NextDecompress");
+      }
+    } while (result != Z_STREAM_END);
+    *size = static_cast<int>(outputDataBuffer.capacity() - zstream.avail_out);
+    *data = outputBuffer;
+    outputBufferLength = 0;
+    outputBuffer += *size;
+    inputBuffer += availableSize;
+    remainingLength -= availableSize;
+  }
+
   std::string ZlibDecompressionStream::getName() const {
     std::ostringstream result;
     result << "zlib(" << input->getName() << ")";
     return result.str();
   }
 
-  class BlockDecompressionStream: public SeekableInputStream {
+  class BlockDecompressionStream: public DecompressionStream {
   public:
     BlockDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                              size_t blockSize,
                              MemoryPool& pool);
 
     virtual ~BlockDecompressionStream() override {}
-    virtual bool Next(const void** data, int*size) override;
-    virtual void BackUp(int count) override;
-    virtual bool Skip(int count) override;
-    virtual int64_t ByteCount() const override;
-    virtual void seek(PositionProvider& position) override;
     virtual std::string getName() const override = 0;
 
   protected:
+    virtual void NextDecompress(const void** data,
+                                int* size,
+                                size_t availableSize) override;
+
     virtual uint64_t decompress(const char *input, uint64_t length,
                                 char *output, size_t maxOutputLength) = 0;
-
-    std::string getStreamName() const {
-      return input->getName();
-    }
-
   private:
-    void readBuffer(bool failOnEof) {
-      int length;
-      if (!input->Next(reinterpret_cast<const void**>(&inputBufferPtr),
-                       &length)) {
-        if (failOnEof) {
-          throw ParseError(getName() + "read past EOF");
-        }
-        state = DECOMPRESS_EOF;
-        inputBufferPtr = nullptr;
-        inputBufferPtrEnd = nullptr;
-      } else {
-        inputBufferPtrEnd = inputBufferPtr + length;
-      }
-    }
-
-    uint32_t readByte(bool failOnEof) {
-      if (inputBufferPtr == inputBufferPtrEnd) {
-        readBuffer(failOnEof);
-        if (state == DECOMPRESS_EOF) {
-          return 0;
-        }
-      }
-      return static_cast<unsigned char>(*(inputBufferPtr++));
-    }
-
-    void readHeader() {
-      uint32_t header = readByte(false);
-      if (state != DECOMPRESS_EOF) {
-        header |= readByte(true) << 8;
-        header |= readByte(true) << 16;
-        if (header & 1) {
-          state = DECOMPRESS_ORIGINAL;
-        } else {
-          state = DECOMPRESS_START;
-        }
-        remainingLength = header >> 1;
-      } else {
-        remainingLength = 0;
-      }
-    }
-
-    std::unique_ptr<SeekableInputStream> input;
-    MemoryPool& pool;
-
     // may need to stitch together multiple input buffers;
     // to give snappy a contiguous block
-    DataBuffer<char> inputBuffer;
-
-    // uncompressed output
-    DataBuffer<char> outputBuffer;
-
-    // the current state
-    DecompressState state;
-
-    // the start of the current output buffer
-    const char* outputBufferPtr;
-    // the size of the current output buffer
-    size_t outputBufferLength;
-
-    // the size of the current chunk
-    size_t remainingLength;
-
-    // the last buffer returned from the input
-    const char *inputBufferPtr;
-    const char *inputBufferPtrEnd;
-
-    // bytes returned by this stream
-    off_t bytesReturned;
+    DataBuffer<char> inputDataBuffer;
   };
 
   BlockDecompressionStream::BlockDecompressionStream
                    (std::unique_ptr<SeekableInputStream> inStream,
-                    size_t bufferSize,
+                    size_t blockSize,
                     MemoryPool& _pool
-                    ) : input(std::move(inStream)),
-                        pool(_pool),
-                        inputBuffer(pool, bufferSize),
-                        outputBuffer(pool, bufferSize),
-                        state(DECOMPRESS_HEADER),
-                        outputBufferPtr(nullptr),
-                        outputBufferLength(0),
-                        remainingLength(0),
-                        inputBufferPtr(nullptr),
-                        inputBufferPtrEnd(nullptr),
-                        bytesReturned(0) {
-  }
-
-  bool BlockDecompressionStream::Next(const void** data, int*size) {
-    // if the user pushed back, return them the partial buffer
-    if (outputBufferLength) {
-      *data = outputBufferPtr;
-      *size = static_cast<int>(outputBufferLength);
-      outputBufferPtr += outputBufferLength;
-      bytesReturned += static_cast<off_t>(outputBufferLength);
-      outputBufferLength = 0;
-      return true;
-    }
-    if (state == DECOMPRESS_HEADER || remainingLength == 0) {
-      readHeader();
-    }
-    if (state == DECOMPRESS_EOF) {
-      return false;
-    }
-    if (inputBufferPtr == inputBufferPtrEnd) {
-      readBuffer(true);
-    }
-
-    size_t availSize =
-      std::min(static_cast<size_t>(inputBufferPtrEnd - inputBufferPtr),
-               remainingLength);
-    if (state == DECOMPRESS_ORIGINAL) {
-      *data = inputBufferPtr;
-      *size = static_cast<int>(availSize);
-      outputBufferPtr = inputBufferPtr + availSize;
-      outputBufferLength = 0;
-      inputBufferPtr += availSize;
-      remainingLength -= availSize;
-    } else if (state == DECOMPRESS_START) {
-      // Get contiguous bytes of compressed block.
-      const char *compressed = inputBufferPtr;
-      if (remainingLength == availSize) {
-          inputBufferPtr += availSize;
-      } else {
-        // Did not read enough from input.
-        if (inputBuffer.capacity() < remainingLength) {
-          inputBuffer.resize(remainingLength);
-        }
-        ::memcpy(inputBuffer.data(), inputBufferPtr, availSize);
-        inputBufferPtr += availSize;
-        compressed = inputBuffer.data();
-
-        for (size_t pos = availSize; pos < remainingLength; ) {
-          readBuffer(true);
-          size_t avail =
-              std::min(static_cast<size_t>(inputBufferPtrEnd -
-                                           inputBufferPtr),
-                       remainingLength - pos);
-          ::memcpy(inputBuffer.data() + pos, inputBufferPtr, avail);
-          pos += avail;
-          inputBufferPtr += avail;
-        }
-      }
-
-      outputBufferLength = decompress(compressed, remainingLength,
-                                      outputBuffer.data(),
-                                      outputBuffer.capacity());
-
-      remainingLength = 0;
-      state = DECOMPRESS_HEADER;
-      *data = outputBuffer.data();
-      *size = static_cast<int>(outputBufferLength);
-      outputBufferPtr = outputBuffer.data() + outputBufferLength;
-      outputBufferLength = 0;
-    }
-
-    bytesReturned += *size;
-    return true;
+                    ) : DecompressionStream
+                            (std::move(inStream), blockSize, _pool),
+                        inputDataBuffer(pool, blockSize) {
   }
 
-  void BlockDecompressionStream::BackUp(int count) {
-    if (outputBufferPtr == nullptr || outputBufferLength != 0) {
-      throw std::logic_error("Backup without previous Next in "+getName());
-    }
-    outputBufferPtr -= static_cast<size_t>(count);
-    outputBufferLength = static_cast<size_t>(count);
-    bytesReturned -= count;
-  }
 
-  bool BlockDecompressionStream::Skip(int count) {
-    bytesReturned += count;
-    // this is a stupid implementation for now.
-    // should skip entire blocks without decompressing
-    while (count > 0) {
-      const void *ptr;
-      int len;
-      if (!Next(&ptr, &len)) {
-        return false;
+  void BlockDecompressionStream::NextDecompress(const void** data, int* size,
+      size_t availableSize) {
+    // Get contiguous bytes of compressed block.
+    const char *compressed = inputBuffer;
+    if (remainingLength == availableSize) {
+        inputBuffer += availableSize;
+    } else {
+      // Did not read enough from input.
+      if (inputDataBuffer.capacity() < remainingLength) {
+        inputDataBuffer.resize(remainingLength);
       }
-      if (len > count) {
-        BackUp(len - count);
-        count = 0;
-      } else {
-        count -= len;
+      ::memcpy(inputDataBuffer.data(), inputBuffer, availableSize);
+      inputBuffer += availableSize;
+      compressed = inputDataBuffer.data();
+
+      for (size_t pos = availableSize; pos < remainingLength; ) {
+        readBuffer(true);
+        size_t avail =
+            std::min(static_cast<size_t>(inputBufferEnd -
+                                          inputBuffer),
+                      remainingLength - pos);
+        ::memcpy(inputDataBuffer.data() + pos, inputBuffer, avail);
+        pos += avail;
+        inputBuffer += avail;
       }
     }
-    return true;
-  }
-
-  int64_t BlockDecompressionStream::ByteCount() const {
-    return bytesReturned;
-  }
-
-  void BlockDecompressionStream::seek(PositionProvider& position) {
-    // clear state to force seek to read from the right position
+    outputBufferLength = decompress(compressed, remainingLength,
+                                    outputDataBuffer.data(),
+                                    outputDataBuffer.capacity());
+    remainingLength = 0;
     state = DECOMPRESS_HEADER;
-    outputBufferPtr = nullptr;
+    *data = outputDataBuffer.data();
+    *size = static_cast<int>(outputBufferLength);
+    outputBuffer = outputDataBuffer.data() + outputBufferLength;
     outputBufferLength = 0;
-    remainingLength = 0;
-    inputBufferPtr = nullptr;
-    inputBufferPtrEnd = nullptr;
-
-    input->seek(position);
-    if (!Skip(static_cast<int>(position.next()))) {
-      throw ParseError("Bad skip in " + getName());
-    }
   }
 
   class SnappyDecompressionStream: public BlockDecompressionStream {
   public:
     SnappyDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                               size_t blockSize,
-                              MemoryPool& pool
+                              MemoryPool& _pool
                               ): BlockDecompressionStream
                                  (std::move(inStream),
                                   blockSize,
-                                  pool) {
+                                  _pool) {
       // PASS
     }
 
@@ -840,12 +721,12 @@ DIAGNOSTIC_POP
                                 ) override;
   };
 
-  uint64_t SnappyDecompressionStream::decompress(const char *input,
+  uint64_t SnappyDecompressionStream::decompress(const char *_input,
                                                  uint64_t length,
                                                  char *output,
                                                  size_t maxOutputLength) {
     size_t outLength;
-    if (!snappy::GetUncompressedLength(input, length, &outLength)) {
+    if (!snappy::GetUncompressedLength(_input, length, &outLength)) {
       throw ParseError("SnappyDecompressionStream choked on corrupt input");
     }
 
@@ -853,7 +734,7 @@ DIAGNOSTIC_POP
       throw std::logic_error("Snappy length exceeds block size");
     }
 
-    if (!snappy::RawUncompress(input, length, output)) {
+    if (!snappy::RawUncompress(_input, length, output)) {
       throw ParseError("SnappyDecompressionStream choked on corrupt input");
     }
     return outLength;
@@ -863,11 +744,11 @@ DIAGNOSTIC_POP
   public:
     LzoDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                            size_t blockSize,
-                           MemoryPool& pool
+                           MemoryPool& _pool
                            ): BlockDecompressionStream
-                              (std::move(inStream),
-                               blockSize,
-                               pool) {
+                                  (std::move(inStream),
+                                   blockSize,
+                                   _pool) {
       // PASS
     }
 
@@ -883,11 +764,11 @@ DIAGNOSTIC_POP
                                 ) override;
   };
 
-  uint64_t LzoDecompressionStream::decompress(const char *input,
+  uint64_t LzoDecompressionStream::decompress(const char *inputPtr,
                                               uint64_t length,
                                               char *output,
                                               size_t maxOutputLength) {
-    return lzoDecompress(input, input + length, output,
+    return lzoDecompress(inputPtr, inputPtr + length, output,
                          output + maxOutputLength);
   }
 
@@ -895,11 +776,11 @@ DIAGNOSTIC_POP
   public:
     Lz4DecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                            size_t blockSize,
-                           MemoryPool& pool
+                           MemoryPool& _pool
                            ): BlockDecompressionStream
                               (std::move(inStream),
                                blockSize,
-                               pool) {
+                               _pool) {
       // PASS
     }
 
@@ -915,11 +796,11 @@ DIAGNOSTIC_POP
                                 ) override;
   };
 
-  uint64_t Lz4DecompressionStream::decompress(const char *input,
+  uint64_t Lz4DecompressionStream::decompress(const char *inputPtr,
                                               uint64_t length,
                                               char *output,
                                               size_t maxOutputLength) {
-    int result = LZ4_decompress_safe(input, output, static_cast<int>(length),
+    int result = LZ4_decompress_safe(inputPtr, output, static_cast<int>(length),
                                      static_cast<int>(maxOutputLength));
     if (result < 0) {
       throw ParseError(getName() + " - failed to decompress");
@@ -1060,10 +941,10 @@ DIAGNOSTIC_POP
   public:
     ZSTDDecompressionStream(std::unique_ptr<SeekableInputStream> inStream,
                             size_t blockSize,
-                            MemoryPool& pool)
+                            MemoryPool& _pool)
                             : BlockDecompressionStream(std::move(inStream),
                                                        blockSize,
-                                                       pool) {
+                                                       _pool) {
       // PASS
     }
 
@@ -1080,13 +961,13 @@ DIAGNOSTIC_POP
                                 size_t maxOutputLength) override;
   };
 
-  uint64_t ZSTDDecompressionStream::decompress(const char *input,
+  uint64_t ZSTDDecompressionStream::decompress(const char *inputPtr,
                                                uint64_t length,
                                                char *output,
                                                size_t maxOutputLength) {
     return static_cast<uint64_t>(ZSTD_decompress(output,
                                                  maxOutputLength,
-                                                 input,
+                                                 inputPtr,
                                                  length));
   }