Posted to dev@orc.apache.org by xndai <gi...@git.apache.org> on 2017/05/16 22:33:49 UTC

[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

GitHub user xndai opened a pull request:

    https://github.com/apache/orc/pull/122

    ORC-192 Implement zlib compression stream

    Implement a zlib compressor based on the output stream. Add corresponding
    unit tests. Also rename the existing test suite from TestCompression to
    TestDecompression.
    
    Change-Id: I6c594cc8f8a769078bbe561056adfd5d0389dbc1

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xndai/orc dev_compression

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/orc/pull/122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #122
    
----
commit d4fcfe4a14386041f3080dfad95036c7fe5f8e9a
Author: Xiening.Dai <xi...@alibaba-inc.com>
Date:   2017-05-16T22:29:01Z

    ORC-192 Implement zlib compression stream
    
    Implement a zlib compressor based on the output stream. Add corresponding
    unit tests. Also rename the existing test suite from TestCompression to
    TestDecompression.
    
    Change-Id: I6c594cc8f8a769078bbe561056adfd5d0389dbc1

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355066
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Use `ParseError` here.
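
A minimal sketch of what this suggestion might look like. The `ParseError` class below is a hypothetical stand-in that is assumed to derive from `std::runtime_error`, and `checkFlushed` is an illustrative helper, not a function from the pull request.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for the project's ParseError type; assumed here to
// derive from std::runtime_error.
class ParseError : public std::runtime_error {
public:
  explicit ParseError(const std::string& what) : std::runtime_error(what) {}
};

// Hypothetical helper mirroring the check in flush(): report a failed Next()
// call with ParseError instead of std::logic_error.
inline void checkFlushed(bool nextSucceeded) {
  if (!nextSucceeded) {
    throw ParseError("Failed to flush compression buffer.");
  }
}
```

Callers that already catch `std::runtime_error` would keep working under this assumption, whereas `std::logic_error` sits in a different branch of the standard exception hierarchy.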



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117377956
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Alternatively, a better option is to rename `ParseError` to `ORCError` and use it throughout the code base.



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117622782
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
    --- End diff --
    
    What are the corresponding levels for these three settings? Are they documented anywhere?



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117631322
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
    --- End diff --
    
    You can see the compression levels here:
    http://www.zlib.net/manual.html
    ```
    #define Z_NO_COMPRESSION         0
    #define Z_BEST_SPEED             1
    #define Z_BEST_COMPRESSION       9
    #define Z_DEFAULT_COMPRESSION  (-1)
    ```
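
Read against these constants, the diffed line passes `-1` (`Z_DEFAULT_COMPRESSION`) for the SPEED strategy and `9` (`Z_BEST_COMPRESSION`) otherwise. The sketch below only names the levels; the `k...` constants and `levelName` are illustrative names that copy the values quoted above, not zlib or ORC identifiers.

```cpp
#include <cassert>
#include <string>

// Values copied from the zlib manual excerpt quoted above.
constexpr int kNoCompression = 0;        // Z_NO_COMPRESSION
constexpr int kBestSpeed = 1;            // Z_BEST_SPEED
constexpr int kBestCompression = 9;      // Z_BEST_COMPRESSION
constexpr int kDefaultCompression = -1;  // Z_DEFAULT_COMPRESSION

// Hypothetical helper: name the zlib constant a raw level corresponds to.
inline std::string levelName(int level) {
  switch (level) {
    case kNoCompression:      return "Z_NO_COMPRESSION";
    case kBestSpeed:          return "Z_BEST_SPEED";
    case kBestCompression:    return "Z_BEST_COMPRESSION";
    case kDefaultCompression: return "Z_DEFAULT_COMPRESSION";
    default:                  return "level " + std::to_string(level);
  }
}
```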



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117822601
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
    --- End diff --
    
    According to https://orc.apache.org/docs/hive-config.html, only two compression strategies are defined: SPEED and COMPRESSION. I also checked the Java implementation: SPEED maps to zlib level Z_BEST_SPEED + 1, and COMPRESSION maps to Z_DEFAULT_COMPRESSION. I will do the same for C++.
    
    Java implementation for your reference -
    
    WriterImpl.java
    
    ```
        CompressionCodec result = physicalWriter.getCompressionCodec();
        if (result != null) {
          switch (kind) {
            case BLOOM_FILTER:
            case DATA:
            case DICTIONARY_DATA:
            case BLOOM_FILTER_UTF8:
              if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
                result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
                    CompressionCodec.Modifier.TEXT));
              } else {
                result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
                    CompressionCodec.Modifier.TEXT));
              }
              break;
            case LENGTH:
            case DICTIONARY_COUNT:
            case PRESENT:
            case ROW_INDEX:
            case SECONDARY:
              // easily compressed using the fastest modes
              result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
                  CompressionCodec.Modifier.BINARY));
              break;
            default:
              LOG.info("Missing ORC compression modifiers for " + kind);
              break;
          }
        }
    ```
    
    ZlibCodec.java
    
    ```
      public CompressionCodec modify(/* @Nullable */ EnumSet<Modifier> modifiers) {

        if (modifiers == null) {
          return this;
        }
    
        int l = this.level;
        int s = this.strategy;
    
        for (Modifier m : modifiers) {
          switch (m) {
          case BINARY:
            /* filtered == less LZ77, more huffman */
            s = Deflater.FILTERED;
            break;
          case TEXT:
            s = Deflater.DEFAULT_STRATEGY;
            break;
          case FASTEST:
            // deflate_fast looking for 8 byte patterns
            l = Deflater.BEST_SPEED;
            break;
          case FAST:
            // deflate_fast looking for 16 byte patterns
            l = Deflater.BEST_SPEED + 1;
            break;
          case DEFAULT:
            // deflate_slow looking for 128 byte patterns
            l = Deflater.DEFAULT_COMPRESSION;
            break;
          default:
            break;
          }
        }
        return new ZlibCodec(l, s);
      }
    ```
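
A minimal C++ sketch of the mapping described in this comment (SPEED to `Z_BEST_SPEED + 1`, COMPRESSION to `Z_DEFAULT_COMPRESSION`). The enum is a local mirror of the one used in the diff, `CompressionStrategy_COMPRESSION` is an assumed enumerator name, and `zlibLevelFor` is a hypothetical helper rather than the code that was eventually merged.

```cpp
#include <cassert>

// Local mirror of the strategy enum from the diff; the COMPRESSION
// enumerator name is an assumption.
enum CompressionStrategy {
  CompressionStrategy_SPEED,
  CompressionStrategy_COMPRESSION
};

constexpr int kZBestSpeed = 1;            // value of zlib's Z_BEST_SPEED
constexpr int kZDefaultCompression = -1;  // value of zlib's Z_DEFAULT_COMPRESSION

// Hypothetical helper following the Java behaviour described above.
inline int zlibLevelFor(CompressionStrategy strategy) {
  return strategy == CompressionStrategy_SPEED ? kZBestSpeed + 1
                                               : kZDefaultCompression;
}
```

Keeping the mapping in one function would make it easier to compare the C++ writer against the Java `ZlibCodec.modify` cases quoted above.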



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355282
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = newPosition;
    +      } else {
    +        outputPosition += 3;
    +      }
    +
    +      uint64_t totalCompressedSize = doStreamingCompression();
    +
    +      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
    +      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
    +        writeHeader(header, static_cast<size_t>(bufferSize), true);
    +        memcpy(
    +          header + 3,
    +          rawInputBuffer.data(),
    +          static_cast<size_t>(bufferSize));
    +
    +        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
    +        BufferedOutputStream::BackUp(backup);
    +        outputPosition -= backup;
    +        outputSize -= backup;
    +      } else {
    +        writeHeader(header, totalCompressedSize, false);
    +      }
    +    }
    +
    +    *data = rawInputBuffer.data();
    +    *size = static_cast<int>(rawInputBuffer.size());
    +    bufferSize = *size;
    +
    +    return true;
    +  }
    +
    +  class ZlibCompressionStream: public CompressionStream {
    +  public:
    +    ZlibCompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual std::string getName() const override;
    +
    +  protected:
    +    virtual uint64_t doStreamingCompression() override;
    +
    +  private:
    +    void init();
    +    z_stream strm;
    +  };
    +
    +  ZlibCompressionStream::ZlibCompressionStream(
    +                        OutputStream * outStream,
    +                        int compressionLevel,
    +                        uint64_t capacity,
    +                        uint64_t blockSize,
    +                        MemoryPool& pool)
    +                        : CompressionStream(outStream,
    +                                            compressionLevel,
    +                                            capacity,
    +                                            blockSize,
    +                                            pool) {
    +    init();
    +  }
    +
    +  uint64_t ZlibCompressionStream::doStreamingCompression() {
    +    if (deflateReset(&strm) != Z_OK) {
    +      throw std::logic_error("Failed to reset deflate.");
    +    }
    +
    +    strm.avail_in = static_cast<unsigned int>(bufferSize);
    +    strm.next_in = rawInputBuffer.data();
    +
    +    do {
    +      if (outputPosition >= outputSize) {
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = 0;
    +      }
    +      strm.next_out = reinterpret_cast<unsigned char *>
    +      (outputBuffer + outputPosition);
    +      strm.avail_out = static_cast<unsigned int>
    +      (outputSize - outputPosition);
    +
    +      int ret = deflate(&strm, Z_FINISH);
    +      outputPosition = outputSize - static_cast<int>(strm.avail_out);
    +
    +      if (ret == Z_STREAM_END) {
    +        break;
    +      } else if (ret == Z_OK) {
    +        // needs more buffer so will continue the loop
    +      } else {
    +        throw std::logic_error("Failed to deflate input data.");
    --- End diff --
    
    Use `ParseError` here.
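
Separately from the exception type, the 3-byte chunk header written by `writeHeader` in the diff above can be checked with a round trip. The sketch below reuses the same shifts; `encodeHeader`, `decodeHeader`, and `ChunkHeader` are hypothetical names, and the decoder is written here only to illustrate the bit layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Same bit layout as writeHeader() in the diff: a 24-bit little-endian value
// holding (size << 1) plus the "original" flag in the lowest bit.
inline void encodeHeader(unsigned char* buffer, size_t size, bool original) {
  assert(size < (static_cast<size_t>(1) << 23));  // size must fit in 23 bits
  buffer[0] = static_cast<unsigned char>((size << 1) + (original ? 1 : 0));
  buffer[1] = static_cast<unsigned char>(size >> 7);
  buffer[2] = static_cast<unsigned char>(size >> 15);
}

struct ChunkHeader {
  size_t size;    // chunk length in bytes
  bool original;  // true when the chunk is stored uncompressed
};

// Hypothetical inverse of encodeHeader().
inline ChunkHeader decodeHeader(const unsigned char* buffer) {
  uint32_t raw = static_cast<uint32_t>(buffer[0]) |
                 (static_cast<uint32_t>(buffer[1]) << 8) |
                 (static_cast<uint32_t>(buffer[2]) << 16);
  return ChunkHeader{static_cast<size_t>(raw >> 1), (raw & 1u) != 0};
}
```

For example, a 5-byte chunk stored uncompressed encodes to the bytes `0x0B 0x00 0x00`, since `(5 << 1) + 1 == 11`.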



[GitHub] orc issue #122: ORC-192 Implement zlib compression stream

Posted by prasanthj <gi...@git.apache.org>.
Github user prasanthj commented on the issue:

    https://github.com/apache/orc/pull/122
  
    I just merged this commit. Thanks @xndai for the contribution and @majetideepak for the reviews!



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117379895
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    We also throw std::logic_error in other places. If that's the goal, I'd suggest we use a single exception type for all ORC code, e.g. OrcException. You can open a JIRA for this, and it can be done in a separate change.
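
A rough sketch of the single-exception idea, under stated assumptions: the name `OrcException` comes from the suggestion above, but its definition here is hypothetical, as are `backUpChecked` and `tryBackUp`.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical project-wide exception type; the definition is an assumption.
class OrcException : public std::runtime_error {
public:
  explicit OrcException(const std::string& what) : std::runtime_error(what) {}
};

// Hypothetical call site modelled on BackUp(), which throws
// std::logic_error in the diff.
inline void backUpChecked(int count, int bufferSize) {
  if (count > bufferSize) {
    throw OrcException("Can't backup that much!");
  }
}

// With one type in use, a caller needs a single catch clause for ORC failures.
inline std::string tryBackUp(int count, int bufferSize) {
  try {
    backUpChecked(count, bufferSize);
    return "ok";
  } catch (const OrcException& e) {
    return e.what();
  }
}
```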



[GitHub] orc issue #122: ORC-192 Implement zlib compression stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on the issue:

    https://github.com/apache/orc/pull/122
  
    @omalley, can you please take a look at this? Thanks.



[GitHub] orc pull request #122: ORC-192 Implement zlib compression stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355319
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = newPosition;
    +      } else {
    +        outputPosition += 3;
    +      }
    +
    +      uint64_t totalCompressedSize = doStreamingCompression();
    +
    +      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
    +      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
    +        writeHeader(header, static_cast<size_t>(bufferSize), true);
    +        memcpy(
    +          header + 3,
    +          rawInputBuffer.data(),
    +          static_cast<size_t>(bufferSize));
    +
    +        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
    +        BufferedOutputStream::BackUp(backup);
    +        outputPosition -= backup;
    +        outputSize -= backup;
    +      } else {
    +        writeHeader(header, totalCompressedSize, false);
    +      }
    +    }
    +
    +    *data = rawInputBuffer.data();
    +    *size = static_cast<int>(rawInputBuffer.size());
    +    bufferSize = *size;
    +
    +    return true;
    +  }
    +
    +  class ZlibCompressionStream: public CompressionStream {
    +  public:
    +    ZlibCompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual std::string getName() const override;
    +
    +  protected:
    +    virtual uint64_t doStreamingCompression() override;
    +
    +  private:
    +    void init();
    +    z_stream strm;
    +  };
    +
    +  ZlibCompressionStream::ZlibCompressionStream(
    +                        OutputStream * outStream,
    +                        int compressionLevel,
    +                        uint64_t capacity,
    +                        uint64_t blockSize,
    +                        MemoryPool& pool)
    +                        : CompressionStream(outStream,
    +                                            compressionLevel,
    +                                            capacity,
    +                                            blockSize,
    +                                            pool) {
    +    init();
    +  }
    +
    +  uint64_t ZlibCompressionStream::doStreamingCompression() {
    +    if (deflateReset(&strm) != Z_OK) {
    +      throw std::logic_error("Failed to reset inflate.");
    +    }
    +
    +    strm.avail_in = static_cast<unsigned int>(bufferSize);
    +    strm.next_in = rawInputBuffer.data();
    +
    +    do {
    +      if (outputPosition >= outputSize) {
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = 0;
    +      }
    +      strm.next_out = reinterpret_cast<unsigned char *>
    +      (outputBuffer + outputPosition);
    +      strm.avail_out = static_cast<unsigned int>
    +      (outputSize - outputPosition);
    +
    +      int ret = deflate(&strm, Z_FINISH);
    +      outputPosition = outputSize - static_cast<int>(strm.avail_out);
    +
    +      if (ret == Z_STREAM_END) {
    +        break;
    +      } else if (ret == Z_OK) {
    +        // needs more buffer so will continue the loop
    +      } else {
    +        throw std::logic_error("Failed to deflate input data.");
    +      }
    +    } while (strm.avail_out == 0);
    +
    +    return strm.total_out;
    +  }
    +
    +  std::string ZlibCompressionStream::getName() const {
    +    return "ZlibCompressionStream";
    +  }
    +
    +DIAGNOSTIC_PUSH
    +DIAGNOSTIC_IGNORE("-Wold-style-cast")
    +
    +  void ZlibCompressionStream::init() {
    +    strm.zalloc = Z_NULL;
    +    strm.zfree = Z_NULL;
    +    strm.opaque = Z_NULL;
    +
    +    if (deflateInit2(&strm, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY)
    +        != Z_OK) {
    +      throw std::logic_error("Error while calling deflateInit2() for zlib.");
    --- End diff --
    
    Use `ParseError` here.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117378369
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    How about just using std::runtime_error? I still don't see a need to create yet another exception class.
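
    For context on the choice: std::logic_error and std::runtime_error are sibling classes under std::exception, so a handler for one does not catch the other. The sketch below illustrates this; failFlush and classify are hypothetical helpers, not code from the patch.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a failing call; throws one of the two types.
void failFlush(bool useRuntimeError) {
  if (useRuntimeError) {
    throw std::runtime_error("Failed to flush compression buffer.");
  }
  throw std::logic_error("Failed to flush compression buffer.");
}

// Reports which handler was selected for the thrown exception.
std::string classify(bool useRuntimeError) {
  try {
    failFlush(useRuntimeError);
  } catch (const std::runtime_error&) {
    return "runtime_error";
  } catch (const std::logic_error&) {
    return "logic_error";
  }
  return "none";
}
```

    A failed flush depends on conditions only known while the program runs, which is the usual reason to prefer std::runtime_error for it.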



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117379977
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Sounds good!



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117609952
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
    --- End diff --
    
    Compare `strategy` against the three options, `CompressionStrategy_DEFAULT`, `CompressionStrategy_SPEED`, and `CompressionStrategy_COMPRESSION`, and set the level accordingly.



[GitHub] orc issue #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on the issue:

    https://github.com/apache/orc/pull/122
  
    @majetideepak @omalley, please take another look.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117360126
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = strategy == CompressionStrategy_SPEED ? 1 : 9;
    --- End diff --
    
    We should keep the default level at `Z_DEFAULT_COMPRESSION` (-1) and set a different level only if `strategy` is specified. I think the Java code does something similar.
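
    A rough sketch of such a mapping, under stated assumptions: the Strategy enum and its Default value are hypothetical (the diff only shows `CompressionStrategy_SPEED`), and the constants are local copies of the values zlib defines for `Z_DEFAULT_COMPRESSION`, `Z_BEST_SPEED`, and `Z_BEST_COMPRESSION`, so the example builds without zlib.

```cpp
// Hypothetical strategy enum; whether a default value exists in the real
// CompressionStrategy type is an assumption.
enum class Strategy { Default, Speed, Compression };

// Local mirrors of zlib's level constants.
constexpr int kZDefaultCompression = -1;  // Z_DEFAULT_COMPRESSION
constexpr int kZBestSpeed = 1;            // Z_BEST_SPEED
constexpr int kZBestCompression = 9;      // Z_BEST_COMPRESSION

// Maps a strategy to a zlib level, falling back to the library default
// when no explicit strategy was requested.
int zlibLevelFor(Strategy strategy) {
  switch (strategy) {
    case Strategy::Speed:
      return kZBestSpeed;
    case Strategy::Compression:
      return kZBestCompression;
    case Strategy::Default:
      break;
  }
  return kZDefaultCompression;
}
```

    The resulting value would be passed as the `level` argument of `deflateInit2()`.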



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/orc/pull/122



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117388912
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Yes! Use std::runtime_error for now.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117825065
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = (strategy == CompressionStrategy_SPEED) ? -1 : 9;
    --- End diff --
    
    You are right!



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117380455
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Filed ORC-196.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117354764
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = newPosition;
    +      } else {
    +        outputPosition += 3;
    +      }
    +
    +      uint64_t totalCompressedSize = doStreamingCompression();
    +
    +      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
    +      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
    +        writeHeader(header, static_cast<size_t>(bufferSize), true);
    +        memcpy(
    +          header + 3,
    +          rawInputBuffer.data(),
    +          static_cast<size_t>(bufferSize));
    +
    +        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
    +        BufferedOutputStream::BackUp(backup);
    +        outputPosition -= backup;
    +        outputSize -= backup;
    +      } else {
    +        writeHeader(header, totalCompressedSize, false);
    +      }
    +    }
    +
    +    *data = rawInputBuffer.data();
    +    *size = static_cast<int>(rawInputBuffer.size());
    +    bufferSize = *size;
    +
    +    return true;
    +  }
    +
    +  class ZlibCompressionStream: public CompressionStream {
    +  public:
    +    ZlibCompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual std::string getName() const override;
    +
    +  protected:
    +    virtual uint64_t doStreamingCompression() override;
    +
    +  private:
    +    void init();
    +    z_stream strm;
    +  };
    +
    +  ZlibCompressionStream::ZlibCompressionStream(
    +                        OutputStream * outStream,
    +                        int compressionLevel,
    +                        uint64_t capacity,
    +                        uint64_t blockSize,
    +                        MemoryPool& pool)
    +                        : CompressionStream(outStream,
    +                                            compressionLevel,
    +                                            capacity,
    +                                            blockSize,
    +                                            pool) {
    +    init();
    +  }
    +
    +  uint64_t ZlibCompressionStream::doStreamingCompression() {
    +    if (deflateReset(&strm) != Z_OK) {
    +      throw std::logic_error("Failed to reset inflate.");
    --- End diff --
    
    The decompression logic uses `ParseError` for failed library calls.
    We should follow the same convention here, e.g. `ParseError("ZlibCompressionStream failed to reset deflate")`.
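
A minimal sketch of what this suggestion could look like. The `ParseError` class below is a local stand-in (assumed, like the ORC class, to derive from `std::runtime_error`), and `checkZlibResult` is a hypothetical helper that does not appear in the patch. The zlib status is passed as a plain `int` so the snippet builds without zlib; 0 corresponds to `Z_OK`.

```cpp
#include <stdexcept>
#include <string>

// Stand-in for orc::ParseError so the sketch compiles on its own
// (assumption: the real class is also a std::runtime_error).
class ParseError : public std::runtime_error {
public:
  explicit ParseError(const std::string& what) : std::runtime_error(what) {}
};

// Hypothetical helper, not part of the patch: turn a non-zero zlib status
// into a ParseError that names the stream and the call that failed.
// zlib's Z_OK is 0, so any other value is treated as a failure here.
inline void checkZlibResult(int status, const std::string& call) {
  if (status != 0) {
    throw ParseError("ZlibCompressionStream failed to " + call +
                     " (zlib status " + std::to_string(status) + ")");
  }
}
```

With a helper of this shape, the reset in `doStreamingCompression()` could read `checkZlibResult(deflateReset(&strm), "reset deflate");`.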



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117381049
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    For now, I will just use `std::runtime_error` in the compression path. We can come back and fix that later.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117354986
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    --- End diff --
    
    Use `ParseError` here.



[GitHub] orc issue #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on the issue:

    https://github.com/apache/orc/pull/122
  
    @xndai I should get merge rights by next week. @omalley will merge them until then.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117363781
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Throwing `ParseError` is understandable for the reader, but would be odd for the writer. If we really want to distinguish exceptions raised during compression, we could introduce a separate exception type, but so far I don't see a strong need for one.
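
To illustrate the alternative mentioned above, here is a rough sketch of a writer-side exception type. The names `CompressionError` and `describeFailure` are invented for this example and are not part of the patch or of ORC; the point is only that a dedicated type lets callers tell compression failures apart from other runtime errors.

```cpp
#include <stdexcept>
#include <string>

// Hypothetical writer-side exception; the name is invented for this sketch.
class CompressionError : public std::runtime_error {
public:
  explicit CompressionError(const std::string& what)
      : std::runtime_error(what) {}
};

// Sketch of a caller that handles compression failures separately from
// any other std::runtime_error thrown while writing a block.
template <typename Fn>
std::string describeFailure(Fn writeBlock) {
  try {
    writeBlock();
  } catch (const CompressionError& e) {
    return std::string("compression: ") + e.what();
  } catch (const std::runtime_error& e) {
    return std::string("other: ") + e.what();
  }
  return "ok";
}
```

Because `CompressionError` still derives from `std::runtime_error`, existing handlers for `std::runtime_error` would keep working if such a type were introduced later.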



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117355243
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    +    }
    +    BufferedOutputStream::BackUp(outputSize - outputPosition);
    +    bufferSize = outputSize = outputPosition = 0;
    +    return BufferedOutputStream::flush();
    +  }
    +
    +  uint64_t CompressionStreamBase::getSize() const {
    +    return BufferedOutputStream::getSize() -
    +           static_cast<uint64_t>(outputSize - outputPosition);
    +  }
    +
    +  /**
    +   * Streaming compression base class
    +   */
    +  class CompressionStream: public CompressionStreamBase {
    +  public:
    +    CompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override;
    +    virtual std::string getName() const override = 0;
    +
    +  protected:
    +    // return total compressed size
    +    virtual uint64_t doStreamingCompression() = 0;
    +  };
    +
    +  CompressionStream::CompressionStream(OutputStream * outStream,
    +                                       int compressionLevel,
    +                                       uint64_t capacity,
    +                                       uint64_t blockSize,
    +                                       MemoryPool& pool) :
    +                                         CompressionStreamBase(outStream,
    +                                                               compressionLevel,
    +                                                               capacity,
    +                                                               blockSize,
    +                                                               pool) {
    +    // PASS
    +  }
    +
    +  bool CompressionStream::Next(void** data, int*size) {
    +    if (bufferSize != 0) {
    +      // adjust 3 bytes for the compression header
    +      if (outputPosition + 3 >= outputSize) {
    +        int newPosition = outputPosition + 3 - outputSize;
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    +            "Failed to get next output buffer from output stream.");
    +        }
    +        outputPosition = newPosition;
    +      } else {
    +        outputPosition += 3;
    +      }
    +
    +      uint64_t totalCompressedSize = doStreamingCompression();
    +
    +      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
    +      if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
    +        writeHeader(header, static_cast<size_t>(bufferSize), true);
    +        memcpy(
    +          header + 3,
    +          rawInputBuffer.data(),
    +          static_cast<size_t>(bufferSize));
    +
    +        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
    +        BufferedOutputStream::BackUp(backup);
    +        outputPosition -= backup;
    +        outputSize -= backup;
    +      } else {
    +        writeHeader(header, totalCompressedSize, false);
    +      }
    +    }
    +
    +    *data = rawInputBuffer.data();
    +    *size = static_cast<int>(rawInputBuffer.size());
    +    bufferSize = *size;
    +
    +    return true;
    +  }
    +
    +  class ZlibCompressionStream: public CompressionStream {
    +  public:
    +    ZlibCompressionStream(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual std::string getName() const override;
    +
    +  protected:
    +    virtual uint64_t doStreamingCompression() override;
    +
    +  private:
    +    void init();
    +    z_stream strm;
    +  };
    +
    +  ZlibCompressionStream::ZlibCompressionStream(
    +                        OutputStream * outStream,
    +                        int compressionLevel,
    +                        uint64_t capacity,
    +                        uint64_t blockSize,
    +                        MemoryPool& pool)
    +                        : CompressionStream(outStream,
    +                                            compressionLevel,
    +                                            capacity,
    +                                            blockSize,
    +                                            pool) {
    +    init();
    +  }
    +
    +  uint64_t ZlibCompressionStream::doStreamingCompression() {
    +    if (deflateReset(&strm) != Z_OK) {
    +      throw std::logic_error("Failed to reset inflate.");
    +    }
    +
    +    strm.avail_in = static_cast<unsigned int>(bufferSize);
    +    strm.next_in = rawInputBuffer.data();
    +
    +    do {
    +      if (outputPosition >= outputSize) {
    +        if (!BufferedOutputStream::Next(
    +          reinterpret_cast<void **>(&outputBuffer),
    +          &outputSize)) {
    +          throw std::logic_error(
    --- End diff --
    
    Use `ParseError` here.



[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117609935
  
    --- Diff: c++/src/Compression.hh ---
    @@ -20,9 +20,15 @@
     #define ORC_COMPRESSION_HH
     
     #include "io/InputStream.hh"
    +#include "io/OutputStream.hh"
     
     namespace orc {
     
    +  enum CompressionStrategy {
    +    CompressionStrategy_SPEED = 0,
    --- End diff --
    
    Can you add a `CompressionStrategy_DEFAULT` here?
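
A minimal sketch of what such a default could look like, assuming it should fall back to zlib's own default level. The factory in this pull request maps `CompressionStrategy_SPEED` to level 1 and everything else to level 9. The enumerator name `CompressionStrategy_COMPRESSION`, the meaning given to `CompressionStrategy_DEFAULT`, and the helper `zlibLevelFor` are assumptions made for illustration, not part of the patch.

```cpp
namespace sketch {

  enum CompressionStrategy {
    CompressionStrategy_SPEED = 0,
    CompressionStrategy_COMPRESSION,  // assumed name for the "best ratio" strategy
    CompressionStrategy_DEFAULT       // hypothetical: let zlib pick its default
  };

  // Same value as zlib's Z_DEFAULT_COMPRESSION, repeated here so the
  // sketch compiles without including zlib.h.
  const int kZlibDefaultLevel = -1;

  // Hypothetical helper: translate a strategy into a zlib compression level.
  inline int zlibLevelFor(CompressionStrategy strategy) {
    switch (strategy) {
    case CompressionStrategy_SPEED:
      return 1;   // fastest, as in the patch
    case CompressionStrategy_COMPRESSION:
      return 9;   // smallest output, as in the patch
    case CompressionStrategy_DEFAULT:
      return kZlibDefaultLevel;
    }
    return kZlibDefaultLevel;  // unreachable for valid enumerators
  }

}
```

Keeping the mapping in one function means the factory's `switch` on the compression kind would not need to change if more strategies were added later.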


---

[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117379212
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    `ParseError` extends `std::runtime_error`.
    Applications that integrate with the ORC library will want to distinguish exceptions thrown by ORC from those thrown by other libraries.
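
To illustrate the point, here is a rough sketch of how a caller might tell the two apart once the library throws its own type. The `orc_sketch::ParseError` class below is a local stand-in written for this example (it only mirrors the fact that `ParseError` derives from `std::runtime_error`), and `classifyFailure` is a hypothetical helper.

```cpp
#include <stdexcept>
#include <string>

namespace orc_sketch {
  // Stand-in for the library's exception type; derives from
  // std::runtime_error as described in the comment above.
  class ParseError: public std::runtime_error {
  public:
    explicit ParseError(const std::string& what): std::runtime_error(what) {}
  };
}

// Hypothetical helper: runs fn and reports where a failure came from.
template <typename Fn>
std::string classifyFailure(Fn fn) {
  try {
    fn();
    return "ok";
  } catch (const orc_sketch::ParseError&) {
    // The more specific handler must come first.
    return "orc";
  } catch (const std::exception&) {
    // std::logic_error and other library exceptions land here.
    return "other";
  }
}
```

With a plain `std::logic_error`, the caller would only ever reach the generic handler and could not attribute the failure to ORC.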


---

[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117363862
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = strategy == CompressionStrategy_SPEED ? 1 : 9;
    --- End diff --
    
    Will do. Thx.


---

[GitHub] orc issue #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on the issue:

    https://github.com/apache/orc/pull/122
  
    Hi @majetideepak, what happens next now that you have approved? Are you able to accept this pull request?


---

[GitHub] orc issue #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on the issue:

    https://github.com/apache/orc/pull/122
  
    +1 LGTM


---

[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117377410
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    I think `logic_error` is also not apt for runtime failures. `ParseError` has been used throughout the read path.
    Going forward, you will need an exception class for the write path. How about we create a `WriteError` exception class similar to `ParseError`?
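
One possible shape for such a class, as a sketch only: it assumes `WriteError` would derive from `std::runtime_error` the same way `ParseError` does. The `sketch` namespace and the `requireFlushed` helper are hypothetical and exist only to show the throw site from `flush()` in isolation.

```cpp
#include <stdexcept>
#include <string>

namespace sketch {

  // Assumed design: a write-path counterpart to ParseError.
  class WriteError: public std::runtime_error {
  public:
    explicit WriteError(const std::string& message)
      : std::runtime_error(message) {}
  };

  // Hypothetical helper mirroring the check in flush(): the argument
  // stands for the result of the Next() call.
  inline void requireFlushed(bool nextSucceeded) {
    if (!nextSucceeded) {
      throw WriteError("Failed to flush compression buffer.");
    }
  }

}
```

Callers that already catch `std::runtime_error` would keep working, while new code could catch `WriteError` specifically.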


---

[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by xndai <gi...@git.apache.org>.
Github user xndai commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117380313
  
    --- Diff: c++/src/Compression.cc ---
    @@ -33,6 +33,254 @@
     
     namespace orc {
     
    +  class CompressionStreamBase: public BufferedOutputStream {
    +  public:
    +    CompressionStreamBase(OutputStream * outStream,
    +                          int compressionLevel,
    +                          uint64_t capacity,
    +                          uint64_t blockSize,
    +                          MemoryPool& pool);
    +
    +    virtual bool Next(void** data, int*size) override = 0;
    +    virtual void BackUp(int count) override;
    +
    +    virtual std::string getName() const override = 0;
    +    virtual uint64_t flush() override;
    +
    +    virtual bool isCompressed() const override { return true; }
    +    virtual uint64_t getSize() const override;
    +
    +  protected:
    +    void writeHeader(char * buffer, size_t compressedSize, bool original) {
    +      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 0));
    +      buffer[1] = static_cast<char>(compressedSize >> 7);
    +      buffer[2] = static_cast<char>(compressedSize >> 15);
    +    }
    +
    +    // Buffer to hold uncompressed data until user calls Next()
    +    DataBuffer<unsigned char> rawInputBuffer;
    +
    +    // Compress level
    +    int level;
    +
    +    // Compressed data output buffer
    +    char * outputBuffer;
    +
    +    // Size for compressionBuffer
    +    int bufferSize;
    +
    +    // Compress output position
    +    int outputPosition;
    +
    +    // Compress output buffer size
    +    int outputSize;
    +  };
    +
    +  CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
    +                                               int compressionLevel,
    +                                               uint64_t capacity,
    +                                               uint64_t blockSize,
    +                                               MemoryPool& pool) :
    +                                                BufferedOutputStream(pool,
    +                                                                     outStream,
    +                                                                     capacity,
    +                                                                     blockSize),
    +                                                rawInputBuffer(pool, blockSize),
    +                                                level(compressionLevel),
    +                                                outputBuffer(nullptr),
    +                                                bufferSize(0),
    +                                                outputPosition(0),
    +                                                outputSize(0) {
    +    // PASS
    +  }
    +
    +  void CompressionStreamBase::BackUp(int count) {
    +    if (count > bufferSize) {
    +      throw std::logic_error("Can't backup that much!");
    +    }
    +    bufferSize -= count;
    +  }
    +
    +  uint64_t CompressionStreamBase::flush() {
    +    void * data;
    +    int size;
    +    if (!Next(&data, &size)) {
    +      throw std::logic_error("Failed to flush compression buffer.");
    --- End diff --
    
    Or we could define both `RuntimeError` and `LogicError` in the `orc` namespace to differentiate the types of errors.


---

[GitHub] orc pull request #122: ORC-192 Implement zlib compresion stream

Posted by majetideepak <gi...@git.apache.org>.
Github user majetideepak commented on a diff in the pull request:

    https://github.com/apache/orc/pull/122#discussion_r117357368
  
    --- Diff: c++/src/Compression.cc ---
    @@ -636,6 +884,33 @@ DIAGNOSTIC_POP
         return static_cast<uint64_t>(result);
       }
     
    +  std::unique_ptr<BufferedOutputStream>
    +     createCompressor(
    +                      CompressionKind kind,
    +                      OutputStream * outStream,
    +                      CompressionStrategy strategy,
    +                      uint64_t bufferCapacity,
    +                      uint64_t blockSize,
    +                      MemoryPool& pool) {
    +    switch (static_cast<int64_t>(kind)) {
    +    case CompressionKind_NONE: {
    +      return std::unique_ptr<BufferedOutputStream>
    +        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
    +    }
    +    case CompressionKind_ZLIB: {
    +      int level = strategy == CompressionStrategy_SPEED ? 1 : 9;
    --- End diff --
    
    Adding parentheses here improves readability.
    `(strategy == CompressionStrategy_SPEED)`


---