You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/11/11 00:44:43 UTC

[2/2] mesos git commit: Introduced a streaming gzip::Decompressor.

Introduced a streaming gzip::Decompressor.

This enables incremental decompression of compressed input.

Review: https://reviews.apache.org/r/53366


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc31ad0c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc31ad0c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc31ad0c

Branch: refs/heads/master
Commit: cc31ad0c51b296a1c580ba8ecde05e3cd4203120
Parents: 2256b23
Author: Benjamin Mahler <bm...@apache.org>
Authored: Tue Nov 1 19:20:35 2016 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Nov 10 16:42:48 2016 -0800

----------------------------------------------------------------------
 3rdparty/stout/include/stout/gzip.hpp | 135 +++++++++++++++++++----------
 3rdparty/stout/tests/gzip_tests.cpp   |  37 ++++++++
 2 files changed, 128 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cc31ad0c/3rdparty/stout/include/stout/gzip.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/gzip.hpp b/3rdparty/stout/include/stout/gzip.hpp
index e19ed2f..83010cc 100644
--- a/3rdparty/stout/include/stout/gzip.hpp
+++ b/3rdparty/stout/include/stout/gzip.hpp
@@ -25,7 +25,7 @@
 
 
 // Compression utilities.
-// TODO(bmahler): Provide streaming compression / decompression as well.
+// TODO(bmahler): Provide streaming compression as well.
 namespace gzip {
 
 namespace internal {
@@ -33,6 +33,7 @@ namespace internal {
 // We use a 16KB buffer with zlib compression / decompression.
 #define GZIP_BUFFER_SIZE 16384
 
+
 class GzipError : public Error
 {
 public:
@@ -82,6 +83,121 @@ private:
 } // namespace internal {
 
 
+// Provides the ability to incrementally decompress
+// a stream of compressed input data.
+//
+// Successive chunks of a gzip stream are passed to `decompress()`,
+// which returns the output zlib can produce from the input seen so
+// far (possibly empty, e.g. while the gzip header is being read).
+// `finished()` reports whether the end of the gzip stream, including
+// its trailer, has been consumed.
+//
+// A Decompressor owns zlib inflate state and is neither copyable nor
+// assignable: a member-wise copy of the `z_stream_s` would alias the
+// same internal state, which both objects would then `inflateEnd`.
+class Decompressor
+{
+public:
+  Decompressor()
+    : _finished(false)
+  {
+    stream.zalloc = Z_NULL;
+    stream.zfree = Z_NULL;
+    stream.opaque = Z_NULL;
+    stream.next_in = Z_NULL;
+    stream.avail_in = 0;
+
+    int code = inflateInit2(
+        &stream,
+        MAX_WBITS + 16); // Zlib magic for gzip compression / decompression.
+
+    if (code != Z_OK) {
+      Error error = internal::GzipError("Failed to inflateInit2", stream, code);
+      ABORT(error.message);
+    }
+  }
+
+  Decompressor(const Decompressor&) = delete;
+  Decompressor& operator=(const Decompressor&) = delete;
+
+  ~Decompressor()
+  {
+    if (inflateEnd(&stream) != Z_OK) {
+      ABORT("Failed to inflateEnd");
+    }
+  }
+
+  // Decompresses the next chunk of compressed input. Returns the
+  // output produced while consuming this chunk, or an Error if zlib
+  // reports a failure or if input remains after the end of the stream.
+  Try<std::string> decompress(const std::string& compressed)
+  {
+    // zlib's `avail_in` is a `uInt`, which can be narrower than
+    // `size_t`. Feed the input in pieces of at most this many bytes
+    // so that very large inputs are not silently truncated.
+    const size_t maxPiece = static_cast<uInt>(-1);
+
+    const Bytef* next = reinterpret_cast<const Bytef*>(compressed.data());
+    size_t remaining = compressed.length();
+
+    // Never resume from input left over by a previous call (e.g. one
+    // that returned an error): it may point into a destroyed string.
+    stream.next_in = Z_NULL;
+    stream.avail_in = 0;
+
+    // Build up the decompressed result.
+    Bytef buffer[GZIP_BUFFER_SIZE];
+    std::string result;
+
+    while (remaining > 0 || stream.avail_in > 0) {
+      // Hand zlib the next piece once it has consumed the previous one.
+      if (stream.avail_in == 0) {
+        const uInt size =
+          static_cast<uInt>(remaining < maxPiece ? remaining : maxPiece);
+
+        stream.next_in = const_cast<Bytef*>(next);
+        stream.avail_in = size;
+        next += size;
+        remaining -= size;
+      }
+
+      stream.next_out = buffer;
+      stream.avail_out = GZIP_BUFFER_SIZE;
+
+      int code = inflate(&stream, Z_SYNC_FLUSH);
+
+      _finished = code == Z_STREAM_END;
+
+      if (code != Z_OK && !_finished) {
+        return internal::GzipError("Failed to inflate", stream, code);
+      }
+
+      if (_finished && (stream.avail_in > 0 || remaining > 0)) {
+        return Error("Stream finished with data unconsumed");
+      }
+
+      // Consume output; the buffer is reset at the top of the loop.
+      result.append(
+          reinterpret_cast<char*>(buffer),
+          GZIP_BUFFER_SIZE - stream.avail_out);
+    }
+
+    return result;
+  }
+
+  // Returns whether the decompression stream is finished.
+  // If set to false, more input is expected.
+  bool finished() const
+  {
+    return _finished;
+  }
+
+private:
+  z_stream_s stream;
+  bool _finished;
+};
+
+
 // Returns a gzip compressed version of the provided string.
 // The compression level should be within the range [-1, 9].
 // See zlib.h:
@@ -155,52 +239,15 @@ inline Try<std::string> compress(
 // Returns a gzip decompressed version of the provided string.
 inline Try<std::string> decompress(const std::string& compressed)
 {
-  z_stream_s stream;
-  stream.next_in =
-    const_cast<Bytef*>(reinterpret_cast<const Bytef*>(compressed.data()));
-  stream.avail_in = compressed.length();
-  stream.zalloc = Z_NULL;
-  stream.zfree = Z_NULL;
-  stream.opaque = Z_NULL;
-
-  int code = inflateInit2(
-      &stream,
-      MAX_WBITS + 16); // Zlib magic for gzip compression / decompression.
-
-  if (code != Z_OK) {
-    Error error = internal::GzipError("Failed to inflateInit2", stream, code);
-    ABORT(error.message);
-  }
-
-  // Build up the decompressed result.
-  Bytef buffer[GZIP_BUFFER_SIZE];
-  std::string result = "";
-  do {
-    stream.next_out = buffer;
-    stream.avail_out = GZIP_BUFFER_SIZE;
-    code = inflate(&stream, stream.avail_in > 0 ? Z_NO_FLUSH : Z_FINISH);
-
-    if (code != Z_OK && code != Z_STREAM_END) {
-      Error error = internal::GzipError("Failed to inflate", stream, code);
-      if (inflateEnd(&stream) != Z_OK) {
-        ABORT("Failed to inflateEnd");
-      }
-      return error;
-    }
-
-    // Consume output and reset the buffer.
-    result.append(
-        reinterpret_cast<char*>(buffer),
-        GZIP_BUFFER_SIZE - stream.avail_out);
-    stream.next_out = buffer;
-    stream.avail_out = GZIP_BUFFER_SIZE;
-  } while (code != Z_STREAM_END);
+  Decompressor decompressor;
+  Try<std::string> decompressed = decompressor.decompress(compressed);
 
-  if (inflateEnd(&stream) != Z_OK) {
-    ABORT("Failed to inflateEnd");
+  // Reject truncated input: the gzip stream must have reached its end.
+  if (decompressed.isSome() && !decompressor.finished()) {
+    return Error("More input is expected");
   }
 
-  return result;
+  return decompressed;
 }
 
 } // namespace gzip {

http://git-wip-us.apache.org/repos/asf/mesos/blob/cc31ad0c/3rdparty/stout/tests/gzip_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/tests/gzip_tests.cpp b/3rdparty/stout/tests/gzip_tests.cpp
index 814fb99..05764a4 100644
--- a/3rdparty/stout/tests/gzip_tests.cpp
+++ b/3rdparty/stout/tests/gzip_tests.cpp
@@ -62,4 +62,41 @@ TEST(GzipTest, CompressDecompressString)
   ASSERT_SOME(decompressed);
   ASSERT_EQ(s, decompressed.get());
 }
+
+
+TEST(GzipTest, Decompressor)
+{
+  string s =
+    "Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do "
+    "eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad "
+    "minim veniam, quis nostrud exercitation ullamco laboris nisi ut "
+    "aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit "
+    "in voluptate velit esse cillum dolore eu fugiat nulla pariatur. "
+    "Excepteur sint occaecat cupidatat non proident, sunt in culpa qui "
+    "officia deserunt mollit anim id est laborum.";
+
+  Try<string> compressed = gzip::compress(s);
+  ASSERT_SOME(compressed);
+
+  gzip::Decompressor decompressor;
+
+  // Decompress 1 byte at a time. The gzip trailer ends the stream, so
+  // the decompressor must not report completion before the last byte.
+  string decompressed;
+
+  for (size_t i = 0; i < compressed->size(); ++i) {
+    EXPECT_FALSE(decompressor.finished());
+    Try<string> decompressedChunk =
+      decompressor.decompress(compressed->substr(i, 1));
+    ASSERT_SOME(decompressedChunk);
+    decompressed += decompressedChunk.get();
+  }
+
+  EXPECT_TRUE(decompressor.finished());
+
+  // Input after the end of the stream is rejected.
+  EXPECT_ERROR(decompressor.decompress(compressed->substr(0, 1)));
+
+  ASSERT_EQ(s, decompressed);
+}
 #endif // HAVE_LIBZ