You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/11/11 00:44:43 UTC
[2/2] mesos git commit: Introduced a streaming gzip::Decompressor.
Introduced a streaming gzip::Decompressor.
This enables incremental decompression of compressed input.
Review: https://reviews.apache.org/r/53366
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc31ad0c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc31ad0c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc31ad0c
Branch: refs/heads/master
Commit: cc31ad0c51b296a1c580ba8ecde05e3cd4203120
Parents: 2256b23
Author: Benjamin Mahler <bm...@apache.org>
Authored: Tue Nov 1 19:20:35 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Nov 10 16:42:48 2016 -0800
----------------------------------------------------------------------
3rdparty/stout/include/stout/gzip.hpp | 135 +++++++++++++++++++----------
3rdparty/stout/tests/gzip_tests.cpp | 37 ++++++++
2 files changed, 128 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc31ad0c/3rdparty/stout/include/stout/gzip.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/gzip.hpp b/3rdparty/stout/include/stout/gzip.hpp
index e19ed2f..83010cc 100644
--- a/3rdparty/stout/include/stout/gzip.hpp
+++ b/3rdparty/stout/include/stout/gzip.hpp
@@ -25,7 +25,7 @@
// Compression utilities.
-// TODO(bmahler): Provide streaming compression / decompression as well.
+// TODO(bmahler): Provide streaming compression as well.
namespace gzip {
namespace internal {
@@ -33,6 +33,7 @@ namespace internal {
// We use a 16KB buffer with zlib compression / decompression.
#define GZIP_BUFFER_SIZE 16384
+
class GzipError : public Error
{
public:
@@ -82,6 +83,89 @@ private:
} // namespace internal {
+// Provides the ability to incrementally decompress
+// a stream of compressed input data.
+//
+// Each call to `decompress()` returns all output that can be produced
+// from the input supplied so far; `finished()` reports whether the end
+// of the gzip stream has been reached. A Decompressor owns zlib state
+// and is therefore not copyable.
+class Decompressor
+{
+public:
+  Decompressor()
+    : _finished(false)
+  {
+    stream.zalloc = Z_NULL;
+    stream.zfree = Z_NULL;
+    stream.opaque = Z_NULL;
+    stream.next_in = Z_NULL;
+    stream.avail_in = 0;
+
+    int code = inflateInit2(
+        &stream,
+        MAX_WBITS + 16); // Zlib magic for gzip compression / decompression.
+
+    if (code != Z_OK) {
+      Error error = internal::GzipError("Failed to inflateInit2", stream, code);
+      ABORT(error.message);
+    }
+  }
+
+  // The z_stream refers to state allocated by `inflateInit2()` and released
+  // by `inflateEnd()` in the destructor; a copy would release it twice.
+  Decompressor(const Decompressor&) = delete;
+  Decompressor& operator=(const Decompressor&) = delete;
+
+  ~Decompressor()
+  {
+    if (inflateEnd(&stream) != Z_OK) {
+      ABORT("Failed to inflateEnd");
+    }
+  }
+
+  // Returns the next decompressed chunk of data, or an Error if
+  // decompression fails or the stream ends with input left over.
+  //
+  // All output that can be produced from the input supplied so far is
+  // returned, including output zlib kept pending because a previous call
+  // filled the output buffer. Passing an empty string therefore drains
+  // any such pending output.
+  Try<std::string> decompress(const std::string& compressed)
+  {
+    // `avail_in` is a `uInt`, so input longer than its maximum value is
+    // handed to zlib in several slices rather than being truncated.
+    const size_t maxSlice = static_cast<uInt>(-1);
+    size_t remaining = compressed.length();
+
+    stream.next_in =
+      const_cast<Bytef*>(reinterpret_cast<const Bytef*>(compressed.data()));
+    stream.avail_in = 0;
+
+    // Build up the decompressed result.
+    Bytef buffer[GZIP_BUFFER_SIZE];
+    std::string result;
+
+    do {
+      // Refill the input window once zlib has consumed the current slice;
+      // zlib has already advanced `next_in` past the consumed bytes.
+      if (stream.avail_in == 0 && remaining > 0) {
+        const size_t slice = remaining < maxSlice ? remaining : maxSlice;
+        stream.avail_in = static_cast<uInt>(slice);
+        remaining -= slice;
+      }
+
+      stream.next_out = buffer;
+      stream.avail_out = GZIP_BUFFER_SIZE;
+
+      int code = inflate(&stream, Z_SYNC_FLUSH);
+
+      // Z_BUF_ERROR only means no progress was possible: all input has
+      // been consumed and no output is pending. It is not fatal.
+      if (code == Z_BUF_ERROR && stream.avail_in == 0 && remaining == 0) {
+        break;
+      }
+
+      _finished = code == Z_STREAM_END;
+
+      if (code != Z_OK && !_finished) {
+        return internal::GzipError("Failed to inflate", stream, code);
+      }
+
+      if (_finished && (stream.avail_in > 0 || remaining > 0)) {
+        return Error("Stream finished with data unconsumed");
+      }
+
+      // Consume output; the buffer is reset at the top of the loop.
+      result.append(
+          reinterpret_cast<char*>(buffer),
+          GZIP_BUFFER_SIZE - stream.avail_out);
+
+      // Keep going while input is left, or while the output buffer was
+      // filled completely: zlib may still hold pending output even after
+      // all input has been consumed.
+    } while (!_finished &&
+             (stream.avail_in > 0 || remaining > 0 || stream.avail_out == 0));
+
+    return result;
+  }
+
+  // Returns whether the decompression stream is finished.
+  // If set to false, more input is expected.
+  bool finished() const
+  {
+    return _finished;
+  }
+
+private:
+  z_stream_s stream;
+  bool _finished;
+};
+
+
// Returns a gzip compressed version of the provided string.
// The compression level should be within the range [-1, 9].
// See zlib.h:
@@ -155,52 +239,15 @@ inline Try<std::string> compress(
// Returns a gzip decompressed version of the provided string.
+// Returns an Error if the input is not exactly one complete gzip stream:
+// malformed data, a truncated stream, and data following the end of the
+// stream are all rejected.
inline Try<std::string> decompress(const std::string& compressed)
{
- z_stream_s stream;
- stream.next_in =
- const_cast<Bytef*>(reinterpret_cast<const Bytef*>(compressed.data()));
- stream.avail_in = compressed.length();
- stream.zalloc = Z_NULL;
- stream.zfree = Z_NULL;
- stream.opaque = Z_NULL;
-
- int code = inflateInit2(
- &stream,
- MAX_WBITS + 16); // Zlib magic for gzip compression / decompression.
-
- if (code != Z_OK) {
- Error error = internal::GzipError("Failed to inflateInit2", stream, code);
- ABORT(error.message);
- }
-
- // Build up the decompressed result.
- Bytef buffer[GZIP_BUFFER_SIZE];
- std::string result = "";
- do {
- stream.next_out = buffer;
- stream.avail_out = GZIP_BUFFER_SIZE;
- code = inflate(&stream, stream.avail_in > 0 ? Z_NO_FLUSH : Z_FINISH);
-
- if (code != Z_OK && code != Z_STREAM_END) {
- Error error = internal::GzipError("Failed to inflate", stream, code);
- if (inflateEnd(&stream) != Z_OK) {
- ABORT("Failed to inflateEnd");
- }
- return error;
- }
-
- // Consume output and reset the buffer.
- result.append(
- reinterpret_cast<char*>(buffer),
- GZIP_BUFFER_SIZE - stream.avail_out);
- stream.next_out = buffer;
- stream.avail_out = GZIP_BUFFER_SIZE;
- } while (code != Z_STREAM_END);
+ // Run the whole input through a single streaming Decompressor.
+ Decompressor decompressor;
+ Try<std::string> decompressed = decompressor.decompress(compressed);
- if (inflateEnd(&stream) != Z_OK) {
- ABORT("Failed to inflateEnd");
+ // Ensure that the decompression stream does not expect more input.
+ // A truncated stream decompresses without error (yielding partial
+ // output) but leaves the decompressor unfinished.
+ if (decompressed.isSome() && !decompressor.finished()) {
+ return Error("More input is expected");
}
- return result;
+ return decompressed;
}
} // namespace gzip {
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc31ad0c/3rdparty/stout/tests/gzip_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/tests/gzip_tests.cpp b/3rdparty/stout/tests/gzip_tests.cpp
index 814fb99..05764a4 100644
--- a/3rdparty/stout/tests/gzip_tests.cpp
+++ b/3rdparty/stout/tests/gzip_tests.cpp
@@ -62,4 +62,41 @@ TEST(GzipTest, CompressDecompressString)
ASSERT_SOME(decompressed);
ASSERT_EQ(s, decompressed.get());
}
+
+
+TEST(GzipTest, Decompressor)
+{
+  const string lorem =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
+    "minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit "
+    "in voluptate velit esse cillum dolore eu fugiat nulla pariatur. "
+    "Excepteur sint occaecat cupidatat non proident, sunt in culpa qui "
+    "officia deserunt mollit anim id est laborum.";
+
+  // `lorem` decompresses into a single output buffer. The run of 'a's is
+  // highly compressible and expands to several times GZIP_BUFFER_SIZE, so
+  // decompressing it in one call requires draining multiple output buffers.
+  const string inputs[] = {lorem, string(100000, 'a')};
+
+  for (const string& s : inputs) {
+    Try<string> compressed = gzip::compress(s);
+    ASSERT_SOME(compressed);
+
+    // Feed the compressed data 1 byte at a time, and all at once.
+    // (gzip output is never empty, so neither chunk size is 0.)
+    const size_t chunkSizes[] = {1, compressed->size()};
+
+    for (size_t chunkSize : chunkSizes) {
+      gzip::Decompressor decompressor;
+      string decompressed;
+
+      for (size_t i = 0; i < compressed->size(); i += chunkSize) {
+        // The stream must not report completion before its final chunk.
+        EXPECT_FALSE(decompressor.finished());
+
+        Try<string> decompressedChunk =
+          decompressor.decompress(compressed->substr(i, chunkSize));
+        ASSERT_SOME(decompressedChunk);
+        decompressed += decompressedChunk.get();
+      }
+
+      EXPECT_TRUE(decompressor.finished());
+      ASSERT_EQ(s, decompressed);
+    }
+  }
+}
#endif // HAVE_LIBZ