Posted to commits@impala.apache.org by ta...@apache.org on 2020/03/31 01:37:12 UTC

[impala] branch master updated: IMPALA-3766: optionally compress spilled data

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ebbe52b  IMPALA-3766: optionally compress spilled data
ebbe52b is described below

commit ebbe52b4bed944d3012e3679dc984827ce11d5a8
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Mar 16 15:21:10 2020 -0700

    IMPALA-3766: optionally compress spilled data
    
    Enabled via --disk_spill_compression_codec, which uses
    the same syntax as the compression_codec query option.
    Recommended codecs are LZ4 and ZSTD. ZSTD supports
    specifying a compression level.
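
    For example, a hypothetical impalad startup line enabling ZSTD at
    compression level 6 (flag values are illustrative; the flags
    themselves are the ones added by this commit):

      impalad --disk_spill_compression_codec=zstd:6 \
              --disk_spill_punch_holes=true \
              --scratch_dirs=/tmp/impala-scratch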
    
    The compression is done in TmpFileMgr using a temporary
    buffer. Allocation of disk space is reworked slightly
    so that the allocation can happen after compression.
    
    The default power-of-two scratch range sizes would lead
    to a lot of internal fragmentation for compressed data,
    so a new free space management strategy, similar to that
    used in the data cache, can be enabled with
    --disk_spill_punch_holes=true. TmpFileMgr then allocates
    a range of the actual compressed size (rounded up to a
    4KB block) and punches a hole in the file for each range
    that is no longer needed.
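
    For reference, here is a minimal standalone sketch of the
    hole-punching primitive this relies on (the commit itself goes
    through kudu::RWFile::PunchHole; the sketch assumes Linux and a
    filesystem such as ext4 or XFS that supports FALLOC_FL_PUNCH_HOLE):

      #define _GNU_SOURCE
      #include <fcntl.h>
      #include <sys/types.h>
      #include <linux/falloc.h>

      // Deallocate [offset, offset + len) so the filesystem can reclaim
      // the backing blocks; KEEP_SIZE leaves the file length unchanged.
      // Returns 0 on success, -1 with errno set on failure.
      int punch_hole(int fd, off_t offset, off_t len) {
        return fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
                         offset, len);
      }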
    
    UncompressedWriteIoBytes is added to the buffer pool
    profiles, so that you can see what degree of compression
    is achieved. Typically I saw ratios of 2-3x for LZ4 and
    ZSTD (with LZ4 toward the lower end and ZSTD toward
    the higher end).
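
    As a worked example (illustrative numbers, not measurements from
    this commit): a profile showing UncompressedWriteIoBytes of 3.0 GB
    against 1.2 GB actually written to disk corresponds to a
    compression ratio of 3.0 / 1.2 = 2.5x.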
    
    Limitations:
    The management of the compression buffer memory could
    be improved. Ideally it would be integrated with the
    buffer pool and use the buffer pool allocator instead
    of being done "on the side". We would probably want to
    do this before making this the default, for resource
    management and performance reasons (doing a malloc()
    directly does not use the caching supported by the
    buffer pool).
    
    Testing:
    * Run buffer pool spilling tests with different combinations of
      the new options.
    * Extend existing TmpFileMgr tests for file space allocation to
      run with hole punching enabled.
    * Switch a couple of spilling tests to use the new option.
    * Add a metrics test to check for scratch leaks.
    * Enable the new options by default for end-to-end dockerized
      tests to get additional coverage.
    * Add a unit test where allocating compression memory fails,
      both on the read and write path.
    * Ran a single-node stress test on TPC-DS SF 1 and TPC-H SF 10.
      The peak compression buffer usage was ~40MB.
    
    Perf:
    I ran this spilling query using an SSD as the scratch disk:
    
      set mem_limit=200m;
      select count(distinct l_partkey) from
      tpch30_parquet.lineitem;
    
    The time taken for the second run of the query with each codec
    setting was:
    No compression: 19.59s
    LZ4: 18.56s
    ZSTD: 20.59s
    
    Change-Id: I9c08ff9504097f0fee8c32316c5c150136abe659
    Reviewed-on: http://gerrit.cloudera.org:8080/15454
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
---
 be/src/runtime/bufferpool/buffer-pool-counters.h |   8 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc    |  87 ++++-
 be/src/runtime/bufferpool/buffer-pool.cc         |  20 +-
 be/src/runtime/test-env.cc                       |  10 +-
 be/src/runtime/test-env.h                        |   6 +-
 be/src/runtime/tmp-file-mgr-internal.h           |  13 +-
 be/src/runtime/tmp-file-mgr-test.cc              | 235 +++++++++++---
 be/src/runtime/tmp-file-mgr.cc                   | 386 ++++++++++++++++++-----
 be/src/runtime/tmp-file-mgr.h                    | 165 ++++++++--
 be/src/service/query-options.cc                  |  62 +---
 be/src/util/parse-util.cc                        |  64 ++++
 be/src/util/parse-util.h                         |  29 ++
 bin/jenkins/dockerized-impala-run-tests.sh       |   2 +
 tests/custom_cluster/test_scratch_disk.py        |   8 +-
 tests/verifiers/metric_verifier.py               |   4 +-
 15 files changed, 852 insertions(+), 247 deletions(-)

diff --git a/be/src/runtime/bufferpool/buffer-pool-counters.h b/be/src/runtime/bufferpool/buffer-pool-counters.h
index 952becc..43faab4 100644
--- a/be/src/runtime/bufferpool/buffer-pool-counters.h
+++ b/be/src/runtime/bufferpool/buffer-pool-counters.h
@@ -31,6 +31,12 @@ struct BufferPoolClientCounters {
   /// Total amount of time spent inside the system allocator (subset of 'alloc_time').
   RuntimeProfile::Counter* sys_alloc_time;
 
+  /// Total amount of time spent compressing and decompressing data when spilling.
+  RuntimeProfile::Counter* compression_time;
+
+  /// Total amount of time spent encrypting and decrypting data when spilling.
+  RuntimeProfile::Counter* encryption_time;
+
   /// Number of buffers allocated via BufferAllocator::AllocateBuffer().
   RuntimeProfile::Counter* cumulative_allocations;
 
@@ -52,7 +58,7 @@ struct BufferPoolClientCounters {
   /// Total number of write I/O operations issued.
   RuntimeProfile::Counter* write_io_ops;
 
-  /// Total bytes written to disk.
+  /// Total bytes written to disk (may be compressed).
   RuntimeProfile::Counter* bytes_written;
 
   /// The peak total size of unpinned pages.
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 15b3b71..57b1046 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -55,10 +55,8 @@ using std::uniform_int_distribution;
 using std::uniform_real_distribution;
 
 DECLARE_bool(disk_spill_encryption);
-
-// Note: This is the default scratch dir created by impala.
-// FLAGS_scratch_dirs + TmpFileMgr::TMP_SUB_DIR_NAME.
-const string SCRATCH_DIR = "/tmp/impala-scratch";
+DECLARE_string(disk_spill_compression_codec);
+DECLARE_bool(disk_spill_punch_holes);
 
 // This suffix is appended to a tmp dir
 const string SCRATCH_SUFFIX = "/impala-scratch";
@@ -116,6 +114,13 @@ class BufferPoolTest : public ::testing::Test {
  protected:
   /// Reinitialize test_env_ to have multiple temporary directories.
   vector<string> InitMultipleTmpDirs(int num_dirs) {
+    return InitTmpFileMgr(
+        num_dirs, FLAGS_disk_spill_compression_codec, FLAGS_disk_spill_punch_holes);
+  }
+
+  /// Init a new tmp file manager with additional options.
+  vector<string> InitTmpFileMgr(
+      int num_dirs, const string& compression, bool punch_holes) {
     vector<string> tmp_dirs;
     for (int i = 0; i < num_dirs; ++i) {
       const string& dir = Substitute("/tmp/buffer-pool-test.$0", i);
@@ -127,7 +132,7 @@ class BufferPoolTest : public ::testing::Test {
     }
     test_env_.reset(new TestEnv);
     test_env_->DisableBufferPool();
-    test_env_->SetTmpFileMgrArgs(tmp_dirs, false);
+    test_env_->SetTmpFileMgrArgs(tmp_dirs, false, compression, punch_holes);
     EXPECT_OK(test_env_->Init());
     EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
     return tmp_dirs;
@@ -311,9 +316,9 @@ class BufferPoolTest : public ::testing::Test {
   void WaitForAllWrites(ClientHandle* client) { client->impl_->WaitForAllWrites(); }
 
   // Remove write permissions on scratch files. Return # of scratch files.
-  static int RemoveScratchPerms() {
+  static int RemoveScratchPerms(const string& scratch_dir) {
     int num_files = 0;
-    directory_iterator dir_it(SCRATCH_DIR);
+    directory_iterator dir_it(scratch_dir);
     for (; dir_it != directory_iterator(); ++dir_it) {
       ++num_files;
       EXPECT_EQ(0, chmod(dir_it->path().c_str(), 0));
@@ -377,7 +382,9 @@ class BufferPoolTest : public ::testing::Test {
   void TestEvictionPolicy(int64_t page_size);
   void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
   void TestQueryTeardown(bool write_error);
-  void TestWriteError(int write_delay_ms);
+  void TestWriteError(int write_delay_ms, const string& compression, bool punch_holes);
+  void TestTmpFileAllocateError(const string& compression, bool punch_holes);
+  void TestWriteErrorBlacklist(const string& compression, bool punch_holes);
   void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins);
   void TestRandomInternalMulti(int num_threads, int64_t buffer_len, bool multiple_pins);
   static const int SINGLE_THREADED_TID = -1;
@@ -1536,7 +1543,9 @@ TEST_F(BufferPoolTest, QueryTeardownWriteError) {
 // Test that the buffer pool handles a write error correctly.  Delete the scratch
 // directory before an operation that would cause a write and test that subsequent API
 // calls return errors as expected.
-void BufferPoolTest::TestWriteError(int write_delay_ms) {
+void BufferPoolTest::TestWriteError(
+    int write_delay_ms, const string& compression, bool punch_holes) {
+  InitTmpFileMgr(1, compression, punch_holes);
   int MAX_NUM_BUFFERS = 2;
   int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
   BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
@@ -1555,7 +1564,7 @@ void BufferPoolTest::TestWriteError(int write_delay_ms) {
   // Repin the pages
   ASSERT_OK(PinAll(&pool, &client, &pages));
   // Remove permissions to the backing storage so that future writes will fail
-  ASSERT_GT(RemoveScratchPerms(), 0);
+  ASSERT_GT(RemoveScratchPerms(test_env_->tmp_file_mgr()->GetTmpDirPath(0)), 0);
   // Give the first write a chance to fail before the second write starts.
   const int INTERVAL_MS = 10;
   UnpinAll(&pool, &client, &pages, INTERVAL_MS);
@@ -1588,18 +1597,35 @@ void BufferPoolTest::TestWriteError(int write_delay_ms) {
 }
 
 TEST_F(BufferPoolTest, WriteError) {
-  TestWriteError(0);
+  TestWriteError(0, "", false);
+}
+
+TEST_F(BufferPoolTest, WriteErrorCompression) {
+  TestWriteError(0, "snappy", true);
 }
 
 // Regression test for IMPALA-4842 - inject a delay in the write to
 // reproduce the issue.
 TEST_F(BufferPoolTest, WriteErrorWriteDelay) {
-  TestWriteError(100);
+  TestWriteError(100, "", false);
+}
+
+TEST_F(BufferPoolTest, WriteErrorDelayCompression) {
+  TestWriteError(100, "gzip", true);
 }
 
 // Test error handling when temporary file space cannot be allocated to back an unpinned
 // page.
 TEST_F(BufferPoolTest, TmpFileAllocateError) {
+  TestTmpFileAllocateError("", false);
+}
+
+TEST_F(BufferPoolTest, TmpFileAllocateErrorCompression) {
+  TestTmpFileAllocateError("lz4", true);
+}
+
+void BufferPoolTest::TestTmpFileAllocateError(
+    const string& compression, bool punch_holes) {
   const int MAX_NUM_BUFFERS = 2;
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
   BufferPool pool(test_env_->metrics(), TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
@@ -1615,7 +1641,7 @@ TEST_F(BufferPoolTest, TmpFileAllocateError) {
   pool.Unpin(&client, pages.data());
   WaitForAllWrites(&client);
   // Remove permissions to the temporary files - subsequent operations will fail.
-  ASSERT_GT(RemoveScratchPerms(), 0);
+  ASSERT_GT(RemoveScratchPerms(test_env_->tmp_file_mgr()->GetTmpDirPath(0)), 0);
   // The write error will happen asynchronously.
   pool.Unpin(&client, &pages[1]);
 
@@ -1629,12 +1655,25 @@ TEST_F(BufferPoolTest, TmpFileAllocateError) {
   pool.DeregisterClient(&client);
 }
 
+TEST_F(BufferPoolTest, WriteErrorBlacklist) {
+  TestWriteErrorBlacklist("", false);
+}
+
+TEST_F(BufferPoolTest, WriteErrorBlacklistHolepunch) {
+  TestWriteErrorBlacklist("", true);
+}
+
+TEST_F(BufferPoolTest, WriteErrorBlacklistCompression) {
+  TestWriteErrorBlacklist("lz4", true);
+}
+
 // Test that scratch devices are blacklisted after a write error. The query that
 // encountered the write error should not allocate more pages on that device, but
 // existing pages on the device will remain in use and future queries will use the device.
-TEST_F(BufferPoolTest, WriteErrorBlacklist) {
+void BufferPoolTest::TestWriteErrorBlacklist(
+    const string& compression, bool punch_holes) {
   // Set up two file groups with two temporary dirs.
-  vector<string> tmp_dirs = InitMultipleTmpDirs(2);
+  vector<string> tmp_dirs = InitTmpFileMgr(2, compression, punch_holes);
   // Simulate two concurrent queries.
   const int TOTAL_QUERIES = 3;
   const int INITIAL_QUERIES = 2;
@@ -1942,6 +1981,24 @@ TEST_F(BufferPoolTest, Multi8Random) {
   TestRandomInternalMulti(8, 8 * 1024, false);
 }
 
+// Sanity test with hole punching and no compression.
+// This will be run with and without encryption because those flags are toggled for the
+// entire test suite.
+TEST_F(BufferPoolTest, RandomHolePunch) {
+  InitTmpFileMgr(2, "", false);
+  TestRandomInternalSingle(8 * 1024, true);
+  TestRandomInternalMulti(4, 8 * 1024, true);
+}
+
+// Sanity test with compression and hole punching.
+// This will be run with and without encryption because those flags are toggled for the
+// entire test suite.
+TEST_F(BufferPoolTest, RandomCompressionHolePunch) {
+  InitTmpFileMgr(2, "lz4", true);
+  TestRandomInternalSingle(8 * 1024, true);
+  TestRandomInternalMulti(4, 8 * 1024, true);
+}
+
 // Single-threaded execution of the TestRandomInternalImpl.
 void BufferPoolTest::TestRandomInternalSingle(
     int64_t min_buffer_len, bool multiple_pins) {
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index e568224..65af75d 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -421,6 +421,8 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileGroup* file_group,
       child_profile, parent_reservation, mem_tracker, reservation_limit, mem_limit_mode);
   counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
   counters_.sys_alloc_time = ADD_TIMER(child_profile, "SystemAllocTime");
+  counters_.compression_time = ADD_TIMER(child_profile, "CompressionTime");
+  counters_.encryption_time = ADD_TIMER(child_profile, "EncryptionTime");
   counters_.cumulative_allocations =
       ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
   counters_.cumulative_bytes_alloced =
@@ -528,7 +530,8 @@ Status BufferPool::Client::StartMoveToPinned(ClientHandle* client, Page* page) {
     DCHECK(page->write_handle != NULL);
     // Don't need on-disk data.
     cl.unlock(); // Don't block progress for other threads operating on other pages.
-    return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range());
+    return file_group_->RestoreData(
+        move(page->write_handle), page->buffer.mem_range(), &counters_);
   }
   // If the page wasn't in the clean pages list, it must have been evicted.
   return StartMoveEvictedToPinned(&cl, client, page);
@@ -576,8 +579,8 @@ Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
   // Don't hold any locks while reading back the data. It is safe to modify the page's
   // buffer handle without holding any locks because no concurrent operations can modify
   // evicted pages.
-  RETURN_IF_ERROR(
-      file_group_->WaitForAsyncRead(page->write_handle.get(), page->buffer.mem_range()));
+  RETURN_IF_ERROR(file_group_->WaitForAsyncRead(
+      page->write_handle.get(), page->buffer.mem_range(), &counters_));
   file_group_->DestroyWriteHandle(move(page->write_handle));
   page->pin_in_flight = false;
   return Status::OK();
@@ -729,19 +732,18 @@ void BufferPool::Client::WriteDirtyPagesAsync(int64_t min_bytes_to_write) {
       lock_guard<SpinLock> pl(page->buffer_lock);
       DCHECK(file_group_ != NULL);
       DCHECK(page->buffer.is_open());
-      COUNTER_ADD(counters().bytes_written, page->len);
-      COUNTER_ADD(counters().write_io_ops, 1);
       Status status = file_group_->Write(page->buffer.mem_range(),
-          [this, page](const Status& write_status) {
-            WriteCompleteCallback(page, write_status);
-          },
-          &page->write_handle);
+          [this, page](
+              const Status& write_status) { WriteCompleteCallback(page, write_status); },
+          &page->write_handle, &counters_);
       // Exit early on error: there is no point in starting more writes because future
       /// operations for this client will fail regardless.
       if (!status.ok()) {
         write_status_.MergeStatus(status);
         return;
       }
+      COUNTER_ADD(counters().bytes_written, page->write_handle->on_disk_len());
+      COUNTER_ADD(counters().write_io_ops, 1);
     }
     // Now that the write is in flight, update all the state
     Page* tmp = dirty_unpinned_pages_.PopBack();
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 321529c..27747d9 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -62,8 +62,8 @@ Status TestEnv::Init() {
   RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
   exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
   if (have_tmp_file_mgr_args_) {
-    RETURN_IF_ERROR(
-        tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_, metrics()));
+    RETURN_IF_ERROR(tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_,
+        tmp_file_mgr_compression_, tmp_file_mgr_punch_holes_, metrics()));
   } else {
     RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
   }
@@ -88,11 +88,13 @@ Status TestEnv::Init() {
   return Status::OK();
 }
 
-void TestEnv::SetTmpFileMgrArgs(
-    const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) {
+void TestEnv::SetTmpFileMgrArgs(const vector<string>& tmp_dirs, bool one_dir_per_device,
+    const string& compression, bool punch_holes) {
   have_tmp_file_mgr_args_ = true;
   tmp_dirs_ = tmp_dirs;
   one_tmp_dir_per_device_ = one_dir_per_device;
+  tmp_file_mgr_compression_ = compression;
+  tmp_file_mgr_punch_holes_ = punch_holes;
 }
 
 void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) {
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index ce5e354..108b62f 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -36,8 +36,8 @@ class TestEnv {
 
   /// Set custom configuration for TmpFileMgr. Only has effect if called before Init().
   /// If not called, the default configuration is used.
-  void SetTmpFileMgrArgs(
-      const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
+  void SetTmpFileMgrArgs(const std::vector<std::string>& tmp_dirs,
+      bool one_dir_per_device, const std::string& compression, bool punch_holes);
 
   /// Disables creation of a BufferPool instance as part of this TestEnv in Init().
   void DisableBufferPool() { enable_buffer_pool_ = false; }
@@ -85,6 +85,8 @@ class TestEnv {
   bool have_tmp_file_mgr_args_;
   std::vector<std::string> tmp_dirs_;
   bool one_tmp_dir_per_device_;
+  std::string tmp_file_mgr_compression_;
+  bool tmp_file_mgr_punch_holes_;
 
   /// Whether a buffer pool should be created in Init().
   bool enable_buffer_pool_ = true;
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 9c94144..90d3026 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -20,6 +20,7 @@
 
 #include <string>
 
+#include "common/atomic.h"
 #include "runtime/tmp-file-mgr.h"
 
 namespace impala {
@@ -56,6 +57,9 @@ class TmpFile {
   /// Get the disk ID that should be used for IO mgr queueing.
   int AssignDiskQueue() const;
 
+  /// Try to punch a hole in the file of size 'len' at 'offset'.
+  Status PunchHole(int64_t offset, int64_t len);
+
   const std::string& path() const { return path_; }
 
   /// Caller must hold TmpFileMgr::FileGroup::lock_.
@@ -85,8 +89,13 @@ class TmpFile {
   /// The id of the disk on which the physical file lies.
   const int disk_id_;
 
-  /// Current bytes allocated in the file. Modified by AllocateSpace().
-  int64_t bytes_allocated_;
+  /// Total bytes of the file that have been given out by AllocateSpace(). Note that
+  /// these bytes may not actually occupy space on the filesystem, either because the
+  /// data hasn't been written or because a hole has been punched. Modified by AllocateSpace().
+  int64_t allocation_offset_ = 0;
+
+  /// Bytes reclaimed through hole punching.
+  AtomicInt64 bytes_reclaimed_{0};
 
   /// Set to true to indicate that we shouldn't allocate any more space in this file.
   /// Protected by TmpFileMgr::FileGroup::lock_.
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 7f217a6..57a409b 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -32,10 +32,11 @@
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
+#include "util/bit-util.h"
+#include "util/collection-metrics.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/filesystem-util.h"
-#include "util/collection-metrics.h"
 #include "util/metrics.h"
 
 #include "gen-cpp/Types_types.h"  // for TUniqueId
@@ -45,6 +46,9 @@
 using boost::filesystem::path;
 
 DECLARE_bool(disk_spill_encryption);
+DECLARE_int64(disk_spill_compression_buffer_limit_bytes);
+DECLARE_string(disk_spill_compression_codec);
+DECLARE_bool(disk_spill_punch_holes);
 #ifndef NDEBUG
 DECLARE_int32(stress_scratch_write_delay_ms);
 #endif
@@ -61,17 +65,19 @@ static const int64_t TERABYTE = 1024L * GIGABYTE;
 class TmpFileMgrTest : public ::testing::Test {
  public:
   virtual void SetUp() {
-    metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
-    profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
-    test_env_.reset(new TestEnv);
-    ASSERT_OK(test_env_->Init());
-    cb_counter_ = 0;
-
     // Reset query options that are modified by tests.
     FLAGS_disk_spill_encryption = false;
+    FLAGS_disk_spill_compression_codec = "";
+    FLAGS_disk_spill_punch_holes = false;
 #ifndef NDEBUG
     FLAGS_stress_scratch_write_delay_ms = 0;
 #endif
+
+    metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
+    profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+    cb_counter_ = 0;
   }
 
   virtual void TearDown() {
@@ -90,7 +96,7 @@ class TmpFileMgrTest : public ::testing::Test {
     // the pre-existing metrics (TmpFileMgr assumes it's a singleton in product code).
     MetricGroup* metrics = obj_pool_.Add(new MetricGroup(""));
     TmpFileMgr* mgr = obj_pool_.Add(new TmpFileMgr());
-    EXPECT_OK(mgr->InitCustom(tmp_dirs_spec, false, metrics));
+    EXPECT_OK(mgr->InitCustom(tmp_dirs_spec, false, "", false, metrics));
     return mgr;
   }
 
@@ -178,7 +184,10 @@ class TmpFileMgrTest : public ::testing::Test {
   static int64_t BytesAllocated(TmpFileGroup* group) {
     int64_t bytes_allocated = 0;
     for (unique_ptr<TmpFile>& file : group->tmp_files_) {
-      bytes_allocated += file->bytes_allocated_;
+      int64_t allocated = file->allocation_offset_;
+      int64_t reclaimed = file->bytes_reclaimed_.Load();
+      EXPECT_GE(allocated, reclaimed);
+      bytes_allocated += allocated - reclaimed;
     }
     EXPECT_EQ(bytes_allocated, group->current_bytes_allocated_);
     return bytes_allocated;
@@ -205,9 +214,21 @@ class TmpFileMgrTest : public ::testing::Test {
     while (cb_counter_ < val) cb_cv_.Wait(lock);
   }
 
+  /// Implementation of TestScratchLimit.
+  void TestScratchLimit(bool punch_holes, int64_t alloc_size);
+
+  /// Implementation of TestScratchRangeRecycling.
+  void TestScratchRangeRecycling(bool punch_holes);
+
+  /// Implementation of TestDirectoryLimits.
+  void TestDirectoryLimits(bool punch_holes);
+
   /// Implementation of TestBlockVerification(), which is run with different environments.
   void TestBlockVerification();
 
+  /// Implementation of TestCompressBufferManagement.
+  void TestCompressBufferManagement();
+
   ObjectPool obj_pool_;
   scoped_ptr<MetricGroup> metrics_;
   // Owned by 'obj_pool_'.
@@ -267,7 +288,7 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
@@ -291,7 +312,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
@@ -319,7 +340,7 @@ TEST_F(TmpFileMgrTest, TestReportError) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
@@ -368,7 +389,7 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
   }
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get()));
   TUniqueId id;
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
 
@@ -391,16 +412,29 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
 
 // Test scratch limit is applied correctly to group of files.
 TEST_F(TmpFileMgrTest, TestScratchLimit) {
+  TestScratchLimit(false, 128);
+}
+
+// Test that the scratch limit logic behaves identically with hole punching.
+// The test doesn't recycle ranges so this shouldn't affect behaviour.
+TEST_F(TmpFileMgrTest, TestScratchLimitPunchHoles) {
+  // With hole punching, allocations are rounded up to the nearest 4kb block,
+  // so we need the allocation size to be at least this large for the test.
+  TestScratchLimit(true, TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES);
+}
+
+void TmpFileMgrTest::TestScratchLimit(bool punch_holes, int64_t alloc_size) {
+  // Size must be a power-of-two so that FileGroup allocates exactly this amount of
+  // scratch space.
+  ASSERT_TRUE(BitUtil::IsPowerOf2(alloc_size));
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", punch_holes, metrics_.get()));
 
-  const int64_t LIMIT = 128;
-  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
-  const int64_t ALLOC_SIZE = 64;
+  const int64_t limit = alloc_size * 2;
   TUniqueId id;
-  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id, LIMIT);
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id, limit);
 
   vector<TmpFile*> files;
   ASSERT_OK(CreateFiles(&file_group, &files));
@@ -412,12 +446,12 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
 
   // Alloc from file 1 should succeed.
   SetNextAllocationIndex(&file_group, 0);
-  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_OK(GroupAllocateSpace(&file_group, alloc_size, &alloc_file, &offset));
   ASSERT_EQ(alloc_file, files[0]); // Should select files round-robin.
   ASSERT_EQ(0, offset);
 
   // Allocate up to the max.
-  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
+  ASSERT_OK(GroupAllocateSpace(&file_group, alloc_size, &alloc_file, &offset));
   ASSERT_EQ(0, offset);
   ASSERT_EQ(alloc_file, files[1]);
 
@@ -428,24 +462,37 @@ TEST_F(TmpFileMgrTest, TestScratchLimit) {
   ASSERT_NE(string::npos, status.msg().msg().find(GetBackendString()));
 
   // Check HWM metrics
-  checkHWMMetrics(LIMIT, LIMIT);
+  checkHWMMetrics(limit, limit);
   file_group.Close();
-  checkHWMMetrics(0, LIMIT);
+  checkHWMMetrics(0, limit);
 }
 
 // Test that scratch file ranges of varying length are recycled as expected.
 TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
+  TestScratchRangeRecycling(false);
+}
+
+// Test that scratch file ranges are not counted as allocated when we punch holes.
+TEST_F(TmpFileMgrTest, TestScratchRangeHolePunching) {
+  TestScratchRangeRecycling(true);
+}
+
+void TmpFileMgrTest::TestScratchRangeRecycling(bool punch_holes) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", punch_holes, metrics_.get()));
   TUniqueId id;
 
   TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
   int64_t expected_scratch_bytes_allocated = 0;
+  // The max value of expected_scratch_bytes_allocated throughout this test.
+  int64_t max_scratch_bytes_allocated = 0;
   // Test some different allocation sizes.
   checkHWMMetrics(0, 0);
-  for (int alloc_size = 64; alloc_size <= 64 * 1024; alloc_size *= 2) {
+  // Hole punching only reclaims space in 4kb block sizes.
+  int min_alloc_size = punch_holes ? TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES : 64;
+  for (int alloc_size = min_alloc_size; alloc_size <= 64 * 1024; alloc_size *= 2) {
     // Generate some data.
     const int BLOCKS = 5;
     vector<vector<uint8_t>> data(BLOCKS);
@@ -479,14 +526,20 @@ TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
         EXPECT_EQ(0, memcmp(tmp.data(), data[j].data(), alloc_size));
         file_group.DestroyWriteHandle(move(handles[j]));
       }
-      // Check that the space is still in use - it should be recycled by the next
-      // iteration.
-      EXPECT_EQ(expected_scratch_bytes_allocated, BytesAllocated(&file_group));
-      checkHWMMetrics(expected_scratch_bytes_allocated, expected_scratch_bytes_allocated);
+      // With hole punching, the scratch space is not in use. Without hole punching it
+      // is still in use, but will be reused by the next iteration.
+      int64_t expected_bytes_allocated =
+          punch_holes ? 0 : expected_scratch_bytes_allocated;
+      EXPECT_EQ(expected_bytes_allocated, BytesAllocated(&file_group));
+      checkHWMMetrics(expected_bytes_allocated, expected_scratch_bytes_allocated);
     }
+    // No scratch should be in use for the next iteration if holes were punched.
+    max_scratch_bytes_allocated =
+        max(max_scratch_bytes_allocated, expected_scratch_bytes_allocated);
+    if (punch_holes) expected_scratch_bytes_allocated = 0;
   }
   file_group.Close();
-  checkHWMMetrics(0, expected_scratch_bytes_allocated);
+  checkHWMMetrics(0, max_scratch_bytes_allocated);
 }
 
 // Regression test for IMPALA-4748, where hitting the process memory limit caused
@@ -569,6 +622,11 @@ TEST_F(TmpFileMgrTest, TestBlockVerificationGcmDisabled) {
   TestBlockVerification();
 }
 
+TEST_F(TmpFileMgrTest, TestBlockVerificationCompression) {
+  FLAGS_disk_spill_compression_codec = "zstd";
+  TestBlockVerification();
+}
+
 void TmpFileMgrTest::TestBlockVerification() {
   FLAGS_disk_spill_encryption = true;
   TUniqueId id;
@@ -620,7 +678,7 @@ TEST_F(TmpFileMgrTest, TestHWMMetric) {
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, "", false, metrics_.get()));
 
   const int64_t LIMIT = 128;
   // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
@@ -672,13 +730,26 @@ TEST_F(TmpFileMgrTest, TestHWMMetric) {
 // enforced. Sets up several scratch directories, some with limits, and checks
 // that the allocations occur in the right directories.
 TEST_F(TmpFileMgrTest, TestDirectoryLimits) {
+  TestDirectoryLimits(false);
+}
+
+// Same, but with hole punching enabled.
+TEST_F(TmpFileMgrTest, TestDirectoryLimitsPunchHoles) {
+  TestDirectoryLimits(true);
+}
+
+void TmpFileMgrTest::TestDirectoryLimits(bool punch_holes) {
+  // Use an allocation size so that FileGroup allocates exactly this amount of scratch
+  // space. The directory limits below are set relative to this size.
+  const int64_t ALLOC_SIZE = TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES;
   vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2",
       "/tmp/tmp-file-mgr-test.3"});
-  vector<string> tmp_dir_specs({"/tmp/tmp-file-mgr-test.1:512",
-      "/tmp/tmp-file-mgr-test.2:1k", "/tmp/tmp-file-mgr-test.3"});
+  vector<string> tmp_dir_specs({"/tmp/tmp-file-mgr-test.1:4k",
+      "/tmp/tmp-file-mgr-test.2:8k", "/tmp/tmp-file-mgr-test.3"});
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));
+  ASSERT_OK(
+      tmp_file_mgr.InitCustom(tmp_dir_specs, false, "", punch_holes, metrics_.get()));
 
   TmpFileGroup file_group_1(
       &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
@@ -696,8 +767,6 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimits) {
   IntGauge* dir3_usage = metrics_->FindMetricForTesting<IntGauge>(
       "tmp-file-mgr.scratch-space-bytes-used.dir-2");
 
-  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
-  const int64_t ALLOC_SIZE = 512;
   int64_t offset;
   TmpFile* alloc_file;
 
@@ -755,7 +824,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitsExhausted) {
   const int64_t DIR2_LIMIT = 1024L * 1024L;
   RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
-  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, "", false, metrics_.get()));
 
   TmpFileGroup file_group_1(
       &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
@@ -857,4 +926,98 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
   auto& empty_paths = GetTmpDirs(CreateTmpFileMgr(","));
   EXPECT_EQ(2, empty_paths.size());
 }
+
+// Test compression buffer memory management for reads and writes.
+TEST_F(TmpFileMgrTest, TestCompressBufferManagement) {
+  FLAGS_disk_spill_encryption = false;
+  TestCompressBufferManagement();
+}
+
+TEST_F(TmpFileMgrTest, TestCompressBufferManagementEncrypted) {
+  FLAGS_disk_spill_encryption = true;
+  TestCompressBufferManagement();
+}
+
+void TmpFileMgrTest::TestCompressBufferManagement() {
+  // Data string should be long and redundant enough to be compressible.
+  const string DATA = "the quick brown fox jumped over the lazy dog"
+                      "the fast red fox leaped over the sleepy dog";
+  const string BIG_DATA = DATA + DATA;
+  string data = DATA;
+  string big_data = BIG_DATA;
+  FLAGS_disk_spill_compression_buffer_limit_bytes = 2 * data.size();
+  // Limit compression buffers to not quite enough for two compression buffers
+  // for 'data' (since compression buffers need to be slightly larger than
+  // uncompressed data).
+  TmpFileMgr tmp_file_mgr;
+  ASSERT_OK(tmp_file_mgr.InitCustom(
+      vector<string>{"/tmp/tmp-file-mgr-test.1"}, true, "lz4", true, metrics_.get()));
+  MemTracker* compressed_buffer_tracker = tmp_file_mgr.compressed_buffer_tracker();
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, TUniqueId());
+  MemRange data_mem_range(reinterpret_cast<uint8_t*>(&data[0]), data.size());
+  MemRange big_data_mem_range(reinterpret_cast<uint8_t*>(&big_data[0]), big_data.size());
+
+  // Start a write in flight, which should compress the data and write it to disk.
+  unique_ptr<TmpWriteHandle> compressed_handle, uncompressed_handle;
+  WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+  ASSERT_OK(file_group.Write(data_mem_range, callback, &compressed_handle));
+  EXPECT_TRUE(compressed_handle->is_compressed());
+  int mem_consumption_after_first_write = compressed_buffer_tracker->peak_consumption();
+  EXPECT_GT(mem_consumption_after_first_write, 0)
+      << "Compressed buffer memory should be consumed for in-flight writes";
+  WaitForWrite(compressed_handle.get());
+  EXPECT_EQ(0, compressed_buffer_tracker->consumption())
+      << "No memory should be consumed when no reads or writes in-flight";
+
+  // Spot-check that bytes written counters were updated correctly.
+  EXPECT_GT(file_group.bytes_written_counter_->value(), 0);
+  EXPECT_GT(file_group.uncompressed_bytes_written_counter_->value(),
+            file_group.bytes_written_counter_->value());
+
+  // This data range is larger than the memory limit and falls back to uncompressed
+  // writes.
+  ASSERT_OK(file_group.Write(big_data_mem_range, callback, &uncompressed_handle));
+  EXPECT_EQ(uncompressed_handle->data_len(), uncompressed_handle->on_disk_len());
+  EXPECT_FALSE(uncompressed_handle->is_compressed());
+  EXPECT_LE(compressed_buffer_tracker->consumption(), mem_consumption_after_first_write)
+      << "Second write should have fallen back to uncompressed writes";
+
+  WaitForWrite(uncompressed_handle.get());
+  WaitForCallbacks(2);
+  EXPECT_EQ(0, compressed_buffer_tracker->consumption())
+      << "No memory should be consumed when no reads or writes in-flight";
+
+  vector<uint8_t> tmp(data.size());
+  vector<uint8_t> big_tmp(big_data.size());
+  // Check behaviour when reading a compressed range with available memory.
+  // Reading from disk needs to allocate a compression buffer.
+  EXPECT_OK(file_group.Read(compressed_handle.get(), MemRange(tmp.data(), tmp.size())));
+  EXPECT_EQ(0, memcmp(tmp.data(), DATA.data(), DATA.size()));
+
+  // Check behaviour when reading ranges with not enough available memory.
+  compressed_buffer_tracker->Consume(DATA.size() * 2);
+  Status status =
+      file_group.Read(compressed_handle.get(), MemRange(tmp.data(), tmp.size()));
+  EXPECT_FALSE(status.ok());
+  EXPECT_EQ(status.code(), TErrorCode::MEM_LIMIT_EXCEEDED);
+
+  // Reading an uncompressed range requires no extra memory.
+  EXPECT_OK(file_group.Read(
+      uncompressed_handle.get(), MemRange(big_tmp.data(), big_tmp.size())));
+  EXPECT_EQ(0, memcmp(big_tmp.data(), BIG_DATA.data(), BIG_DATA.size()));
+
+  // Clear buffers to avoid false positives in tests.
+  memset(tmp.data(), 0, tmp.size());
+  memset(big_tmp.data(), 0, big_tmp.size());
+
+  // Restoring data should work OK even with no memory available.
+  EXPECT_OK(file_group.RestoreData(move(compressed_handle), data_mem_range));
+  EXPECT_EQ(0, memcmp(DATA.data(), data.data(), data.size()));
+  EXPECT_OK(file_group.RestoreData(move(uncompressed_handle), big_data_mem_range));
+  EXPECT_EQ(0, memcmp(BIG_DATA.data(), big_data.data(), big_data.size()));
+
+  compressed_buffer_tracker->Release(DATA.size() * 2);
+  file_group.Close();
+}
 } // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index d36d5c1..f2e2885 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -19,7 +19,9 @@
 
 #include <limits>
 #include <mutex>
+#include <linux/falloc.h>
 
+#include <zstd.h> // for ZSTD_CLEVEL_DEFAULT
 #include <boost/algorithm/string.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
@@ -28,24 +30,47 @@
 #include <gutil/strings/join.h>
 #include <gutil/strings/substitute.h>
 
+#include "kudu/util/env.h"
+#include "runtime/bufferpool/buffer-pool-counters.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/request-context.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "util/bit-util.h"
+#include "util/codec.h"
 #include "util/collection-metrics.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
+#include "util/kudu-status-util.h"
 #include "util/parse-util.h"
 #include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
+#include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
 
 DEFINE_bool(disk_spill_encryption, true,
     "Set this to encrypt and perform an integrity "
     "check on all data spilled to disk during a query");
+DEFINE_string(disk_spill_compression_codec, "",
+    "(Advanced) If set, data will be compressed using the specified compression codec "
+    "before spilling to disk. This can substantially reduce scratch disk usage, at the "
+    "cost of requiring more CPU and memory resources to compress the data. Uses the same "
+    "syntax as the COMPRESSION_CODEC query option, e.g. 'lz4', 'zstd', 'zstd:6'. If "
+    "this is set, then --disk_spill_punch_holes must be enabled.");
+DEFINE_int64(disk_spill_compression_buffer_limit_bytes, 512L * 1024L * 1024L,
+    "(Advanced) Limit on the total bytes of compression buffers that will be used for "
+    "spill-to-disk compression across all queries. If this limit is exceeded, some data "
+    "may be spilled to disk in uncompressed form.");
+DEFINE_bool(disk_spill_punch_holes, false,
+    "(Advanced) changes the free space management strategy for files created in "
+    "--scratch_dirs to punch holes in the file when space is unused. This can reduce "
+    "the amount of scratch space used by queries, particularly in conjunction with "
+    "disk spill compression. This option requires the filesystems of the directories "
+    "in --scratch_dirs to support hole punching.");
 DEFINE_string(scratch_dirs, "/tmp",
     "Writable scratch directories. "
     "This is a comma-separated list of directories. Each directory is "
@@ -66,10 +91,15 @@ using boost::filesystem::absolute;
 using boost::filesystem::path;
 using boost::uuids::random_generator;
 using namespace impala::io;
+using kudu::Env;
+using kudu::RWFile;
+using kudu::RWFileOptions;
 using namespace strings;
 
 namespace impala {
 
+constexpr int64_t TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES;
+
 const string TMP_SUB_DIR_NAME = "impala-scratch";
 const uint64_t AVAILABLE_SPACE_THRESHOLD_MB = 1024;
 
@@ -88,33 +118,53 @@ using DeviceId = TmpFileMgr::DeviceId;
 using TmpDir = TmpFileMgr::TmpDir;
 using WriteDoneCallback = TmpFileMgr::WriteDoneCallback;
 
-TmpFileMgr::TmpFileMgr()
-  : initialized_(false),
-    num_active_scratch_dirs_metric_(nullptr),
-    active_scratch_dirs_metric_(nullptr),
-    scratch_bytes_used_metric_(nullptr) {}
+TmpFileMgr::TmpFileMgr() {}
+
+TmpFileMgr::~TmpFileMgr() {}
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
-  return InitCustom(
-      FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
+  return InitCustom(FLAGS_scratch_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device,
+      FLAGS_disk_spill_compression_codec, FLAGS_disk_spill_punch_holes, metrics);
 }
 
-Status TmpFileMgr::InitCustom(
-    const string& tmp_dirs_spec, bool one_dir_per_device, MetricGroup* metrics) {
+Status TmpFileMgr::InitCustom(const string& tmp_dirs_spec, bool one_dir_per_device,
+    const string& compression_codec, bool punch_holes, MetricGroup* metrics) {
   vector<string> all_tmp_dirs;
   // Empty string should be interpreted as no scratch
   if (!tmp_dirs_spec.empty()) {
     split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
   }
-  return InitCustom(all_tmp_dirs, one_dir_per_device, metrics);
+  return InitCustom(
+      all_tmp_dirs, one_dir_per_device, compression_codec, punch_holes, metrics);
 }
 
 Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
-    bool one_dir_per_device, MetricGroup* metrics) {
+    bool one_dir_per_device, const string& compression_codec, bool punch_holes,
+    MetricGroup* metrics) {
   DCHECK(!initialized_);
+  punch_holes_ = punch_holes;
   if (tmp_dir_specifiers.empty()) {
     LOG(WARNING) << "Running without spill to disk: no scratch directories provided.";
   }
+  if (!compression_codec.empty()) {
+    if (!punch_holes) {
+      return Status("--disk_spill_punch_holes must be true if disk spill compression "
+                    "is enabled");
+    }
+    Status codec_parse_status = ParseUtil::ParseCompressionCodec(
+        compression_codec, &compression_codec_, &compression_level_);
+    if (!codec_parse_status.ok()) {
+      return Status(
+          Substitute("Could not parse --disk_spill_compression_codec value '$0': $1",
+              compression_codec, codec_parse_status.GetDetail()));
+    }
+    if (compression_enabled()) {
+      compressed_buffer_tracker_.reset(
+          new MemTracker(FLAGS_disk_spill_compression_buffer_limit_bytes,
+              "Spill-to-disk temporary compression buffers",
+              ExecEnv::GetInstance()->process_mem_tracker()));
+    }
+  }
   vector<TmpDir> tmp_dirs;
   // Parse the directory specifiers. Don't return an error on parse errors, just log a
   // warning - we don't want to abort process startup because of misconfigured scratch,
@@ -186,6 +236,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
                      << scratch_subdir_path.string() << ": cannot use it for scratch. "
                      << "Error was: " << status.msg().msg();
       }
+      if (punch_holes_) {
+        // Make sure hole punching is supported for the directory.
+        RETURN_IF_ERROR(FileSystemUtil::CheckHolePunch(scratch_subdir_path.string()));
+      }
     }
   }
 
@@ -253,7 +307,6 @@ TmpFile::TmpFile(TmpFileGroup* file_group, DeviceId device_id, const string& pat
     path_(path),
     device_id_(device_id),
     disk_id_(DiskInfo::disk_id(path.c_str())),
-    bytes_allocated_(0),
     blacklisted_(false) {
   DCHECK(file_group != nullptr);
 }
@@ -266,8 +319,8 @@ bool TmpFile::AllocateSpace(int64_t num_bytes, int64_t* offset) {
     dir->bytes_used_metric->Increment(-num_bytes);
     return false;
   }
-  *offset = bytes_allocated_;
-  bytes_allocated_ += num_bytes;
+  *offset = allocation_offset_;
+  allocation_offset_ += num_bytes;
   return true;
 }
 
@@ -283,7 +336,10 @@ void TmpFile::Blacklist(const ErrorMsg& msg) {
 Status TmpFile::Remove() {
   // Remove the file if present (it may not be present if no writes completed).
   Status status = FileSystemUtil::RemovePaths({path_});
-  GetDir()->bytes_used_metric->Increment(-bytes_allocated_);
+  int64_t bytes_in_use = file_group_->tmp_file_mgr_->punch_holes() ?
+      allocation_offset_ - bytes_reclaimed_.Load() :
+      allocation_offset_;
+  GetDir()->bytes_used_metric->Increment(-bytes_in_use);
   return status;
 }
 
@@ -291,10 +347,26 @@ TmpFileMgr::TmpDir* TmpFile::GetDir() {
   return &file_group_->tmp_file_mgr_->tmp_dirs_[device_id_];
 }
 
+Status TmpFile::PunchHole(int64_t offset, int64_t len) {
+  DCHECK(file_group_->tmp_file_mgr_->punch_holes());
+  // Because of RAII, the file is automatically closed when this function returns.
+  RWFileOptions opts;
+  opts.mode = Env::CREATE_OR_OPEN;
+  unique_ptr<RWFile> file;
+  KUDU_RETURN_IF_ERROR(Env::Default()->NewRWFile(opts, path_, &file),
+      "Failed to open scratch file for hole punching");
+  KUDU_RETURN_IF_ERROR(
+      file->PunchHole(offset, len), "Failed to punch hole in scratch file");
+  bytes_reclaimed_.Add(len);
+  GetDir()->bytes_used_metric->Increment(-len);
+  VLOG(3) << "Punched hole in " << path_ << " " << offset << " " << len;
+  return Status::OK();
+}
+
 string TmpFile::DebugString() {
-  return Substitute("File $0 path '$1' device id $2 disk id $3 bytes allocated $4 "
-      "blacklisted $5", this, path_, device_id_, disk_id_, bytes_allocated_,
-      blacklisted_);
+  return Substitute(
+      "File $0 path '$1' device id $2 disk id $3 allocation offset $4 blacklisted $5",
+      this, path_, device_id_, disk_id_, allocation_offset_, blacklisted_);
 }
 
 TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
@@ -306,12 +378,17 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     bytes_limit_(bytes_limit),
     write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
     bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
+    uncompressed_bytes_written_counter_(
+        ADD_COUNTER(profile, "UncompressedScratchBytesWritten", TUnit::BYTES)),
     read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
     bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
     scratch_space_bytes_used_counter_(
         ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
     disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
     encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
+    compression_timer_(tmp_file_mgr->compression_enabled() ?
+            ADD_TIMER(profile, "TotalCompressionTime") :
+            nullptr),
     current_bytes_allocated_(0),
     next_allocation_index_(0),
     free_ranges_(64) {
@@ -360,12 +437,27 @@ void TmpFileGroup::Close() {
   tmp_files_.clear();
 }
 
+// Rounds up to the smallest unit of allocation in a scratch file
+// that will fit 'bytes'.
+static int64_t RoundUpToScratchRangeSize(bool punch_holes, int64_t bytes) {
+  if (punch_holes) {
+    // Round up to a typical disk block size (4KB) so that hole punching can always
+    // free the backing storage for the entire range.
+    return BitUtil::RoundUpToPowerOf2(bytes, TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES);
+  } else {
+    // We recycle scratch ranges, which must be positive power-of-two sizes.
+    return max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(bytes));
+  }
+}
+
 Status TmpFileGroup::AllocateSpace(
     int64_t num_bytes, TmpFile** tmp_file, int64_t* file_offset) {
   lock_guard<SpinLock> lock(lock_);
-  int64_t scratch_range_bytes = max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(num_bytes));
+  int64_t scratch_range_bytes =
+      RoundUpToScratchRangeSize(tmp_file_mgr_->punch_holes(), num_bytes);
   int free_ranges_idx = BitUtil::Log2Ceiling64(scratch_range_bytes);
   if (!free_ranges_[free_ranges_idx].empty()) {
+    DCHECK(!tmp_file_mgr_->punch_holes()) << "Ranges not recycled when punching holes";
     *tmp_file = free_ranges_[free_ranges_idx].back().first;
     *file_offset = free_ranges_[free_ranges_idx].back().second;
     free_ranges_[free_ranges_idx].pop_back();
@@ -391,43 +483,57 @@ Status TmpFileGroup::AllocateSpace(
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
     *tmp_file = tmp_files_[idx].get();
     if ((*tmp_file)->is_blacklisted()) continue;
+
+    // Check the per-directory limit.
     if (!(*tmp_file)->AllocateSpace(scratch_range_bytes, file_offset)) {
       at_capacity_dirs.push_back(idx);
       continue;
     }
     scratch_space_bytes_used_counter_->Add(scratch_range_bytes);
     tmp_file_mgr_->scratch_bytes_used_metric_->Increment(scratch_range_bytes);
-    current_bytes_allocated_ += num_bytes;
+    current_bytes_allocated_ += scratch_range_bytes;
     return Status::OK();
   }
   return ScratchAllocationFailedStatus(at_capacity_dirs);
 }
 
 void TmpFileGroup::RecycleFileRange(unique_ptr<TmpWriteHandle> handle) {
-  int64_t scratch_range_bytes =
-      max<int64_t>(1L, BitUtil::RoundUpToPowerOfTwo(handle->len()));
-  int free_ranges_idx = BitUtil::Log2Ceiling64(scratch_range_bytes);
-  lock_guard<SpinLock> lock(lock_);
-  free_ranges_[free_ranges_idx].emplace_back(
-      handle->file_, handle->write_range_->offset());
+  TmpFile* file = handle->file_;
+  int64_t space_used_bytes =
+      RoundUpToScratchRangeSize(tmp_file_mgr_->punch_holes(), handle->on_disk_len());
+  if (tmp_file_mgr_->punch_holes()) {
+    Status status = file->PunchHole(handle->write_range_->offset(), space_used_bytes);
+    if (!status.ok()) {
+      // Proceed even in the hole punching fails - we will use extra disk space but
+      // functionally we can continue to spill.
+      LOG_EVERY_N(WARNING, 100) << "Failed to punch hole in scratch file, couldn't "
+                                << "reclaim space: " << status.GetDetail();
+      return;
+    }
+    scratch_space_bytes_used_counter_->Add(-space_used_bytes);
+    tmp_file_mgr_->scratch_bytes_used_metric_->Increment(-space_used_bytes);
+    {
+      lock_guard<SpinLock> lock(lock_);
+      current_bytes_allocated_ -= space_used_bytes;
+    }
+  } else {
+    int free_ranges_idx = BitUtil::Log2Ceiling64(space_used_bytes);
+    lock_guard<SpinLock> lock(lock_);
+    free_ranges_[free_ranges_idx].emplace_back(file, handle->write_range_->offset());
+  }
 }
 
 Status TmpFileGroup::Write(MemRange buffer, WriteDoneCallback cb,
-    unique_ptr<TmpWriteHandle>* handle) {
+    unique_ptr<TmpWriteHandle>* handle, const BufferPoolClientCounters* counters) {
   DCHECK_GE(buffer.len(), 0);
 
-  TmpFile* tmp_file;
-  int64_t file_offset;
-  RETURN_IF_ERROR(AllocateSpace(buffer.len(), &tmp_file, &file_offset));
-
-  unique_ptr<TmpWriteHandle> tmp_handle(new TmpWriteHandle(encryption_timer_, cb));
+  unique_ptr<TmpWriteHandle> tmp_handle(new TmpWriteHandle(this, cb));
   TmpWriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda.
   WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
-      const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
-  RETURN_IF_ERROR(
-      tmp_handle->Write(io_ctx_.get(), tmp_file, file_offset, buffer, callback));
-  write_counter_->Add(1);
-  bytes_written_counter_->Add(buffer.len());
+                                               const Status& write_status) {
+    WriteComplete(tmp_handle_ptr, write_status);
+  };
+  RETURN_IF_ERROR(tmp_handle->Write(io_ctx_.get(), buffer, callback, counters));
   *handle = move(tmp_handle);
   return Status::OK();
 }
@@ -440,52 +546,91 @@ Status TmpFileGroup::Read(TmpWriteHandle* handle, MemRange buffer) {
 Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
   DCHECK(handle->write_range_ != nullptr);
   DCHECK(!handle->is_cancelled_);
-  DCHECK_EQ(buffer.len(), handle->len());
+  DCHECK_EQ(buffer.len(), handle->data_len());
   Status status;
-
+  VLOG(3) << "ReadAsync " << handle->TmpFilePath() << " "
+          << handle->write_range_->offset() << " " << handle->on_disk_len();
   // Don't grab 'write_state_lock_' in this method - it is not necessary because we
   // don't touch any members that it protects and could block other threads for the
   // duration of the synchronous read.
   DCHECK(!handle->write_in_flight_);
   DCHECK(handle->read_range_ == nullptr);
   DCHECK(handle->write_range_ != nullptr);
+
+  MemRange read_buffer = buffer;
+  if (handle->is_compressed()) {
+    int64_t compressed_len = handle->compressed_len_;
+    if (!handle->compressed_.TryAllocate(compressed_len)) {
+      return tmp_file_mgr_->compressed_buffer_tracker()->MemLimitExceeded(
+          nullptr, "Failed to decompress spilled data", compressed_len);
+    }
+    DCHECK_EQ(compressed_len, handle->write_range_->len());
+    read_buffer = MemRange(handle->compressed_.buffer(), compressed_len);
+  }
+
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
   // since the write is not in flight.
   handle->read_range_ = scan_range_pool_.Add(new ScanRange);
   handle->read_range_->Reset(nullptr, handle->write_range_->file(),
       handle->write_range_->len(), handle->write_range_->offset(),
       handle->write_range_->disk_id(), false, false, ScanRange::INVALID_MTIME,
-      BufferOpts::ReadInto(buffer.data(), buffer.len(), BufferOpts::NO_CACHING));
+      BufferOpts::ReadInto(
+          read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   read_counter_->Add(1);
-  bytes_read_counter_->Add(buffer.len());
+  bytes_read_counter_->Add(read_buffer.len());
   bool needs_buffers;
   RETURN_IF_ERROR(io_ctx_->StartScanRange(handle->read_range_, &needs_buffers));
   DCHECK(!needs_buffers) << "Already provided a buffer";
   return Status::OK();
 }
 
-Status TmpFileGroup::WaitForAsyncRead(TmpWriteHandle* handle, MemRange buffer) {
+Status TmpFileGroup::WaitForAsyncRead(
+    TmpWriteHandle* handle, MemRange buffer, const BufferPoolClientCounters* counters) {
   DCHECK(handle->read_range_ != nullptr);
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
   // since the write is not in flight.
   SCOPED_TIMER(disk_read_timer_);
+  MemRange read_buffer = handle->is_compressed() ?
+      MemRange{handle->compressed_.buffer(), handle->compressed_.Size()} :
+      buffer;
+  DCHECK(read_buffer.data() != nullptr);
   unique_ptr<BufferDescriptor> io_mgr_buffer;
   Status status = handle->read_range_->GetNext(&io_mgr_buffer);
   if (!status.ok()) goto exit;
   DCHECK(io_mgr_buffer != NULL);
   DCHECK(io_mgr_buffer->eosr());
-  DCHECK_LE(io_mgr_buffer->len(), buffer.len());
-  if (io_mgr_buffer->len() < buffer.len()) {
+  DCHECK_LE(io_mgr_buffer->len(), read_buffer.len());
+  if (io_mgr_buffer->len() < read_buffer.len()) {
     // The read was truncated - this is an error.
-    status = Status(TErrorCode::SCRATCH_READ_TRUNCATED, buffer.len(),
+    status = Status(TErrorCode::SCRATCH_READ_TRUNCATED, read_buffer.len(),
         handle->write_range_->file(), GetBackendString(), handle->write_range_->offset(),
         io_mgr_buffer->len());
     goto exit;
   }
-  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+  DCHECK_EQ(io_mgr_buffer->buffer(),
+      handle->is_compressed() ? handle->compressed_.buffer() : buffer.data());
 
+  // Decrypt then decompress, reversing the compress-then-encrypt order applied when
+  // the data was originally written.
   if (FLAGS_disk_spill_encryption) {
-    status = handle->CheckHashAndDecrypt(buffer);
+    status = handle->CheckHashAndDecrypt(read_buffer, counters);
+    if (!status.ok()) goto exit;
+  }
+
+  if (handle->is_compressed()) {
+    SCOPED_TIMER2(
+        compression_timer_, counters == nullptr ? nullptr : counters->compression_time);
+    scoped_ptr<Codec> decompressor;
+    status = Codec::CreateDecompressor(
+        nullptr, false, tmp_file_mgr_->compression_codec(), &decompressor);
+    if (status.ok()) {
+      int64_t decompressed_len = buffer.len();
+      uint8_t* decompressed_buffer = buffer.data();
+      status = decompressor->ProcessBlock(true, read_buffer.len(), read_buffer.data(),
+          &decompressed_len, &decompressed_buffer);
+    }
+    // Free the compressed data regardless of whether the read was successful.
+    handle->FreeCompressedBuffer();
     if (!status.ok()) goto exit;
   }
 exit:
@@ -495,17 +640,23 @@ exit:
   return status;
 }
 
-Status TmpFileGroup::RestoreData(
-    unique_ptr<TmpWriteHandle> handle, MemRange buffer) {
-  DCHECK_EQ(handle->write_range_->data(), buffer.data());
-  DCHECK_EQ(handle->len(), buffer.len());
+Status TmpFileGroup::RestoreData(unique_ptr<TmpWriteHandle> handle, MemRange buffer,
+    const BufferPoolClientCounters* counters) {
+  DCHECK_EQ(handle->data_len(), buffer.len());
+  if (!handle->is_compressed()) DCHECK_EQ(handle->write_range_->data(), buffer.data());
   DCHECK(!handle->write_in_flight_);
   DCHECK(handle->read_range_ == nullptr);
-  // Decrypt after the write is finished, so that we don't accidentally write decrypted
-  // data to disk.
+
+  VLOG(3) << "Restore " << handle->TmpFilePath() << " " << handle->write_range_->offset()
+          << " " << handle->data_len();
   Status status;
-  if (FLAGS_disk_spill_encryption) {
-    status = handle->CheckHashAndDecrypt(buffer);
+  if (handle->is_compressed()) {
+    // 'buffer' already contains the original data: compression (and any encryption)
+    // was applied to the separate 'compressed_' buffer, so 'buffer' was never modified.
+  } else if (FLAGS_disk_spill_encryption) {
+    // Decrypt after the write is finished, so that we don't accidentally write decrypted
+    // data to disk.
+    status = handle->CheckHashAndDecrypt(buffer, counters);
   }
   RecycleFileRange(move(handle));
   return status;
@@ -554,7 +705,7 @@ Status TmpFileGroup::RecoverWriteError(
   // Discard the scratch file range - we will not reuse ranges from a bad file.
   // Choose another file to try. Blacklisting ensures we don't retry the same file.
   // If this fails, the status will include all the errors in 'scratch_errors_'.
-  RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));
+  RETURN_IF_ERROR(AllocateSpace(handle->on_disk_len(), &tmp_file, &file_offset));
   return handle->RetryWrite(io_ctx_.get(), tmp_file, file_offset);
 }
 
@@ -586,6 +737,7 @@ string TmpFileGroup::DebugString() {
      << " current bytes allocated " << current_bytes_allocated_
      << " next allocation index " << next_allocation_index_ << " writes "
      << write_counter_->value() << " bytes written " << bytes_written_counter_->value()
+     << " uncompressed bytes written " << uncompressed_bytes_written_counter_->value()
      << " reads " << read_counter_->value() << " bytes read "
      << bytes_read_counter_->value() << " scratch bytes used "
     << scratch_space_bytes_used_counter_ << " disk read timer "
@@ -599,17 +751,15 @@ string TmpFileGroup::DebugString() {
 }
 
 TmpWriteHandle::TmpWriteHandle(
-    RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb)
-  : cb_(cb),
-    encryption_timer_(encryption_timer),
-    file_(nullptr),
-    read_range_(nullptr),
-    is_cancelled_(false),
-    write_in_flight_(false) {}
+    TmpFileGroup* const parent, WriteRange::WriteDoneCallback cb)
+  : parent_(parent),
+    cb_(cb),
+    compressed_(parent_->tmp_file_mgr_->compressed_buffer_tracker()) {}
 
 TmpWriteHandle::~TmpWriteHandle() {
   DCHECK(!write_in_flight_);
   DCHECK(read_range_ == nullptr);
+  DCHECK(compressed_.buffer() == nullptr);
 }
 
 string TmpWriteHandle::TmpFilePath() const {
@@ -617,23 +767,42 @@ string TmpWriteHandle::TmpFilePath() const {
   return file_->path();
 }
 
-int64_t TmpWriteHandle::len() const {
+int64_t TmpWriteHandle::on_disk_len() const {
   return write_range_->len();
 }
 
-Status TmpWriteHandle::Write(RequestContext* io_ctx,
-    TmpFile* file, int64_t offset, MemRange buffer,
-    WriteRange::WriteDoneCallback callback) {
+Status TmpWriteHandle::Write(RequestContext* io_ctx, MemRange buffer,
+    WriteRange::WriteDoneCallback callback, const BufferPoolClientCounters* counters) {
   DCHECK(!write_in_flight_);
+  MemRange buffer_to_write = buffer;
+  if (parent_->tmp_file_mgr_->compression_enabled() && TryCompress(buffer, counters)) {
+    buffer_to_write = MemRange(compressed_.buffer(), compressed_len_);
+  }
+  // Ensure that the compressed buffer is freed on all the code paths where we did not
+  // start the write successfully.
+  bool write_started = false;
+  const auto free_compressed = MakeScopeExitTrigger([this, &write_started]() {
+      if (!write_started) FreeCompressedBuffer();
+  });
+
+  // Allocate disk space after compressing, so we don't allocate more than we need.
+  TmpFile* tmp_file;
+  int64_t file_offset;
+  RETURN_IF_ERROR(parent_->AllocateSpace(buffer_to_write.len(), &tmp_file, &file_offset));
 
-  if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
+  if (FLAGS_disk_spill_encryption) {
+    RETURN_IF_ERROR(EncryptAndHash(buffer_to_write, counters));
+  }
 
   // Set all member variables before calling AddWriteRange(): after it succeeds,
   // WriteComplete() may be called concurrently with the remainder of this function.
-  file_ = file;
-  write_range_.reset(
-      new WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
-  write_range_->SetData(buffer.data(), buffer.len());
+  data_len_ = buffer.len();
+  file_ = tmp_file;
+  write_range_.reset(new WriteRange(
+      tmp_file->path(), file_offset, tmp_file->AssignDiskQueue(), callback));
+  write_range_->SetData(buffer_to_write.data(), buffer_to_write.len());
+  VLOG(3) << "Write " << tmp_file->path() << " " << file_offset << " "
+          << buffer_to_write.len();
   write_in_flight_ = true;
   Status status = io_ctx->AddWriteRange(write_range_.get());
   if (!status.ok()) {
@@ -645,11 +814,50 @@ Status TmpWriteHandle::Write(RequestContext* io_ctx,
     is_cancelled_ = true;
     return status;
   }
+  write_started = true;
+  parent_->write_counter_->Add(1);
+  parent_->uncompressed_bytes_written_counter_->Add(buffer.len());
+  parent_->bytes_written_counter_->Add(buffer_to_write.len());
   return Status::OK();
 }
 
-Status TmpWriteHandle::RetryWrite(
-    RequestContext* io_ctx, TmpFile* file, int64_t offset) {
+bool TmpWriteHandle::TryCompress(
+    MemRange buffer, const BufferPoolClientCounters* counters) {
+  DCHECK(parent_->tmp_file_mgr_->compression_enabled());
+  SCOPED_TIMER2(parent_->compression_timer_,
+      counters == nullptr ? nullptr : counters->compression_time);
+  DCHECK_LT(compressed_len_, 0);
+  DCHECK(compressed_.buffer() == nullptr);
+  scoped_ptr<Codec> compressor;
+  Status status = Codec::CreateCompressor(nullptr, false,
+      Codec::CodecInfo(parent_->tmp_file_mgr_->compression_codec(),
+          parent_->tmp_file_mgr_->compression_level()),
+      &compressor);
+  if (!status.ok()) {
+    LOG(WARNING) << "Failed to compress, couldn't create compressor: "
+                 << status.GetDetail();
+    return false;
+  }
+  int64_t compressed_buffer_len = compressor->MaxOutputLen(buffer.len());
+  if (!compressed_.TryAllocate(compressed_buffer_len)) {
+    LOG_EVERY_N(INFO, 100) << "Failed to compress: couldn't allocate "
+                           << PrettyPrinter::PrintBytes(compressed_buffer_len);
+    return false;
+  }
+  uint8_t* compressed_buffer = compressed_.buffer();
+  int64_t compressed_len = compressed_buffer_len;
+  status = compressor->ProcessBlock(
+      true, buffer.len(), buffer.data(), &compressed_len, &compressed_buffer);
+  if (!status.ok()) {
+    compressed_.Release();
+    return false;
+  }
+  compressed_len_ = compressed_len;
+  VLOG(3) << "Buffer size: " << buffer.len() << " compressed size: " << compressed_len;
+  return true;
+}
+
+Status TmpWriteHandle::RetryWrite(RequestContext* io_ctx, TmpFile* file, int64_t offset) {
   DCHECK(write_in_flight_);
   file_ = file;
   write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
@@ -672,6 +880,11 @@ void TmpWriteHandle::WriteComplete(const Status& write_status) {
     // 'write_state_lock_', 'this' may be destroyed.
     cb = move(cb_);
 
+    if (is_compressed()) {
+      DCHECK(compressed_.buffer() != nullptr);
+      FreeCompressedBuffer();
+    }
+
     // Notify before releasing the lock - after the lock is released 'this' may be
     // destroyed.
     write_complete_cv_.NotifyAll();
@@ -695,6 +908,7 @@ void TmpWriteHandle::CancelRead() {
   if (read_range_ != nullptr) {
     read_range_->Cancel(Status::CancelledInternal("TmpFileMgr read"));
     read_range_ = nullptr;
+    FreeCompressedBuffer();
   }
 }
 
@@ -703,9 +917,11 @@ void TmpWriteHandle::WaitForWrite() {
   while (write_in_flight_) write_complete_cv_.Wait(lock);
 }
 
-Status TmpWriteHandle::EncryptAndHash(MemRange buffer) {
+Status TmpWriteHandle::EncryptAndHash(
+    MemRange buffer, const BufferPoolClientCounters* counters) {
   DCHECK(FLAGS_disk_spill_encryption);
-  SCOPED_TIMER(encryption_timer_);
+  SCOPED_TIMER2(parent_->encryption_timer_,
+      counters == nullptr ? nullptr : counters->encryption_time);
   // Since we're using GCM/CTR/CFB mode, we must take care not to reuse a
   // key/IV pair. Regenerate a new key and IV for every data buffer we write.
   key_.InitializeRandom();
@@ -717,10 +933,12 @@ Status TmpWriteHandle::EncryptAndHash(MemRange buffer) {
   return Status::OK();
 }
 
-Status TmpWriteHandle::CheckHashAndDecrypt(MemRange buffer) {
+Status TmpWriteHandle::CheckHashAndDecrypt(
+    MemRange buffer, const BufferPoolClientCounters* counters) {
   DCHECK(FLAGS_disk_spill_encryption);
   DCHECK(write_range_ != nullptr);
-  SCOPED_TIMER(encryption_timer_);
+  SCOPED_TIMER2(parent_->encryption_timer_,
+      counters == nullptr ? nullptr : counters->encryption_time);
 
   // GCM mode will verify the integrity by itself
   if (!key_.IsGcmMode()) {
@@ -741,15 +959,21 @@ Status TmpWriteHandle::CheckHashAndDecrypt(MemRange buffer) {
   return Status::OK();
 }
 
+void TmpWriteHandle::FreeCompressedBuffer() {
+  if (compressed_.buffer() == nullptr) return;
+  DCHECK(is_compressed());
+  compressed_.Release();
+}
+
 string TmpWriteHandle::DebugString() {
   unique_lock<mutex> lock(write_state_lock_);
   stringstream ss;
   ss << "Write handle " << this << " file '" << file_->path() << "'"
      << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_;
   if (write_range_ != NULL) {
-    ss << " data " << write_range_->data() << " len " << write_range_->len()
-       << " file offset " << write_range_->offset()
-       << " disk id " << write_range_->disk_id();
+    ss << " data " << write_range_->data() << " disk range len " << write_range_->len()
+       << " file offset " << write_range_->offset() << " disk id "
+       << write_range_->disk_id();
   }
   return ss.str();
 }
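
The write path above compresses into a side buffer, encrypts that buffer if
--disk_spill_encryption is set, and only then allocates scratch space sized to
the compressed data; the read path reverses the order. A minimal sketch of the
same round trip, using raw LZ4 in place of Impala's Codec and TmpFileMgr
machinery (an illustration of the ordering, not the implementation):

    #include <lz4.h>
    #include <cassert>
    #include <string>
    #include <vector>

    int main() {
      std::string data(64 * 1024, 'x');  // a highly compressible "page"
      // 1. Compress into a side buffer sized for the codec's worst case.
      std::vector<char> compressed(LZ4_compressBound(static_cast<int>(data.size())));
      int clen = LZ4_compress_default(data.data(), compressed.data(),
          static_cast<int>(data.size()), static_cast<int>(compressed.size()));
      assert(clen > 0);
      // 2. (If encryption is on) encrypt compressed[0, clen) in place.
      // 3. Allocate 'clen' bytes of scratch space and write; the uncompressed
      //    size is tracked separately ('data_len_') for the read path.
      // Read path: read 'clen' bytes back, decrypt, then decompress.
      std::vector<char> restored(data.size());
      int dlen = LZ4_decompress_safe(compressed.data(), restored.data(), clen,
          static_cast<int>(restored.size()));
      assert(dlen == static_cast<int>(data.size()));
      return 0;
    }
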
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 6e72304..3ba3e21 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -23,10 +23,13 @@
 
 #include <mutex>
 #include <boost/scoped_ptr.hpp>
+#include <gtest/gtest_prod.h>
 
 #include "common/object-pool.h"
 #include "common/status.h"
+#include "gen-cpp/CatalogObjects_types.h" // for THdfsCompression
 #include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/scoped-buffer.h"
 #include "util/condition-variable.h"
 #include "util/mem-range.h"
 #include "util/metrics-fwd.h"
@@ -41,6 +44,8 @@ namespace io {
   class ScanRange;
   class WriteRange;
 }
+struct BufferPoolClientCounters;
+class MemTracker;
 class TmpFile;
 class TmpFileGroup;
 class TmpWriteHandle;
@@ -109,6 +114,8 @@ class TmpFileMgr {
 
   TmpFileMgr();
 
+  ~TmpFileMgr();
+
   /// Creates the configured tmp directories. If multiple directories are specified per
   /// disk, only one is created and used. Must be called after DiskInfo::Init().
   Status Init(MetricGroup* metrics) WARN_UNUSED_RESULT;
@@ -119,9 +126,11 @@ class TmpFileMgr {
   /// use the command-line syntax, i.e. <path>[:<limit>]. The first variant takes
   /// a comma-separated list, the second takes a vector.
   Status InitCustom(const std::string& tmp_dirs_spec, bool one_dir_per_device,
+      const std::string& compression_codec, bool punch_holes,
       MetricGroup* metrics) WARN_UNUSED_RESULT;
   Status InitCustom(const std::vector<std::string>& tmp_dir_specifiers,
-      bool one_dir_per_device, MetricGroup* metrics) WARN_UNUSED_RESULT;
+      bool one_dir_per_device, const std::string& compression_codec, bool punch_holes,
+      MetricGroup* metrics) WARN_UNUSED_RESULT;
 
   /// Return the scratch directory path for the device.
   std::string GetTmpDirPath(DeviceId device_id) const;
@@ -134,6 +143,24 @@ class TmpFileMgr {
   /// I.e. those that haven't been blacklisted.
   std::vector<DeviceId> ActiveTmpDevices();
 
+  MemTracker* compressed_buffer_tracker() const {
+    return compressed_buffer_tracker_.get();
+  }
+
+  /// The type of spill-to-disk compression in use for spilling.
+  THdfsCompression::type compression_codec() const { return compression_codec_; }
+  bool compression_enabled() const {
+    return compression_codec_ != THdfsCompression::NONE;
+  }
+  int compression_level() const { return compression_level_; }
+  bool punch_holes() const { return punch_holes_; }
+
+  /// The minimum size of hole that we will try to punch in a scratch file.
+  /// This avoids ineffective hole-punching where we only punch a hole in
+  /// part of a block and can't reclaim space. 4KB is chosen because Linux
+  /// filesystems typically use 4KB or smaller blocks.
+  static constexpr int64_t HOLE_PUNCH_BLOCK_SIZE_BYTES = 4096;
+
  private:
   friend class TmpFileMgrTest;
   friend class TmpFile;
@@ -147,17 +174,32 @@ class TmpFileMgr {
   void NewFile(TmpFileGroup* file_group, DeviceId device_id,
     std::unique_ptr<TmpFile>* new_file);
 
-  bool initialized_;
+  bool initialized_ = false;
+
+  /// The type of spill-to-disk compression in use for spilling. NONE means no
+  /// compression is used.
+  THdfsCompression::type compression_codec_ = THdfsCompression::NONE;
+
+  /// The compression level, which is used for certain compression codecs like ZSTD
+  /// and ignored otherwise. -1 means not set/invalid.
+  int compression_level_ = -1;
+
+  /// Whether hole punching is enabled.
+  bool punch_holes_ = false;
 
   /// The paths of the created tmp directories.
   std::vector<TmpDir> tmp_dirs_;
 
+  /// Memory tracker to track compressed buffers. Set up in InitCustom() if disk spill
+  /// compression is enabled.
+  std::unique_ptr<MemTracker> compressed_buffer_tracker_;
+
   /// Metrics to track active scratch directories.
-  IntGauge* num_active_scratch_dirs_metric_;
-  SetMetric<std::string>* active_scratch_dirs_metric_;
+  IntGauge* num_active_scratch_dirs_metric_ = nullptr;
+  SetMetric<std::string>* active_scratch_dirs_metric_ = nullptr;
 
   /// Metrics to track the scratch space HWM.
-  AtomicHighWaterMarkGauge* scratch_bytes_used_metric_;
+  AtomicHighWaterMarkGauge* scratch_bytes_used_metric_ = nullptr;
 };
 
 /// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -190,11 +232,13 @@ class TmpFileGroup {
   /// Returns an error if the scratch space cannot be allocated or the write cannot
   /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
   /// a different thread when the write completes successfully or unsuccessfully or is
-  /// cancelled.
+  /// cancelled. If non-null, the counters in 'counters' are updated with information
+  /// about the write.
   ///
   /// 'handle' must be destroyed by passing the DestroyWriteHandle() or RestoreData().
   Status Write(MemRange buffer, TmpFileMgr::WriteDoneCallback cb,
-      std::unique_ptr<TmpWriteHandle>* handle) WARN_UNUSED_RESULT;
+      std::unique_ptr<TmpWriteHandle>* handle,
+      const BufferPoolClientCounters* counters = nullptr);
 
   /// Synchronously read the data referenced by 'handle' from the temporary file into
   /// 'buffer'. buffer.len() must be the same as handle->data_len(). Can only be called
@@ -212,14 +256,16 @@ class TmpFileGroup {
   /// Wait until the read started for 'handle' by ReadAsync() completes. 'buffer'
   /// should be the same buffer passed into ReadAsync(). Returns an error if the
   /// read fails. Retrying a failed read by calling ReadAsync() again is allowed.
-  Status WaitForAsyncRead(TmpWriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
+  /// If non-null, the counters in 'counters' are updated with information about the read.
+  Status WaitForAsyncRead(TmpWriteHandle* handle, MemRange buffer,
+      const BufferPoolClientCounters* counters = nullptr) WARN_UNUSED_RESULT;
 
-  /// Restore the original data in the 'buffer' passed to Write(), decrypting or
-  /// decompressing as necessary. Returns an error if restoring the data fails.
-  /// The write must not be in-flight - the caller is responsible for waiting for
-  /// the write to complete.
-  Status RestoreData(
-      std::unique_ptr<TmpWriteHandle> handle, MemRange buffer) WARN_UNUSED_RESULT;
+  /// Restore the original data in the 'buffer' passed to Write(), decrypting as
+  /// necessary. Returns an error if restoring the data fails. The write must not be
+  /// in-flight - the caller is responsible for waiting for the write to complete.
+  /// If non-null, the counters in 'counters' are updated with information about the read.
+  Status RestoreData(std::unique_ptr<TmpWriteHandle> handle, MemRange buffer,
+      const BufferPoolClientCounters* counters = nullptr) WARN_UNUSED_RESULT;
 
   /// Wait for the in-flight I/Os to complete and destroy resources associated with
   /// 'handle'.
@@ -237,6 +283,7 @@ class TmpFileGroup {
  private:
   friend class TmpFile;
   friend class TmpFileMgrTest;
+  friend class TmpWriteHandle;
 
   /// Initializes the file group with one temporary file per disk with a scratch
   /// directory. Returns OK if at least one temporary file could be created.
@@ -250,8 +297,10 @@ class TmpFileGroup {
   Status AllocateSpace(
       int64_t num_bytes, TmpFile** tmp_file, int64_t* file_offset) WARN_UNUSED_RESULT;
 
-  /// Add the scratch range from 'handle' to 'free_ranges_' and destroy handle. Must be
-  /// called without 'lock_' held.
+  /// Recycle the range of bytes in a scratch file and destroy 'handle'. Called when the
+  /// range is no longer in use for 'handle'. This function reclaims the range's disk
+  /// space, either by adding it to 'free_ranges_' for recycling or by punching a hole
+  /// in the file. Must be called without 'lock_' held.
   void RecycleFileRange(std::unique_ptr<TmpWriteHandle> handle);
 
   /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
@@ -302,6 +351,10 @@ class TmpFileGroup {
   /// Number of bytes written to disk (includes writes started but not yet complete).
   RuntimeProfile::Counter* const bytes_written_counter_;
 
+  /// Number of bytes written to disk before compression (includes writes started but
+  /// not yet complete).
+  RuntimeProfile::Counter* const uncompressed_bytes_written_counter_;
+
   /// Number of read operations (includes reads started but not yet complete).
   RuntimeProfile::Counter* const read_counter_;
 
@@ -317,6 +370,10 @@ class TmpFileGroup {
   /// Time spent in disk spill encryption, decryption, and integrity checking.
   RuntimeProfile::Counter* encryption_timer_;
 
+  /// Time spent in disk spill compression and decompression. nullptr if compression
+  /// is not enabled.
+  RuntimeProfile::Counter* compression_timer_;
+
   /// Protects below members.
   SpinLock lock_;
 
@@ -334,6 +391,7 @@ class TmpFileGroup {
   /// Each vector in free_ranges_[i] is a vector of File/offset pairs for free scratch
   /// ranges of length 2^i bytes. Has 64 entries so that every int64_t length has a
   /// valid list associated with it.
+  /// Only used if --disk_spill_punch_holes is false.
   std::vector<std::vector<std::pair<TmpFile*, int64_t>>> free_ranges_;
 
   /// Errors encountered when creating/writing scratch files. We store the history so
@@ -368,8 +426,14 @@ class TmpWriteHandle {
   /// Returns empty string if no backing file allocated.
   std::string TmpFilePath() const;
 
-  /// The length of the write range in bytes.
-  int64_t len() const;
+  /// The length of the in-memory data written to disk in bytes, before any compression.
+  int64_t data_len() const { return data_len_; }
+
+  /// The size of the data on disk (after compression) in bytes. Only valid to call if
+  /// Write() succeeds.
+  int64_t on_disk_len() const;
+
+  bool is_compressed() const { return compressed_len_ >= 0; }
 
   std::string DebugString();
 
@@ -377,15 +441,26 @@ class TmpWriteHandle {
   friend class TmpFileGroup;
   friend class TmpFileMgrTest;
 
-  TmpWriteHandle(
-      RuntimeProfile::Counter* encryption_timer, TmpFileMgr::WriteDoneCallback cb);
-
-  /// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false
-  /// before calling. After returning, 'write_in_flight_' is true on success or false on
-  /// failure and 'is_cancelled_' is set to true on failure.
-  Status Write(io::RequestContext* io_ctx, TmpFile* file,
-      int64_t offset, MemRange buffer,
-      TmpFileMgr::WriteDoneCallback callback) WARN_UNUSED_RESULT;
+  TmpWriteHandle(TmpFileGroup* const parent, TmpFileMgr::WriteDoneCallback cb);
+
+  /// Starts a write. This method allocates space in the file, compresses (if needed) and
+  /// encrypts (if needed). 'write_in_flight_' must be false before calling. After
+  /// returning, 'write_in_flight_' is true on success or false on failure and
+  /// 'is_cancelled_' is set to true on failure. If the data was compressed,
+  /// 'compressed_len_' will be non-negative and 'compressed_' will be the temporary
+  /// buffer used to hold the compressed data.
+  /// If non-null, the counters in 'counters' are updated with information about the write.
+  Status Write(io::RequestContext* io_ctx, MemRange buffer,
+      TmpFileMgr::WriteDoneCallback callback,
+      const BufferPoolClientCounters* counters = nullptr);
+
+  /// Try to compress 'buffer'. On success, returns true, 'compressed_' holds the
+  /// allocated compression buffer and 'compressed_len_' is the length of the valid
+  /// compressed data within it. On failure, returns false, 'compressed_' is an empty
+  /// buffer and 'compressed_len_' is -1. The reason for the compression failure may
+  /// be logged.
+  /// If non-null, the counters in 'counters' are updated with compression time.
+  bool TryCompress(MemRange buffer, const BufferPoolClientCounters* counters);
 
   /// Retry the write after the initial write failed with an error, instead writing to
   /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
@@ -408,22 +483,30 @@ class TmpWriteHandle {
   void WaitForWrite();
 
   /// Encrypts the data in 'buffer' in-place and computes 'hash_'.
-  Status EncryptAndHash(MemRange buffer) WARN_UNUSED_RESULT;
+  /// If non-null, the counters in 'counters' are updated with encryption time.
+  Status EncryptAndHash(MemRange buffer, const BufferPoolClientCounters* counters);
 
   /// Verifies the integrity hash and decrypts the contents of 'buffer' in place.
-  Status CheckHashAndDecrypt(MemRange buffer) WARN_UNUSED_RESULT;
+  /// If non-null, the counters in 'counters' are updated with encryption time.
+  Status CheckHashAndDecrypt(MemRange buffer, const BufferPoolClientCounters* counters);
+
+  /// Free 'compressed_' and update memory accounting. No-op if 'compressed_' is empty.
+  void FreeCompressedBuffer();
+
+  TmpFileGroup* const parent_;
 
   /// Callback to be called when the write completes.
   TmpFileMgr::WriteDoneCallback cb_;
 
-  /// Reference to the TmpFileGroup's 'encryption_timer_'.
-  RuntimeProfile::Counter* encryption_timer_;
+  /// Length of the in-memory data buffer that was written to disk. If compression
+  /// is in use, this is the uncompressed size. Set in Write().
+  int64_t data_len_ = -1;
 
   /// The DiskIoMgr write range for this write.
   boost::scoped_ptr<io::WriteRange> write_range_;
 
   /// The temporary file being written to.
-  TmpFile* file_;
+  TmpFile* file_ = nullptr;
 
   /// If --disk_spill_encryption is on, a AES 256-bit key and initialization vector.
   /// Regenerated for each write.
@@ -435,7 +518,7 @@ class TmpWriteHandle {
 
   /// The scan range for the read that is currently in flight. NULL when no read is in
   /// flight.
-  io::ScanRange* read_range_;
+  io::ScanRange* read_range_ = nullptr;
 
   /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
   /// invalid to call WriteRange/TmpFileGroup methods concurrently from multiple
@@ -445,10 +528,22 @@ class TmpWriteHandle {
   std::mutex write_state_lock_;
 
   /// True if the write has been cancelled (but is not necessarily complete).
-  bool is_cancelled_;
+  bool is_cancelled_ = false;
 
   /// True if a write is in flight.
-  bool write_in_flight_;
+  bool write_in_flight_ = false;
+
+  /// The buffer used to store compressed data. Buffer is allocated while reading or
+  /// writing a compressed range.
+  /// TODO: ScopedBuffer is a suboptimal memory allocation approach. We would be better
+  /// off integrating more directly with the buffer pool to use its buffer allocator and
+  /// making the compression buffers somehow evictable.
+  ScopedBuffer compressed_;
+
+  /// Set to non-negative if the data in this range was compressed. In that case,
+  /// 'compressed_' is the buffer used to store the data and 'compressed_len_' is the
+  /// amount of valid data in the buffer.
+  int64_t compressed_len_ = -1;
 
   /// Signalled when the write completes and 'write_in_flight_' becomes false, before
   /// 'cb_' is invoked.
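
The hole punching behind --disk_spill_punch_holes (bounded below by
HOLE_PUNCH_BLOCK_SIZE_BYTES above) relies on the Linux fallocate(2) interface:
the blocks backing a byte range of the scratch file are released while the
file's logical size stays the same, so the offsets of other live ranges remain
valid. A hedged sketch of that mechanism, not the TmpFileMgr code itself:

    // Linux-only. g++ defines _GNU_SOURCE by default, which exposes
    // fallocate() and the FALLOC_FL_* flags via <fcntl.h>.
    #include <fcntl.h>
    #include <cerrno>

    // Returns 0 on success, else errno (e.g. EOPNOTSUPP if the filesystem
    // does not support hole punching). Punching less than one filesystem
    // block (typically 4KB) reclaims nothing, which is why ranges smaller
    // than HOLE_PUNCH_BLOCK_SIZE_BYTES are not worth punching.
    int PunchHole(int fd, off_t offset, off_t len) {
      // FALLOC_FL_KEEP_SIZE leaves the logical file size unchanged.
      if (fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
              offset, len) != 0) {
        return errno;
      }
      return 0;
    }
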
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 690c3da..50483c7 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -25,7 +25,6 @@
 #include "exprs/timezone_db.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
-#include <zstd.h>
 #include <sstream>
 #include <boost/algorithm/string.hpp>
 #include <gutil/strings/strip.h>
@@ -186,35 +185,6 @@ static bool IsRemovedQueryOption(const string& key) {
   return false;
 }
 
-// Return all enum values in a string format, e.g. FOO(1), BAR(2), BAZ(3).
-static string GetThriftEnumValues(const map<int, const char*>& enum_values_to_names) {
-  bool first = true;
-  stringstream ss;
-  for (const auto& e : enum_values_to_names) {
-    if (!first) {
-      ss << ", ";
-    } else {
-      first = false;
-    }
-    ss << e.second << "(" << e.first << ")";
-  }
-  return ss.str();
-}
-
-// Return false for an invalid Thrift enum value.
-template<typename ENUM_TYPE>
-static Status GetThriftEnum(const string& value, const string& key,
-    const map<int, const char*>& enum_values_to_names, ENUM_TYPE* enum_value) {
-  for (const auto& e : enum_values_to_names) {
-    if (iequals(value, to_string(e.first)) || iequals(value, e.second)) {
-      *enum_value = static_cast<ENUM_TYPE>(e.first);
-      return Status::OK();
-    }
-  }
-  return Status(Substitute("Invalid $0: '$1'. Valid values are $2.", key, value,
-      GetThriftEnumValues(enum_values_to_names)));
-}
-
 // Return true if the given value is true (case-insensitive) or 1.
 static bool IsTrue(const string& value) {
   return iequals(value, "true") || iequals(value, "1");
@@ -279,36 +249,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_debug_action(value.c_str());
         break;
       case TImpalaQueryOptions::COMPRESSION_CODEC: {
-        // Acceptable values are:
-        // - zstd:compression_level
-        // - codec
-        vector<string> tokens;
-        split(tokens, value, is_any_of(":"), token_compress_on);
-        if (tokens.size() > 2) return Status("Invalid compression codec value");
-
-        string& codec_name = tokens[0];
-        trim(codec_name);
-        int compression_level = ZSTD_CLEVEL_DEFAULT;
         THdfsCompression::type enum_type;
-        RETURN_IF_ERROR(GetThriftEnum(codec_name, "compression codec",
-            _THdfsCompression_VALUES_TO_NAMES, &enum_type));
-
-        if (tokens.size() == 2) {
-          if (enum_type != THdfsCompression::ZSTD) {
-            return Status("Compression level only supported for ZSTD");
-          }
-          StringParser::ParseResult status;
-          string& clevel = tokens[1];
-          trim(clevel);
-          compression_level = StringParser::StringToInt<int>(
-            clevel.c_str(), static_cast<int>(clevel.size()), &status);
-          if (status != StringParser::PARSE_SUCCESS || compression_level < 1
-              || compression_level > ZSTD_maxCLevel()) {
-            return Status(Substitute("Invalid ZSTD compression level '$0'."
-                " Valid values are in [1,$1]", clevel, ZSTD_maxCLevel()));
-          }
-        }
-
+        int compression_level;
+        RETURN_IF_ERROR(
+            ParseUtil::ParseCompressionCodec(value, &enum_type, &compression_level));
         TCompressionCodec compression_codec;
         compression_codec.__set_codec(enum_type);
         if (enum_type == THdfsCompression::ZSTD) {
diff --git a/be/src/util/parse-util.cc b/be/src/util/parse-util.cc
index a70db99..9ff31c5 100644
--- a/be/src/util/parse-util.cc
+++ b/be/src/util/parse-util.cc
@@ -16,11 +16,24 @@
 // under the License.
 
 #include "util/parse-util.h"
+
+#include <sstream>
+
+#include <zstd.h>
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string/trim.hpp>
+
 #include "util/mem-info.h"
 #include "util/string-parser.h"
 
 #include "common/names.h"
 
+using boost::algorithm::is_any_of;
+using boost::algorithm::split;
+using boost::algorithm::token_compress_on;
+using boost::algorithm::trim;
+
 namespace impala {
 
 int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
@@ -101,4 +114,55 @@ int64_t ParseUtil::ParseMemSpec(const string& mem_spec_str, bool* is_percent,
   return bytes;
 }
 
+Status ParseUtil::ParseCompressionCodec(
+    const string& compression_codec, THdfsCompression::type* type, int* level) {
+  // Acceptable values are:
+  // - zstd:compression_level
+  // - codec
+  vector<string> tokens;
+  split(tokens, compression_codec, is_any_of(":"), token_compress_on);
+  if (tokens.size() > 2) return Status("Invalid compression codec value");
+
+  string& codec_name = tokens[0];
+  trim(codec_name);
+  int compression_level = ZSTD_CLEVEL_DEFAULT;
+  THdfsCompression::type enum_type;
+  RETURN_IF_ERROR(GetThriftEnum(
+      codec_name, "compression codec", _THdfsCompression_VALUES_TO_NAMES, &enum_type));
+
+  if (tokens.size() == 2) {
+    if (enum_type != THdfsCompression::ZSTD) {
+      return Status("Compression level only supported for ZSTD");
+    }
+    StringParser::ParseResult status;
+    string& clevel = tokens[1];
+    trim(clevel);
+    compression_level = StringParser::StringToInt<int>(
+        clevel.c_str(), static_cast<int>(clevel.size()), &status);
+    if (status != StringParser::PARSE_SUCCESS || compression_level < 1
+        || compression_level > ZSTD_maxCLevel()) {
+      return Status(Substitute("Invalid ZSTD compression level '$0'."
+                               " Valid values are in [1,$1]",
+          clevel, ZSTD_maxCLevel()));
+    }
+  }
+  *type = enum_type;
+  *level = compression_level;
+  return Status::OK();
+}
+
+// Return all enum values in a string format, e.g. FOO(1), BAR(2), BAZ(3).
+string GetThriftEnumValues(const map<int, const char*>& enum_values_to_names) {
+  bool first = true;
+  stringstream ss;
+  for (const auto& e : enum_values_to_names) {
+    if (!first) {
+      ss << ", ";
+    } else {
+      first = false;
+    }
+    ss << e.second << "(" << e.first << ")";
+  }
+  return ss.str();
+}
 }
diff --git a/be/src/util/parse-util.h b/be/src/util/parse-util.h
index 40660e5..907349e 100644
--- a/be/src/util/parse-util.h
+++ b/be/src/util/parse-util.h
@@ -18,8 +18,15 @@
 #pragma once
 
 #include <cstdint>
+#include <map>
 #include <string>
 
+#include <boost/algorithm/string/predicate.hpp>
+
+#include "common/status.h"
+#include "gen-cpp/CatalogObjects_types.h" // for THdfsCompression
+#include "gutil/strings/substitute.h"
+
 namespace impala {
 
 /// Utility class for parsing information from strings.
@@ -40,5 +47,27 @@ class ParseUtil {
   /// Returns -1 if parsing failed.
   static int64_t ParseMemSpec(const std::string& mem_spec_str,
       bool* is_percent, int64_t relative_reference);
+
+  /// Parses a compression codec string using the COMPRESSION_CODEC query option
+  /// syntax, i.e. "<codec>" or "zstd:<compression_level>". On success, sets 'type'
+  /// and 'level'; 'level' is only meaningful for ZSTD.
+  static Status ParseCompressionCodec(
+      const std::string& compression_codec, THdfsCompression::type* type, int* level);
 };
+
+std::string GetThriftEnumValues(const std::map<int, const char*>& enum_values_to_names);
+
+/// Parses a string into a value of 'ENUM_TYPE': if the string matches an enum value's
+/// name or its numeric value (case-insensitive), that value is returned.
+/// Returns an error for an invalid Thrift enum value.
+template <typename ENUM_TYPE>
+static inline Status GetThriftEnum(const std::string& value, const std::string& desc,
+    const std::map<int, const char*>& enum_values_to_names, ENUM_TYPE* enum_value) {
+  for (const auto& e : enum_values_to_names) {
+    if (boost::algorithm::iequals(value, std::to_string(e.first))
+        || boost::algorithm::iequals(value, e.second)) {
+      *enum_value = static_cast<ENUM_TYPE>(e.first);
+      return Status::OK();
+    }
+  }
+  return Status(strings::Substitute("Invalid $0: '$1'. Valid values are $2.", desc, value,
+      GetThriftEnumValues(enum_values_to_names)));
+}
 }
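
For reference, the parsing contract implemented above, exercised by a
hypothetical caller using only the declarations in this header:

    // "lz4"    -> *type = LZ4,  *level = ZSTD_CLEVEL_DEFAULT (ignored for LZ4)
    // "zstd:7" -> *type = ZSTD, *level = 7
    // "zstd:0" -> error: ZSTD levels must be in [1, ZSTD_maxCLevel()]
    // "a:b:c"  -> error: at most one ':' separator is accepted
    THdfsCompression::type type;
    int level;
    Status status = ParseUtil::ParseCompressionCodec("zstd:7", &type, &level);
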
diff --git a/bin/jenkins/dockerized-impala-run-tests.sh b/bin/jenkins/dockerized-impala-run-tests.sh
index 45f7e86..6271e0a 100755
--- a/bin/jenkins/dockerized-impala-run-tests.sh
+++ b/bin/jenkins/dockerized-impala-run-tests.sh
@@ -80,6 +80,8 @@ FAIR_SCHED_CONF=/opt/impala/conf/minicluster-fair-scheduler.xml
 LLAMA_CONF=/opt/impala/conf/minicluster-llama-site.xml
 export TEST_START_CLUSTER_ARGS="--docker_network=${DOCKER_NETWORK}"
 TEST_START_CLUSTER_ARGS+=" --data_cache_dir=/tmp --data_cache_size=500m"
+TEST_START_CLUSTER_ARGS+=" --impalad_args=--disk_spill_compression_codec=lz4"
+TEST_START_CLUSTER_ARGS+=" --impalad_args=--disk_spill_punch_holes=true"
 TEST_START_CLUSTER_ARGS+=" --impalad_args=-fair_scheduler_allocation_path=${FAIR_SCHED_CONF}"
 TEST_START_CLUSTER_ARGS+=" --impalad_args=-llama_site_path=${LLAMA_CONF}"
 export MAX_PYTEST_FAILURES=0
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 95136db..a1daa5e 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -92,7 +92,9 @@ class TestScratchDir(CustomClusterTestSuite):
     normal_dirs = self.generate_dirs(5)
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
-      '--impalad_args=--allow_multiple_scratch_dirs_per_device=false'])
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=false',
+      '--impalad_args=--disk_spill_compression_codec=zstd',
+      '--impalad_args=--disk_spill_punch_holes=true'])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=1)
     exec_option = vector.get_value('exec_option')
@@ -169,7 +171,9 @@ class TestScratchDir(CustomClusterTestSuite):
     dirs = self.generate_dirs(3);
     self._start_impala_cluster([
       '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(dirs)),
-      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'])
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+      '--impalad_args=--disk_spill_compression_codec=zstd',
+      '--impalad_args=--disk_spill_punch_holes=true'])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=len(dirs))
     exec_option = vector.get_value('exec_option')
diff --git a/tests/verifiers/metric_verifier.py b/tests/verifiers/metric_verifier.py
index df51822..59848b3 100644
--- a/tests/verifiers/metric_verifier.py
+++ b/tests/verifiers/metric_verifier.py
@@ -35,7 +35,9 @@ METRIC_LIST = [
                "buffer-pool.clean-pages",
                "buffer-pool.clean-page-bytes",
                "impala-server.num-open-beeswax-sessions",
-               "impala-server.num-open-hiveserver2-sessions"]
+               "impala-server.num-open-hiveserver2-sessions",
+               "tmp-file-mgr.scratch-space-bytes-used",
+               "tmp-file-mgr.scratch-space-bytes-used.dir-0"]
 
 class MetricVerifier(object):
   """Reuseable class that can verify common metrics"""