You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ab...@apache.org on 2022/05/26 15:04:15 UTC

[kudu] branch master updated: KUDU-3368 Encrypt file keys with server keys

This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b6aaf2b71 KUDU-3368 Encrypt file keys with server keys
b6aaf2b71 is described below

commit b6aaf2b71da05c5427e1bee0dccc0e66101c684d
Author: Attila Bukor <ab...@apache.org>
AuthorDate: Thu May 12 01:20:38 2022 +0200

    KUDU-3368 Encrypt file keys with server keys
    
    This patch removes the dummy encryption key used to encrypt all file
    keys and introduces the server key, which is used instead on a
    per-server basis. The server key is stored in the instance files,
    currently in cleartext. Encrypting the server key will be done in a
    follow-up patch.
    
    Servers in an internal mini-cluster are forced to use the same server key
    because they share an environment, but servers in an external mini-cluster
    can use different keys. Multiple (non-default) PosixEnv instances can now
    be created to make this possible.
    
    Change-Id: I1884f62fde3bb110291b1d01c5e68942c6071318
    Reviewed-on: http://gerrit.cloudera.org:8080/18192
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Zoltan Chovan <zc...@cloudera.com>
---
 src/kudu/consensus/consensus_meta-test.cc          | 20 +++----
 src/kudu/consensus/raft_consensus_quorum-test.cc   |  7 ++-
 src/kudu/fs/fs.proto                               |  3 +
 src/kudu/fs/fs_manager.cc                          | 32 ++++++++++-
 src/kudu/fs/fs_manager.h                           | 14 ++++-
 src/kudu/fs/log_block_manager-test.cc              | 63 ++++++++++-----------
 src/kudu/integration-tests/log_verifier.cc         |  2 +-
 .../integration-tests/mini_cluster_fs_inspector.cc |  8 ++-
 src/kudu/integration-tests/raft_consensus-itest.cc |  3 +-
 src/kudu/integration-tests/tablet_copy-itest.cc    | 39 +++++++++++++
 src/kudu/mini-cluster/external_mini_cluster.cc     | 36 +++++++++++-
 src/kudu/mini-cluster/external_mini_cluster.h      | 19 ++++++-
 src/kudu/mini-cluster/internal_mini_cluster.cc     |  4 ++
 src/kudu/mini-cluster/internal_mini_cluster.h      | 14 +++++
 src/kudu/mini-cluster/mini_cluster.h               |  4 ++
 src/kudu/server/server_base.cc                     |  6 +-
 src/kudu/server/server_base_options.cc             | 11 +++-
 src/kudu/server/server_base_options.h              |  2 +
 src/kudu/tools/kudu-tool-test.cc                   | 53 ++++++++++++++----
 src/kudu/tools/tool_action_common.cc               | 25 +++++++++
 src/kudu/tools/tool_action_common.h                |  4 ++
 src/kudu/tools/tool_action_fs.cc                   |  8 ++-
 src/kudu/tools/tool_action_pbc.cc                  |  3 +
 src/kudu/tools/tool_action_wal.cc                  |  1 +
 src/kudu/tserver/tablet_copy_client-test.cc        | 10 +++-
 src/kudu/tserver/tablet_copy_client.cc             |  6 +-
 src/kudu/tserver/tablet_server-test-base.cc        |  3 +
 src/kudu/util/env-test.cc                          |  4 +-
 src/kudu/util/env.h                                | 11 ++++
 src/kudu/util/env_posix.cc                         | 64 +++++++++++++++++-----
 src/kudu/util/env_util.cc                          |  7 +--
 src/kudu/util/file_cache-test.cc                   |  3 -
 src/kudu/util/pb_util-test.cc                      |  6 +-
 src/kudu/util/test_util.cc                         | 22 +++++++-
 src/kudu/util/test_util.h                          |  6 ++
 35 files changed, 413 insertions(+), 110 deletions(-)

diff --git a/src/kudu/consensus/consensus_meta-test.cc b/src/kudu/consensus/consensus_meta-test.cc
index 0150d2c49..6d66d4a7a 100644
--- a/src/kudu/consensus/consensus_meta-test.cc
+++ b/src/kudu/consensus/consensus_meta-test.cc
@@ -79,10 +79,6 @@ class ConsensusMetadataTest : public KuduTest, public ::testing::WithParamInterf
                          int64_t opid_index, const string& permanant_uuid, int64_t term);
 
 
-  void EnableEncryption(bool enable) {
-    FLAGS_encrypt_data_at_rest = enable;
-  }
-
   FsManager fs_manager_;
   RaftConfigPB config_;
 };
@@ -104,7 +100,7 @@ INSTANTIATE_TEST_SUITE_P(, ConsensusMetadataTest, ::testing::Values(false, true)
 
 // Test the basic "happy case" of creating and then loading a file.
 TEST_P(ConsensusMetadataTest, TestCreateLoad) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create the file.
   {
     ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
@@ -120,7 +116,7 @@ TEST_P(ConsensusMetadataTest, TestCreateLoad) {
 
 // Test deferred creation.
 TEST_P(ConsensusMetadataTest, TestDeferredCreateLoad) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create the cmeta object, but not the file.
   scoped_refptr<ConsensusMetadata> writer;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
@@ -141,7 +137,7 @@ TEST_P(ConsensusMetadataTest, TestDeferredCreateLoad) {
 
 // Ensure that Create() will not overwrite an existing file.
 TEST_P(ConsensusMetadataTest, TestCreateNoOverwrite) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create the consensus metadata file.
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
                                       config_, kInitialTerm));
@@ -154,7 +150,7 @@ TEST_P(ConsensusMetadataTest, TestCreateNoOverwrite) {
 
 // Ensure that we get an error when loading a file that doesn't exist.
 TEST_P(ConsensusMetadataTest, TestFailedLoad) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   Status s = ConsensusMetadata::Load(&fs_manager_, kTabletId, fs_manager_.uuid());
   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
   LOG(INFO) << "Expected failure: " << s.ToString();
@@ -162,7 +158,7 @@ TEST_P(ConsensusMetadataTest, TestFailedLoad) {
 
 // Check that changes are not written to disk until Flush() is called.
 TEST_P(ConsensusMetadataTest, TestFlush) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int64_t kNewTerm = 4;
   scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
@@ -206,7 +202,7 @@ RaftConfigPB BuildConfig(const vector<string>& uuids) {
 
 // Test ConsensusMetadata active role calculation.
 TEST_P(ConsensusMetadataTest, TestActiveRole) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   vector<string> uuids = { "a", "b", "c", "d" };
   string peer_uuid = "e";
   RaftConfigPB config1 = BuildConfig(uuids); // We aren't a member of this config...
@@ -273,7 +269,7 @@ TEST_P(ConsensusMetadataTest, TestActiveRole) {
 // Ensure that invocations of ToConsensusStatePB() return the expected state
 // in the returned object.
 TEST_P(ConsensusMetadataTest, TestToConsensusStatePB) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   vector<string> uuids = { "a", "b", "c", "d" };
   string peer_uuid = "e";
 
@@ -333,7 +329,7 @@ static void AssertConsensusMergeExpected(const scoped_refptr<ConsensusMetadata>&
 
 // Ensure that MergeCommittedConsensusStatePB() works as advertised.
 TEST_P(ConsensusMetadataTest, TestMergeCommittedConsensusStatePB) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   vector<string> uuids = { "a", "b", "c", "d" };
 
   RaftConfigPB committed_config = BuildConfig(uuids); // We aren't a member of this config...
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 81bc554ef..19381074d 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -132,7 +132,12 @@ class RaftConsensusQuorumTest : public KuduTest {
       opts.wal_root = test_path;
       opts.data_roots = { test_path };
       unique_ptr<FsManager> fs_manager(new FsManager(env_, opts));
-      RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
+      string server_key = GetEncryptionKey();
+      if (server_key.empty()) {
+        RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
+      } else {
+        RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout(boost::none, server_key));
+      }
       RETURN_NOT_OK(fs_manager->Open());
 
       scoped_refptr<Log> log;
diff --git a/src/kudu/fs/fs.proto b/src/kudu/fs/fs.proto
index 7b0a44ae0..19c0051cd 100644
--- a/src/kudu/fs/fs.proto
+++ b/src/kudu/fs/fs.proto
@@ -35,6 +35,9 @@ message InstanceMetadataPB {
   // initialized.
   required string format_stamp = 2;
 
+  // Encrypted server key used to encrypt/decrypt file keys on this server.
+  optional bytes server_key = 3;
+
   // TODO: add a "node type" (TS/Master?)
 }
 
diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index 05b7b0e46..56315592d 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -22,6 +22,7 @@
 #include <functional>
 #include <initializer_list>
 #include <iostream>
+#include <openssl/rand.h>
 #include <unordered_map>
 #include <unordered_set>
 #include <utility>
@@ -40,6 +41,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
@@ -53,6 +55,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/oid_generator.h"
+#include "kudu/util/openssl_util.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -124,6 +127,9 @@ METRIC_DEFINE_gauge_int64(server, log_block_manager_containers_processing_time_s
                           "files during the startup",
                           kudu::MetricLevel::kDebug);
 
+DECLARE_bool(encrypt_data_at_rest);
+DECLARE_int32(encryption_key_length);
+
 using kudu::fs::BlockManagerOptions;
 using kudu::fs::CreateBlockOptions;
 using kudu::fs::DataDirManager;
@@ -451,6 +457,12 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files,
     read_instance_metadata_files->Stop();
   }
 
+  if (!server_key().empty()) {
+    env_->SetEncryptionKey(server_key().length() * 4,
+                           reinterpret_cast<const uint8_t*>(
+                             strings::a2b_hex(server_key()).c_str()));
+  }
+
   // Open the directory manager if it has not been opened already.
   if (!dd_manager_) {
     DataDirManagerOptions dm_opts;
@@ -530,7 +542,8 @@ Status FsManager::Open(FsReport* report, Timer* read_instance_metadata_files,
   return Status::OK();
 }
 
-Status FsManager::CreateInitialFileSystemLayout(boost::optional<string> uuid) {
+Status FsManager::CreateInitialFileSystemLayout(boost::optional<string> uuid,
+                                                boost::optional<string> server_key) {
   CHECK(!opts_.read_only);
 
   RETURN_NOT_OK(Init());
@@ -554,7 +567,7 @@ Status FsManager::CreateInitialFileSystemLayout(boost::optional<string> uuid) {
   //
   // Files/directories created will NOT be synchronized to disk.
   InstanceMetadataPB metadata;
-  RETURN_NOT_OK_PREPEND(CreateInstanceMetadata(std::move(uuid), &metadata),
+  RETURN_NOT_OK_PREPEND(CreateInstanceMetadata(std::move(uuid), std::move(server_key), &metadata),
                         "unable to create instance metadata");
   RETURN_NOT_OK_PREPEND(FsManager::CreateFileSystemRoots(
       canonicalized_all_fs_roots_, metadata, &created_dirs, &created_files),
@@ -649,6 +662,7 @@ Status FsManager::CreateFileSystemRoots(
 }
 
 Status FsManager::CreateInstanceMetadata(boost::optional<string> uuid,
+                                         boost::optional<string> server_key,
                                          InstanceMetadataPB* metadata) {
   if (uuid) {
     string canonicalized_uuid;
@@ -657,6 +671,16 @@ Status FsManager::CreateInstanceMetadata(boost::optional<string> uuid,
   } else {
     metadata->set_uuid(oid_generator_.Next());
   }
+  if (server_key) {
+    metadata->set_server_key(server_key.get());
+  } else if (FLAGS_encrypt_data_at_rest) {
+    uint8_t key_bytes[32];
+    int num_bytes = FLAGS_encryption_key_length / 8;
+    DCHECK(num_bytes <= sizeof(key_bytes));
+    OPENSSL_RET_NOT_OK(RAND_bytes(key_bytes, num_bytes),
+                       "Failed to generate random key");
+    strings::b2a_hex(key_bytes, metadata->mutable_server_key(), num_bytes);
+  }
 
   string time_str;
   StringAppendStrftime(&time_str, "%Y-%m-%d %H:%M:%S", time(nullptr), false);
@@ -688,6 +712,10 @@ const string& FsManager::uuid() const {
   return CHECK_NOTNULL(metadata_.get())->uuid();
 }
 
+const string& FsManager::server_key() const {
+  return CHECK_NOTNULL(metadata_.get())->server_key();
+}
+
 vector<string> FsManager::GetDataRootDirs() const {
   // Get the data subdirectory for each data root.
   return dd_manager_->GetDirs();
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index 9243e2671..9c26ca1df 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -199,11 +199,14 @@ class FsManager {
               std::atomic<int>* containers_total = nullptr );
 
   // Create the initial filesystem layout. If 'uuid' is provided, uses it as
-  // uuid of the filesystem. Otherwise generates one at random.
+  // uuid of the filesystem. Otherwise generates one at random. If 'server_key'
+  // is provided, it is used as the server key of the filesystem. Otherwise, if
+  // encryption is enabled, generates one at random.
   //
   // Returns an error if the file system is already initialized.
   Status CreateInitialFileSystemLayout(
-      boost::optional<std::string> uuid = boost::none);
+      boost::optional<std::string> uuid = boost::none,
+      boost::optional<std::string> server_key = boost::none);
 
   // ==========================================================================
   //  Error handling helpers
@@ -288,6 +291,12 @@ class FsManager {
   // Open() have not been called, this will crash.
   const std::string& uuid() const;
 
+  // Return the server key persisted on the local filesystem. After the server
+  // key is decrypted, it can be used to encrypt/decrypt file keys on the
+  // filesystem.  If PartialOpen() or Open() have not been called, this will
+  // crash. If the file system is not encrypted, it returns an empty string.
+  const std::string& server_key() const;
+
   // ==========================================================================
   //  file-system helpers
   // ==========================================================================
@@ -348,6 +357,7 @@ class FsManager {
 
   // Create a new InstanceMetadataPB.
   Status CreateInstanceMetadata(boost::optional<std::string> uuid,
+                                boost::optional<std::string> server_key,
                                 InstanceMetadataPB* metadata);
 
   // Save a InstanceMetadataPB to the filesystem.
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index c8f05e9f7..efe44b1ae 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -239,10 +239,6 @@ class LogBlockManagerTest : public KuduTest, public ::testing::WithParamInterfac
     ASSERT_TRUE(report.partial_record_check->entries.empty());
   }
 
-  void EnableEncryption(bool enable) {
-    FLAGS_encrypt_data_at_rest = enable;
-  }
-
   DataDirGroupPB test_group_pb_;
   string test_tablet_name_;
   CreateBlockOptions test_block_opts_;
@@ -342,7 +338,7 @@ static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
 INSTANTIATE_TEST_SUITE_P(EncryptionEnabled, LogBlockManagerTest, ::testing::Values(false, true));
 
 TEST_P(LogBlockManagerTest, MetricsTest) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   MetricRegistry registry;
   scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
   ASSERT_OK(ReopenBlockManager(entity));
@@ -515,7 +511,7 @@ TEST_P(LogBlockManagerTest, MetricsTest) {
 }
 
 TEST_P(LogBlockManagerTest, ContainerPreallocationTest) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   string kTestData = "test data";
 
   // For this test to work properly, the preallocation window has to be at
@@ -560,7 +556,7 @@ TEST_P(LogBlockManagerTest, ContainerPreallocationTest) {
 // Test for KUDU-2202 to ensure that once the block manager has been notified
 // of a block ID, it will not reuse it.
 TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumBlocks = 10;
   vector<BlockId> block_ids;
   unique_ptr<WritableBlock> writer;
@@ -593,7 +589,7 @@ TEST_P(LogBlockManagerTest, TestBumpBlockIds) {
 // Regression test for KUDU-1190, a crash at startup when a block ID has been
 // reused.
 TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Typically, the LBM starts with a random block ID when running as a
   // gtest. In this test, we want to control the block IDs.
   bm_->next_block_id_.Store(1);
@@ -668,7 +664,7 @@ TEST_P(LogBlockManagerTest, TestReuseBlockIds) {
 // Note that we rely on filesystem integrity to ensure that we do not lose
 // trailing, fsync()ed metadata.
 TEST_P(LogBlockManagerTest, TestMetadataTruncation) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create several blocks.
   vector<BlockId> created_blocks;
   BlockId last_block_id;
@@ -863,7 +859,7 @@ TEST_P(LogBlockManagerTest, TestMetadataTruncation) {
 // Regression test for a crash when a container's append offset exceeded its
 // preallocation offset.
 TEST_P(LogBlockManagerTest, TestAppendExceedsPreallocation) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   FLAGS_log_container_preallocate_bytes = 1;
 
   // Create a container, preallocate it by one byte, and append more than one.
@@ -879,7 +875,7 @@ TEST_P(LogBlockManagerTest, TestAppendExceedsPreallocation) {
 }
 
 TEST_P(LogBlockManagerTest, TestPreallocationAndTruncation) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Ensure preallocation window is greater than the container size itself.
   FLAGS_log_container_max_size = 1024 * 1024;
   FLAGS_log_container_preallocate_bytes = 32 * 1024 * 1024;
@@ -943,7 +939,7 @@ TEST_P(LogBlockManagerTest, TestPreallocationAndTruncation) {
 }
 
 TEST_P(LogBlockManagerTest, TestContainerWithManyHoles) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // This is a regression test of sorts for KUDU-1508, though it doesn't
   // actually fail if the fix is missing; it just corrupts the filesystem.
 
@@ -1002,7 +998,7 @@ TEST_P(LogBlockManagerTest, TestContainerWithManyHoles) {
 }
 
 TEST_P(LogBlockManagerTest, TestParseKernelRelease) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("1.7.0.0.el6.x86_64"));
 
   // no el6 infix
@@ -1126,7 +1122,7 @@ TEST_P(LogBlockManagerStartupBenchmarkTest, StartupBenchmark) {
 #endif
 
 TEST_P(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create multiple transactions that will share a container.
   const int kNumTransactions = 3;
   vector<unique_ptr<BlockCreationTransaction>> block_transactions;
@@ -1185,7 +1181,7 @@ TEST_P(LogBlockManagerTest, TestFailMultipleTransactionsPerContainer) {
 }
 
 TEST_P(LogBlockManagerTest, TestLookupBlockLimit) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   int64_t limit_1024 = LogBlockManager::LookupBlockLimit(1024);
   int64_t limit_2048 = LogBlockManager::LookupBlockLimit(2048);
   int64_t limit_4096 = LogBlockManager::LookupBlockLimit(4096);
@@ -1203,7 +1199,7 @@ TEST_P(LogBlockManagerTest, TestLookupBlockLimit) {
 }
 
 TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByBlockNum) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumBlocks = 1000;
 
   // Creates 'kNumBlocks' blocks with minimal data.
@@ -1238,7 +1234,7 @@ TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByBlockNum) {
 }
 
 TEST_P(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSize) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumBlocks = 1000;
 
   // Creates 'kNumBlocks' blocks with minimal data.
@@ -1365,8 +1361,7 @@ TEST_F(LogBlockManagerTest, TestContainerBlockLimitingByMetadataSizeWithCompacti
 }
 
 TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
-  EnableEncryption(GetParam());
-
+  SetEncryptionFlags(GetParam());
   FLAGS_log_container_preallocate_bytes = 0;
   const int kNumBlocks = 100;
 
@@ -1469,7 +1464,7 @@ TEST_P(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
 }
 
 TEST_P(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Enforce that the container's actual size is strictly upper-bounded by the
   // calculated size so we can more easily trigger repairs.
   FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
@@ -1518,7 +1513,7 @@ TEST_P(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
 }
 
 TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumBlocks = 100;
 
   // Enforce that the container's actual size is strictly upper-bounded by the
@@ -1579,7 +1574,7 @@ TEST_P(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
 }
 
 TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumContainers = 20;
 
   // Create some incomplete containers. The corruptor will select between
@@ -1610,7 +1605,7 @@ TEST_P(LogBlockManagerTest, TestRepairIncompleteContainer) {
 }
 
 TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumRecords = 50;
 
   // Create one container.
@@ -1643,7 +1638,7 @@ TEST_P(LogBlockManagerTest, TestDetectMalformedRecords) {
 }
 
 TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumBlocks = 50;
 
   // Create one container.
@@ -1676,7 +1671,7 @@ TEST_P(LogBlockManagerTest, TestDetectMisalignedBlocks) {
 }
 
 TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const int kNumContainers = 50;
   const int kNumRecords = 10;
 
@@ -1718,7 +1713,7 @@ TEST_P(LogBlockManagerTest, TestRepairPartialRecords) {
 }
 
 TEST_P(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Force our single container to become full once created.
   FLAGS_log_container_max_size = 0;
 
@@ -1756,7 +1751,7 @@ TEST_P(LogBlockManagerTest, TestDeleteDeadContainersAtStartup) {
 }
 
 TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // With this ratio, the metadata of a full container comprised of half dead
   // blocks will be compacted at startup.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.50;
@@ -1822,7 +1817,7 @@ TEST_P(LogBlockManagerTest, TestCompactFullContainerMetadataAtStartup) {
 // The bug was related to a stale file descriptor left in the file_cache, so
 // this test explicitly targets that scenario.
 TEST_P(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Compact aggressively.
   FLAGS_log_container_live_metadata_before_compact_ratio = 0.99;
   // Use a single shard so that we have an accurate max cache capacity
@@ -1886,7 +1881,7 @@ TEST_P(LogBlockManagerTest, TestDeleteFromContainerAfterMetadataCompaction) {
 // will run smoothly. The directory manager will note the failed directories
 // and only healthy ones are reported.
 TEST_P(LogBlockManagerTest, TestOpenWithFailedDirectories) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Initialize a new directory manager with multiple directories.
   bm_.reset();
   vector<string> test_dirs;
@@ -1936,7 +1931,7 @@ TEST_P(LogBlockManagerTest, TestOpenWithFailedDirectories) {
 // 2) the block cannot be opened/found until close it.
 // 3) the same container is not marked as available twice.
 TEST_P(LogBlockManagerTest, TestFinalizeBlock) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create 4 blocks.
   vector<unique_ptr<WritableBlock>> blocks;
   for (int i = 0; i < 4; i++) {
@@ -1961,7 +1956,7 @@ TEST_P(LogBlockManagerTest, TestFinalizeBlock) {
 
 // Test available log container selection is LIFO.
 TEST_P(LogBlockManagerTest, TestLIFOContainerSelection) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Create 4 blocks and 4 opened containers that are not full.
   vector<unique_ptr<WritableBlock>> blocks;
   for (int i = 0; i < 4; i++) {
@@ -2014,7 +2009,7 @@ TEST_P(LogBlockManagerTest, TestAbortBlock) {
 }
 
 TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   const auto TestProcess = [&] (int block_num) {
     ASSERT_GT(block_num, 0);
     MetricRegistry registry;
@@ -2134,7 +2129,7 @@ TEST_P(LogBlockManagerTest, TestDeleteDeadContainersByDeletionTransaction) {
 // Test for KUDU-2665 to ensure that once the container is full and has no live
 // blocks but with a reference by WritableBlock, it will not be deleted.
 TEST_P(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   // Lower the max container size.
   FLAGS_log_container_max_size = 64 * 1024;
 
@@ -2198,7 +2193,7 @@ TEST_P(LogBlockManagerTest, TestDoNotDeleteFakeDeadContainer) {
 }
 
 TEST_P(LogBlockManagerTest, TestHalfPresentContainer) {
-  EnableEncryption(GetParam());
+  SetEncryptionFlags(GetParam());
   BlockId block_id;
   string data_file_name;
   string metadata_file_name;
diff --git a/src/kudu/integration-tests/log_verifier.cc b/src/kudu/integration-tests/log_verifier.cc
index df21938df..a7b920826 100644
--- a/src/kudu/integration-tests/log_verifier.cc
+++ b/src/kudu/integration-tests/log_verifier.cc
@@ -75,7 +75,7 @@ Status LogVerifier::ScanForCommittedOpIds(int ts_idx, const string& tablet_id,
 
   shared_ptr<LogReader> reader;
   const string wal_dir = JoinPathSegments(inspector_->WalDirForTS(ts_idx), tablet_id);
-  RETURN_NOT_OK(LogReader::Open(env_,
+  RETURN_NOT_OK(LogReader::Open(cluster_->ts_env(ts_idx),
                                 wal_dir,
                                 /*index*/nullptr,
                                 tablet_id,
diff --git a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
index 37b2eb791..863095463 100644
--- a/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
+++ b/src/kudu/integration-tests/mini_cluster_fs_inspector.cc
@@ -165,7 +165,8 @@ Status MiniClusterFsInspector::ReadTabletSuperBlockOnTS(int ts_idx,
                                                         const string& tablet_id,
                                                         TabletSuperBlockPB* sb) {
   const auto& sb_path = GetTabletSuperBlockPathOnTS(ts_idx, tablet_id);
-  return pb_util::ReadPBContainerFromPath(env_, sb_path, sb, pb_util::SENSITIVE);
+  return pb_util::ReadPBContainerFromPath(cluster_->ts_env(ts_idx), sb_path, sb,
+                                          pb_util::SENSITIVE);
 }
 
 int64_t MiniClusterFsInspector::GetTabletSuperBlockMTimeOrDie(int ts_idx,
@@ -190,7 +191,8 @@ Status MiniClusterFsInspector::ReadConsensusMetadataOnTS(int ts_idx,
   if (!env_->FileExists(cmeta_path)) {
     return Status::NotFound("Consensus metadata file not found", cmeta_path);
   }
-  return pb_util::ReadPBContainerFromPath(env_, cmeta_path, cmeta_pb, pb_util::SENSITIVE);
+  return pb_util::ReadPBContainerFromPath(cluster_->ts_env(ts_idx), cmeta_path, cmeta_pb,
+                                          pb_util::SENSITIVE);
 }
 
 Status MiniClusterFsInspector::WriteConsensusMetadataOnTS(
@@ -198,7 +200,7 @@ Status MiniClusterFsInspector::WriteConsensusMetadataOnTS(
     const string& tablet_id,
     const ConsensusMetadataPB& cmeta_pb) {
   auto cmeta_path = GetConsensusMetadataPathOnTS(ts_idx, tablet_id);
-  return pb_util::WritePBContainerToPath(env_, cmeta_path, cmeta_pb,
+  return pb_util::WritePBContainerToPath(cluster_->ts_env(ts_idx), cmeta_path, cmeta_pb,
                                          pb_util::OVERWRITE, pb_util::NO_SYNC,
                                          pb_util::SENSITIVE);
 }
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 34bcaf1d1..795d5c091 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -826,8 +826,9 @@ TEST_P(RaftConsensusParamEncryptionITest, TestCatchupAfterOpsEvicted) {
   if (GetParam()) {
     // We need to enable encryption both in the mini-cluster and in the current
     // process, as both of them access encrypted files.
+    SetEncryptionFlags(true);
     kTsFlags.emplace_back("--encrypt_data_at_rest=true");
-    FLAGS_encrypt_data_at_rest = true;
+    kTsFlags.emplace_back("--test_server_key=" + GetEncryptionKey());
   }
 
   NO_FATALS(BuildAndStart(kTsFlags));
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 5e06eec6b..614159259 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -984,6 +984,45 @@ TEST_F(TabletCopyITest, TestSlowCopyDoesntFail) {
                             workload.rows_inserted()));
 }
 
+TEST_F(TabletCopyITest, TestTabletCopyEncryptedServers) {
+  SetEncryptionFlags(true);
+  ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = 3;
+  NO_FATALS(StartClusterWithOpts(opts));
+
+  MonoDelta timeout = MonoDelta::FromSeconds(30);
+
+  TestWorkload workload(cluster_.get());
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 1000) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ExternalTabletServer* replica_ets = cluster_->tablet_server(2);
+  TServerDetails* replica_ts = ts_map_[replica_ets->uuid()];
+  ASSERT_OK(WaitForNumTabletsOnTS(replica_ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Tombstone the follower.
+  LOG(INFO) << "Tombstoning follower tablet " << tablet_id << " on TS " << replica_ts->uuid();
+  ASSERT_OK(DeleteTablet(replica_ts, tablet_id, TABLET_DATA_TOMBSTONED, timeout));
+
+  // Wait for tablet copy to start.
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(2, tablet_id,
+                                                 { tablet::TABLET_DATA_COPYING }, timeout));
+
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
+                            workload.rows_inserted()));
+}
+
 // Attempting to start Tablet Copy on a tablet that was deleted with
 // TABLET_DATA_DELETED should fail. This behavior helps avoid thrashing when
 // a follower tablet is deleted and the leader notices before it has processed
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index 975880550..d2895210a 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -34,6 +34,7 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/master_rpc.h"
+#include "kudu/fs/fs.pb.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #if !defined(NO_CHRONY)
 #include "kudu/clock/test/mini_chronyd.h"
@@ -41,6 +42,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/stringpiece.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -97,6 +99,7 @@ using std::string;
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
+using strings::a2b_hex;
 using strings::Substitute;
 
 typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
@@ -153,6 +156,14 @@ Env* ExternalMiniCluster::env() const {
   return Env::Default();
 }
 
+Env* ExternalMiniCluster::ts_env(int ts_idx) const {
+  return tablet_server(ts_idx)->env();
+}
+
+Env* ExternalMiniCluster::master_env(int master_idx) const {
+  return master(master_idx)->env();
+}
+
 Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
   string exe;
   RETURN_NOT_OK(env()->GetExecutablePath(&exe));
@@ -537,6 +548,7 @@ Status ExternalMiniCluster::StartMasters() {
     scoped_refptr<ExternalMaster> peer;
     RETURN_NOT_OK(CreateMaster(master_rpc_addrs, i, &peer));
     RETURN_NOT_OK_PREPEND(peer->Start(), Substitute("Unable to start Master at index $0", i));
+    RETURN_NOT_OK(peer->SetServerKey());
     masters_.emplace_back(std::move(peer));
   }
   return Status::OK();
@@ -599,6 +611,7 @@ Status ExternalMiniCluster::AddTabletServer() {
   }
 
   RETURN_NOT_OK(ts->Start());
+  RETURN_NOT_OK(ts->SetServerKey());
   tablet_servers_.push_back(ts);
   return Status::OK();
 }
@@ -1256,6 +1269,23 @@ Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
   return Status::OK();
 }
 
+Env* ExternalDaemon::env() const {
+  return Env::Default();
+}
+
+// Reads the hex-encoded server key from the instance file and sets it on env().
+Status ExternalDaemon::SetServerKey() {
+  const string path = JoinPathSegments(this->wal_dir(), "instance");
+  LOG(INFO) << "Reading " << path;
+  InstanceMetadataPB instance;
+  RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(env(), path, &instance, pb_util::NOT_SENSITIVE));
+  if (const string& key = instance.server_key(); !key.empty()) {
+    LOG(INFO) << "Setting server key from " << path;  // Never log the key material itself.
+    env()->SetEncryptionKey(key.size() * 4, reinterpret_cast<const uint8_t*>(a2b_hex(key).c_str()));
+  }
+  return Status::OK();
+}
+
 void ExternalDaemon::SetExePath(string exe) {
   CHECK(IsShutdown()) << "Call Shutdown() before changing the executable path";
   opts_.exe = std::move(exe);
@@ -1527,7 +1557,8 @@ ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
 //------------------------------------------------------------
 
 ExternalMaster::ExternalMaster(ExternalDaemonOptions opts)
-    : ExternalDaemon(std::move(opts)) {
+    : ExternalDaemon(std::move(opts)),
+      env_(Env::NewEnv()) {
 }
 
 ExternalMaster::~ExternalMaster() {
@@ -1652,7 +1683,8 @@ vector<string> ExternalMaster::GetMasterFlags(const ExternalDaemonOptions& opts)
 ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts,
                                            vector<HostPort> master_addrs)
     : ExternalDaemon(std::move(opts)),
-      master_addrs_(std::move(master_addrs)) {
+      master_addrs_(std::move(master_addrs)),
+      env_(Env::NewEnv()) {
   DCHECK(!master_addrs_.empty());
 }
 
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index 44dd5d59b..1c005371e 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -402,9 +402,17 @@ class ExternalMiniCluster : public MiniCluster {
   // Returns the UUID for the tablet server 'ts_idx'.
   virtual std::string UuidForTS(int ts_idx) const override;
 
-  // Returns the Env on which the cluster operates.
+  // Returns the Env on which the cluster operates. If encryption is enabled,
+  // its encryption key is not the servers' key, so encrypted files cannot be
+  // read or written through it; use ts_env() and master_env() for those.
   virtual Env* env() const override;
 
+  // Returns the Env on which a specific tablet server operates.
+  virtual Env* ts_env(int ts_idx) const override;
+
+  // Returns the Env on which a specific master operates.
+  virtual Env* master_env(int master_idx) const override;
+
   BindMode bind_mode() const override {
     return opts_.bind_mode;
   }
@@ -623,6 +631,8 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
                         const std::string& principal_base,
                         const std::string& bind_host);
 
+  Status SetServerKey();
+
   // Sends a SIGSTOP signal to the daemon.
   Status Pause() WARN_UNUSED_RESULT;
 
@@ -676,6 +686,8 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
   // Return the options used to create the daemon.
   ExternalDaemonOptions opts() const { return opts_; }
 
+  virtual Env* env() const;
+
   void SetRpcBindAddress(HostPort rpc_hostport) {
     DCHECK(!IsProcessAlive());
     bound_rpc_ = std::move(rpc_hostport);
@@ -767,6 +779,8 @@ class ExternalMaster : public ExternalDaemon {
   // Requires that it has previously been shutdown.
   virtual Status Restart() override WARN_UNUSED_RESULT;
 
+  Env* env() const override { return env_.get(); }
+
   // Blocks until the master's catalog manager is initialized and responding to
   // RPCs. If 'wait_mode' is WAIT_FOR_LEADERSHIP, will further block until the
   // master has been elected leader.
@@ -791,6 +805,7 @@ class ExternalMaster : public ExternalDaemon {
   // addresses in case of restart.
   static std::vector<std::string> GetCommonFlags(const HostPort& rpc_bind_addr,
                                                  const HostPort& http_addr = HostPort());
+  const std::unique_ptr<Env> env_;
   virtual ~ExternalMaster();
 };
 
@@ -805,8 +820,10 @@ class ExternalTabletServer : public ExternalDaemon {
   // Requires that it has previously been shutdown.
   virtual Status Restart() override WARN_UNUSED_RESULT;
 
+  Env* env() const override { return env_.get(); }
  private:
   const std::vector<HostPort> master_addrs_;
+  const std::unique_ptr<Env> env_;
 
   friend class RefCountedThreadSafe<ExternalTabletServer>;
   virtual ~ExternalTabletServer();
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc
index d740fbfb5..7bab85cba 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.cc
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -31,6 +31,7 @@
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.proxy.h"
+#include "kudu/master/master_options.h"
 #include "kudu/master/mini_master.h"
 #include "kudu/master/ts_descriptor.h"
 #include "kudu/master/ts_manager.h"
@@ -155,6 +156,7 @@ Status InternalMiniCluster::StartMasters() {
     for (int i = 0; i < num_masters; i++) {
       auto mini_master(std::make_shared<MiniMaster>(
           GetMasterFsRoot(i), master_rpc_addrs[i]));
+      mini_master->mutable_options()->server_key = KuduTest::GetEncryptionKey();
       if (num_masters > 1 || opts_.supply_single_master_addr) {
         mini_master->SetMasterAddresses(master_rpc_addrs);
       }
@@ -212,6 +214,8 @@ Status InternalMiniCluster::AddTabletServer(const HostPort& hp) {
   unique_ptr<MiniTabletServer> tablet_server(
       new MiniTabletServer(GetTabletServerFsRoot(new_idx), hp, opts_.num_data_dirs));
   tablet_server->options()->master_addresses = master_rpc_addrs();
+  tablet_server->options()->server_key = KuduTest::GetEncryptionKey();
+
   RETURN_NOT_OK(tablet_server->Start());
   mini_tablet_servers_.emplace_back(std::move(tablet_server));
   return Status::OK();
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h
index e7b4e3913..185bb91ce 100644
--- a/src/kudu/mini-cluster/internal_mini_cluster.h
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -164,6 +164,20 @@ class InternalMiniCluster : public MiniCluster {
     return env_;
   }
 
+  // Returns the default environment. As the servers in an internal mini-cluster
+  // share the same Env, each tablet server uses the same server key, so the
+  // default Env can be used here.
+  Env* ts_env(int ts_idx) const override {
+    return env_;
+  }
+
+  // Returns the default environment. As the servers in an internal mini-cluster
+  // share the same Env, each master uses the same server key, so the default
+  // Env can be used here.
+  Env* master_env(int master_idx) const override {
+    return env_;
+  }
+
   BindMode bind_mode() const override {
     return opts_.bind_mode;
   }
diff --git a/src/kudu/mini-cluster/mini_cluster.h b/src/kudu/mini-cluster/mini_cluster.h
index dd8e96415..232c9ed62 100644
--- a/src/kudu/mini-cluster/mini_cluster.h
+++ b/src/kudu/mini-cluster/mini_cluster.h
@@ -134,6 +134,10 @@ class MiniCluster {
   // Returns the Env on which the cluster operates.
   virtual Env* env() const = 0;
 
+  virtual Env* ts_env(int ts_idx) const = 0;
+
+  virtual Env* master_env(int master_idx) const = 0;
+
   /// Reserves a unique socket address for a mini-cluster daemon. The address
   /// can be ascertained through the returned socket, and will remain reserved
   /// for the life of the socket. The daemon must use the SO_REUSEPORT socket
diff --git a/src/kudu/server/server_base.cc b/src/kudu/server/server_base.cc
index 6123f7748..8e412418b 100644
--- a/src/kudu/server/server_base.cc
+++ b/src/kudu/server/server_base.cc
@@ -611,7 +611,11 @@ Status ServerBase::Init() {
   if (s.IsNotFound()) {
     LOG(INFO) << "This appears to be a new deployment of Kudu; creating new FS layout";
     is_first_run_ = true;
-    s = fs_manager_->CreateInitialFileSystemLayout();
+    if (options_.server_key.empty()) {
+      s = fs_manager_->CreateInitialFileSystemLayout();
+    } else {
+      s = fs_manager_->CreateInitialFileSystemLayout(boost::none, options_.server_key);
+    }
     if (s.IsAlreadyPresent()) {
       return s.CloneAndPrepend("FS layout already exists; not overwriting existing layout");
     }
diff --git a/src/kudu/server/server_base_options.cc b/src/kudu/server/server_base_options.cc
index d3e8b6b2a..47b9c08fc 100644
--- a/src/kudu/server/server_base_options.cc
+++ b/src/kudu/server/server_base_options.cc
@@ -20,8 +20,8 @@
 #include <gflags/gflags.h>
 
 #include "kudu/gutil/macros.h"
-#include "kudu/util/flag_tags.h"
 #include "kudu/util/env.h"
+#include "kudu/util/flag_tags.h"
 
 DEFINE_string(server_dump_info_path, "",
               "Path into which the server information will be "
@@ -41,6 +41,12 @@ DEFINE_int32(metrics_log_interval_ms, 60000,
              "value, then metrics logging will be disabled.");
 TAG_FLAG(metrics_log_interval_ms, advanced);
 
+DEFINE_string(test_server_key, "",
+              "Server key in plain-text to be persisted into the instance file. "
+              "It is only used when creating the file system, it's disregarded on "
+              "consecutive startups. It should only be used in tests.");
+TAG_FLAG(test_server_key, hidden);
+
 namespace kudu {
 namespace server {
 
@@ -48,7 +54,8 @@ ServerBaseOptions::ServerBaseOptions()
   : env(Env::Default()),
     dump_info_path(FLAGS_server_dump_info_path),
     dump_info_format(FLAGS_server_dump_info_format),
-    metrics_log_interval_ms(FLAGS_metrics_log_interval_ms) {
+    metrics_log_interval_ms(FLAGS_metrics_log_interval_ms),
+    server_key(FLAGS_test_server_key) {
 }
 
 } // namespace server
diff --git a/src/kudu/server/server_base_options.h b/src/kudu/server/server_base_options.h
index 5b1318046..c972d6db2 100644
--- a/src/kudu/server/server_base_options.h
+++ b/src/kudu/server/server_base_options.h
@@ -45,6 +45,8 @@ struct ServerBaseOptions {
 
   int32_t metrics_log_interval_ms;
 
+  std::string server_key;
+
  protected:
   ServerBaseOptions();
 };
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 9fabb315b..ae6bd0a10 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -140,6 +140,7 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/url-coding.h"
 
+DECLARE_bool(encrypt_data_at_rest);
 DECLARE_bool(fs_data_dirs_consider_available_space);
 DECLARE_bool(hive_metastore_sasl_enabled);
 DECLARE_bool(show_values);
@@ -1589,6 +1590,14 @@ TEST_F(ToolTest, TestFsDumpUuid) {
 }
 
 TEST_F(ToolTest, TestPbcTools) {
+  // It's pointless to run these tests in an encrypted environment, as they use
+  // instance files to test pbc tools, which are not encrypted anyway. The tests
+  // also make assumptions about the contents of the instance files, which are
+  // different on encrypted servers, as they contain an extra server_key field,
+  // which would make these tests break.
+  if (FLAGS_encrypt_data_at_rest) {
+    GTEST_SKIP();
+  }
   const string kTestDir = GetTestPath("test");
   string uuid;
   string instance_path;
@@ -1782,6 +1791,7 @@ TEST_F(ToolTest, TestPbcToolsOnMultipleBlocks) {
 
   // Generate a block container metadata file.
   string metadata_path;
+  string encryption_args;
   {
     // Open FsManager to write file later.
     FsManager fs(env_, FsManagerOpts(kTestDir));
@@ -1814,9 +1824,12 @@ TEST_F(ToolTest, TestPbcToolsOnMultipleBlocks) {
     }
     ASSERT_EQ(1, metadata_files.size());
     metadata_path = metadata_files[0];
-  }
 
-  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : "";
+    if (env_->IsEncryptionEnabled()) {
+      encryption_args = GetEncryptionArgs() + " --instance_file=" +
+          fs.GetInstanceMetadataPath(kTestDir);
+      }
+  }
 
   // Test default dump
   {
@@ -2133,7 +2146,11 @@ TEST_F(ToolTest, TestWalDump) {
   }
 
   string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1);
-  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : "";
+  string encryption_args;
+  if (env_->IsEncryptionEnabled()) {
+    encryption_args = GetEncryptionArgs() + " --instance_file=" +
+        fs.GetInstanceMetadataPath(kTestDir);
+  }
   string stdout;
   for (const auto& args : { Substitute("wal dump $0 $1", wal_path, encryption_args),
                             Substitute("local_replica dump wals --fs_wal_dir=$0 $1 $2",
@@ -2314,9 +2331,14 @@ TEST_F(ToolTest, TestWalDumpWithAlterSchema) {
 
   string wal_path = fs.GetWalSegmentFileName(kTestTablet, 1);
   string stdout;
-  for (const auto& args : { Substitute("wal dump $0", wal_path),
-                            Substitute("local_replica dump wals --fs_wal_dir=$0 $1",
-                                       kTestDir, kTestTablet)
+  string encryption_args;
+  if (env_->IsEncryptionEnabled()) {
+    encryption_args = GetEncryptionArgs() + " --instance_file=" +
+        fs.GetInstanceMetadataPath(kTestDir);
+  }
+  for (const auto& args : { Substitute("wal dump $0 $1", encryption_args, wal_path),
+                            Substitute("local_replica dump wals --fs_wal_dir=$0 $1 $2",
+                                       kTestDir, encryption_args, kTestTablet)
                            }) {
     SCOPED_TRACE(args);
     for (const auto& print_entries : { "true", "1", "yes", "decoded" }) {
@@ -4046,7 +4068,13 @@ TEST_F(ToolTest, TestLocalReplicaCMetaOps) {
   for (int i = 0; i < kNumTabletServers; ++i) {
     ts_uuids.emplace_back(mini_cluster_->mini_tablet_server(i)->uuid());
   }
-  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : "";
+  string encryption_args;
+
+  if (env_->IsEncryptionEnabled()) {
+    encryption_args = GetEncryptionArgs() + " --instance_file=" +
+        JoinPathSegments(mini_cluster_->mini_tablet_server(0)->options()->fs_opts.wal_root,
+                         "instance");
+  }
   const string& flags =
       Substitute("-fs-wal-dir $0 $1",
                  mini_cluster_->mini_tablet_server(0)->options()->fs_opts.wal_root,
@@ -8004,7 +8032,10 @@ TEST_F(ToolTest, TestLocalReplicaCopyLocal) {
 }
 
 TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
-  SKIP_IF_SLOW_NOT_ALLOWED();
+  // Local copies are not supported on encrypted servers at this time.
+  if (FLAGS_encrypt_data_at_rest) {
+    GTEST_SKIP();
+  }
   // Create replicas and fill some data.
   const int kNumTserver = 3;
   InternalMiniClusterOptions opts;
@@ -8061,13 +8092,11 @@ TEST_F(ToolTest, TestRebuildTserverByLocalReplicaCopy) {
   NO_FATALS(StopMiniCluster());
 
   // Copy source tserver's all replicas from local filesystem.
-  string encryption_args = env_->IsEncryptionEnabled() ? GetEncryptionArgs() : "";
   string stdout;
-  NO_FATALS(RunActionStdoutString(Substitute("local_replica copy_from_local $0 $1 $2 $3",
+  NO_FATALS(RunActionStdoutString(Substitute("local_replica copy_from_local $0 $1 $2",
                                              JoinStrings(tablet_ids, ","),
                                              src_fs_paths_with_prefix,
-                                             dst_fs_paths_with_prefix,
-                                             encryption_args),
+                                             dst_fs_paths_with_prefix),
                                   &stdout));
   SCOPED_TRACE(stdout);
 
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 936af96e1..8dc52d002 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -53,8 +53,10 @@
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid.pb.h"
+#include "kudu/fs/fs.pb.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
@@ -160,6 +162,9 @@ DEFINE_bool(row_count_only, false,
 
 DECLARE_bool(show_values);
 
+DEFINE_string(instance_file, "",
+              "Path to the instance file containing the encrypted encryption key.");
+
 bool ValidateTimeoutSettings() {
   if (FLAGS_timeout_ms < FLAGS_negotiation_timeout_ms) {
     LOG(ERROR) << strings::Substitute(
@@ -236,6 +241,7 @@ using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::vector;
+using strings::a2b_hex;
 using strings::Split;
 using strings::Substitute;
 
@@ -893,6 +899,25 @@ Status GetKuduToolAbsolutePathSafe(string* path) {
   return Status::OK();
 }
 
+// Loads the hex-encoded server key named by --instance_file into the default Env.
+Status SetServerKey() {
+  if (FLAGS_instance_file.empty()) {
+    return Status::OK();
+  }
+  Env* const env = Env::Default();
+  InstanceMetadataPB metadata;
+  RETURN_NOT_OK_PREPEND(pb_util::ReadPBContainerFromPath(env, FLAGS_instance_file,
+                                                         &metadata, pb_util::NOT_SENSITIVE),
+                        "Could not open instance file");
+  const string& hex_key = metadata.server_key();
+  if (hex_key.empty()) {
+    return Status::OK();  // No server key recorded in the instance file: nothing to set.
+  }
+  const string raw_key = a2b_hex(hex_key);  // Each hex digit carries 4 bits of the key.
+  env->SetEncryptionKey(hex_key.length() * 4, reinterpret_cast<const uint8_t*>(raw_key.c_str()));
+  return Status::OK();
+}
+
 namespace {
 
 // Pretty print a table using the psql format. For example:
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index d89b55cf9..ada659b92 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -241,6 +241,10 @@ Status MasterAddressesToSet(
 // as reported in ConnectToMasterResponsePB::master_addrs.
 Status VerifyMasterAddressList(const std::vector<std::string>& master_addresses);
 
+// Parses the instance file set by the 'instance_file' flag, and sets the
+// server key on the default Env.
+Status SetServerKey();
+
 // A table of data to present to the user.
 //
 // Supports formatting based on the --format flag.
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index ea530f0b3..d7a387424 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -84,6 +84,8 @@ DEFINE_bool(print_rows, true,
 DEFINE_string(uuid, "",
               "The uuid to use in the filesystem. "
               "If not provided, one is generated");
+DEFINE_string(server_key, "",
+              "The encrypted server key to use in the filesystem.");
 DEFINE_bool(repair, false,
             "Repair any inconsistencies in the filesystem.");
 
@@ -224,10 +226,14 @@ Status Check(const RunnerContext& /*context*/) {
 Status Format(const RunnerContext& /*context*/) {
   FsManager fs_manager(Env::Default(), FsManagerOpts());
   boost::optional<string> uuid;
+  boost::optional<string> server_key;
   if (!FLAGS_uuid.empty()) {
     uuid = FLAGS_uuid;
   }
-  return fs_manager.CreateInitialFileSystemLayout(uuid);
+  if (!FLAGS_server_key.empty()) {
+    server_key = FLAGS_server_key;
+  }
+  return fs_manager.CreateInitialFileSystemLayout(uuid, server_key);
 }
 
 Status DumpUuid(const RunnerContext& /*context*/) {
diff --git a/src/kudu/tools/tool_action_pbc.cc b/src/kudu/tools/tool_action_pbc.cc
index 4c3e51cd7..37ce9f37a 100644
--- a/src/kudu/tools/tool_action_pbc.cc
+++ b/src/kudu/tools/tool_action_pbc.cc
@@ -46,6 +46,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/tools/tool_action.h"
+#include "kudu/tools/tool_action_common.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/flag_validators.h"
@@ -126,6 +127,7 @@ Status DumpPBContainerFile(const RunnerContext& context) {
     format = ReadablePBContainerFile::Format::DEBUG;
   }
 
+  RETURN_NOT_OK(SetServerKey());
   Env* env = Env::Default();
   unique_ptr<RandomAccessFile> reader;
   RandomAccessFileOptions opts;
@@ -158,6 +160,7 @@ Status RunEditor(const string& path) {
 }
 
 Status EditFile(const RunnerContext& context) {
+  RETURN_NOT_OK(SetServerKey());
   Env* env = Env::Default();
   const string& path = FindOrDie(context.required_args, kPathArg);
   const string& dir = DirName(path);
diff --git a/src/kudu/tools/tool_action_wal.cc b/src/kudu/tools/tool_action_wal.cc
index 466ec7940..11128ce42 100644
--- a/src/kudu/tools/tool_action_wal.cc
+++ b/src/kudu/tools/tool_action_wal.cc
@@ -41,6 +41,7 @@ namespace {
 const char* const kPathArg = "path";
 
 Status Dump(const RunnerContext& context) {
+  RETURN_NOT_OK(SetServerKey());
   const string& segment_path = FindOrDie(context.required_args, kPathArg);
 
   scoped_refptr<ReadableLogSegment> segment;
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index 96427dc07..dac3c031d 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -16,9 +16,9 @@
 // under the License.
 #include "kudu/tserver/tablet_copy_client.h"
 
-#include <cstring>
 #include <cstdint>
 #include <cstdlib>
+#include <cstring>
 #include <limits>
 #include <memory>
 #include <ostream>
@@ -27,6 +27,7 @@
 #include <tuple>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
@@ -122,7 +123,12 @@ class TabletCopyClientTest : public TabletCopyTest {
     metric_entity_ = METRIC_ENTITY_server.Instantiate(&metric_registry_, "test");
     opts.metric_entity = metric_entity_;
     fs_manager_.reset(new FsManager(Env::Default(), opts));
-    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
+    string server_key = KuduTest::GetEncryptionKey();
+    if (server_key.empty()) {
+      ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
+    } else {
+      ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout(boost::none, server_key));
+    }
     ASSERT_OK(fs_manager_->Open());
     ASSERT_OK(ResetTabletCopyClient());
   }
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 2edfbfc8f..14406f62a 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -500,7 +500,8 @@ Status TabletCopyClient::Finish() {
     string meta_copy_path = Substitute("$0.copy.$1$2", meta_path, start_time_micros_, kTmpInfix);
     WritableFileOptions opts;
     opts.is_sensitive = true;
-    RETURN_NOT_OK_PREPEND(env_util::CopyFile(Env::Default(), meta_path, meta_copy_path, opts),
+    RETURN_NOT_OK_PREPEND(env_util::CopyFile(dst_fs_manager_->env(), meta_path,
+                                             meta_copy_path, opts),
                           "Unable to make copy of tablet metadata");
   }
 
@@ -808,7 +809,8 @@ Status TabletCopyClient::WriteConsensusMetadata() {
     string cmeta_copy_path = Substitute("$0.copy.$1$2", cmeta_path, start_time_micros_, kTmpInfix);
     WritableFileOptions opts;
     opts.is_sensitive = true;
-    RETURN_NOT_OK_PREPEND(env_util::CopyFile(Env::Default(), cmeta_path, cmeta_copy_path, opts),
+    RETURN_NOT_OK_PREPEND(env_util::CopyFile(dst_fs_manager_->env(), cmeta_path,
+                                             cmeta_copy_path, opts),
                           "Unable to make copy of consensus metadata");
   }
 
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index dd2df297a..38787b30d 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -65,6 +65,7 @@
 DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_int32(heartbeat_rpc_timeout_ms);
+DECLARE_string(test_server_key);
 
 METRIC_DEFINE_entity(test);
 
@@ -95,6 +96,8 @@ TabletServerTestBase::TabletServerTestBase()
   // the heartbeat timeout to 1 second speeds up unit tests which
   // purposefully specify non-running Master servers.
   FLAGS_heartbeat_rpc_timeout_ms = 1000;
+
+  FLAGS_test_server_key = GetEncryptionKey();
 }
 
 // Starts the tablet server, override to start it later.
diff --git a/src/kudu/util/env-test.cc b/src/kudu/util/env-test.cc
index cec87dc9f..c5fad1d06 100644
--- a/src/kudu/util/env-test.cc
+++ b/src/kudu/util/env-test.cc
@@ -80,6 +80,7 @@ DECLARE_int32(env_inject_short_read_bytes);
 DECLARE_int32(env_inject_short_write_bytes);
 DECLARE_int32(encryption_key_length);
 DECLARE_string(env_inject_eio_globs);
+DECLARE_string(encryption_server_key);
 
 namespace kudu {
 
@@ -1254,7 +1255,7 @@ TEST_F(TestEnv, TestCreateFifo) {
 class TestEncryptedEnv : public TestEnv, public ::testing::WithParamInterface<int> {
  public:
   void SetUp() override {
-    FLAGS_encrypt_data_at_rest = true;
+    SetEncryptionFlags(true);
     FLAGS_encryption_key_length = GetParam();
   }
 };
@@ -1262,7 +1263,6 @@ class TestEncryptedEnv : public TestEnv, public ::testing::WithParamInterface<in
 INSTANTIATE_TEST_SUITE_P(TestEncryption, TestEncryptedEnv, ::testing::Values(128, 192, 256));
 
 TEST_P(TestEncryptedEnv, TestEncryption) {
-  FLAGS_encrypt_data_at_rest = true;
   const string kFile = JoinPathSegments(test_dir_, "encrypted_file");
   unique_ptr<RWFile> rw;
   RWFileOptions opts;
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index aa95b0b8d..6fc3ee894 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -76,6 +76,11 @@ class Env {
   // The result of Default() belongs to kudu and must never be deleted.
   static Env* Default();
 
+  // Return a new Env instance of the same type as the default
+  // environment.  Unlike the default env, this is not owned by Kudu, and
+  // must be destroyed when not used anymore.
+  static std::unique_ptr<Env> NewEnv();
+
   // Create a brand new sequentially-readable file with the specified name.
   // On success, stores a pointer to the new file in *result and returns OK.
   // On failure stores NULL in *result and returns non-OK.  If the file does
@@ -385,6 +390,8 @@ class Env {
 
   virtual bool IsEncryptionEnabled() const = 0;
 
+  virtual void SetEncryptionKey(int key_size, const uint8_t* key) = 0;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(Env);
 };
@@ -416,6 +423,10 @@ class Fifo : public File {
   // opened for reads before calling.
   virtual int read_fd() const = 0;
 
+  // Initializes the default environment with encryption enabled using the
+  // given AES key.
+  static Status InitializeEncryptedEnv(int key_size, uint8_t* server_key);
+
   // Returns the write fd, set when opened for writes. The fifo must have been
   // opened for writes before calling.
   virtual int write_fd() const = 0;
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 322638985..a119ffe17 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -32,6 +32,7 @@
 #include <map>
 #include <memory>
 #include <numeric>
+#include <optional>
 #include <ostream>
 #include <string>
 #include <type_traits>
@@ -761,7 +762,8 @@ Status GenerateHeader(EncryptionHeader* eh) {
   return Status::OK();
 }
 
-Status WriteEncryptionHeader(int fd, const string& filename, const EncryptionHeader& eh) {
+Status WriteEncryptionHeader(int fd, const string& filename, const EncryptionHeader& server_key,
+                             const EncryptionHeader& eh) {
   vector<Slice> headerv = { kEncryptionHeaderMagic };
   uint32_t key_size;
   uint8_t algorithm[1];
@@ -790,7 +792,7 @@ Status WriteEncryptionHeader(int fd, const string& filename, const EncryptionHea
   Slice efk(encrypted_file_key, key_size);
   vector<Slice> clear = {file_key};
   vector<Slice> cipher = {efk};
-  RETURN_NOT_OK(DoEncryptV(&kDummyEncryptionKey, 0, clear, cipher));
+  RETURN_NOT_OK(DoEncryptV(&server_key, 0, clear, cipher));
 
   // Add the encrypted file key and trailing zeros to the header.
   headerv.emplace_back(efk);
@@ -819,7 +821,8 @@ Status DoIsOnXfsFilesystem(const string& path, bool* result) {
   return Status::OK();
 }
 
-Status ReadEncryptionHeader(int fd, const string& filename, EncryptionHeader* eh) {
+Status ReadEncryptionHeader(int fd, const string& filename, const EncryptionHeader& server_key,
+                            EncryptionHeader* eh) {
   char magic[7];
   uint8_t algorithm[1];
   char file_key[32];
@@ -847,7 +850,7 @@ Status ReadEncryptionHeader(int fd, const string& filename, EncryptionHeader* eh
   // the file. The actual key size can be used when storing the key in memory.
   // See WriteEncryptionHeader for more info.
   vector<Slice> v = {Slice(file_key, (key_size + 15) & -16)};
-  RETURN_NOT_OK(DoDecryptV(&kDummyEncryptionKey, 0, v));
+  RETURN_NOT_OK(DoDecryptV(&server_key, 0, v));
   memcpy(&eh->key, file_key, key_size);
   return Status::OK();
 }
@@ -1509,11 +1512,15 @@ class PosixFileLock : public FileLock {
   int fd_;
 };
 
+static Env* default_env;
+
 class PosixEnv : public Env {
  public:
   ~PosixEnv() {
-    fprintf(stderr, "Destroying Env::Default()\n");
-    exit(1);
+    if (this == default_env) {
+      fprintf(stderr, "Destroying Env::Default()\n");
+      exit(1);
+    }
   }
 
   virtual Status NewSequentialFile(const string& fname,
@@ -1535,9 +1542,10 @@ class PosixEnv : public Env {
     bool encrypted = opts.is_sensitive && IsEncryptionEnabled();
     EncryptionHeader header;
     if (encrypted) {
+      DCHECK(server_key_);
       int fd;
       RETURN_NOT_OK(DoOpen(fname, OpenMode::MUST_EXIST, &fd));
-      RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, &header));
+      RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &header));
       if (fseek(f, kEncryptionHeaderSize, SEEK_CUR)) {
         return IOError(fname, errno);
       }
@@ -1565,7 +1573,8 @@ class PosixEnv : public Env {
     EncryptionHeader header;
     bool encrypted = opts.is_sensitive && IsEncryptionEnabled();
     if (encrypted) {
-      RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, &header));
+      DCHECK(server_key_);
+      RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &header));
     }
     result->reset(new PosixRandomAccessFile(fname, fd,
                   encrypted, header));
@@ -1620,11 +1629,12 @@ class PosixEnv : public Env {
     RETURN_NOT_OK(DoOpen(fname, opts.mode, &fd));
     EncryptionHeader eh;
     if (encrypt) {
+      DCHECK(server_key_);
       if (size >= kEncryptionHeaderSize) {
-        RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, &eh));
+        RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &eh));
       } else {
         RETURN_NOT_OK(GenerateHeader(&eh));
-        RETURN_NOT_OK(WriteEncryptionHeader(fd, fname, eh));
+        RETURN_NOT_OK(WriteEncryptionHeader(fd, fname, *server_key_, eh));
       }
     }
     result->reset(new PosixRWFile(fname, fd, opts.sync_on_close,
@@ -1640,8 +1650,9 @@ class PosixEnv : public Env {
     bool encrypt = opts.is_sensitive && IsEncryptionEnabled();
     EncryptionHeader eh;
     if (encrypt) {
+      DCHECK(server_key_);
       RETURN_NOT_OK(GenerateHeader(&eh));
-      RETURN_NOT_OK(WriteEncryptionHeader(fd, *created_filename, eh));
+      RETURN_NOT_OK(WriteEncryptionHeader(fd, *created_filename, *server_key_, eh));
     }
     res->reset(new PosixRWFile(*created_filename, fd, opts.sync_on_close,
                                encrypt, eh));
@@ -2262,6 +2273,25 @@ class PosixEnv : public Env {
 
   bool IsEncryptionEnabled() const override { return FLAGS_encrypt_data_at_rest; }
 
+  void SetEncryptionKey(int key_size, const uint8_t* server_key) override {  // Installs the server key for this Env; 'key_size' is in bits.
+    EncryptionHeader eh;
+    switch (key_size) {  // Map the bit length to the matching algorithm enum value.
+      case 128:
+        eh.algorithm = EncryptionAlgorithm::AES128ECB;
+        break;
+      case 192:
+        eh.algorithm = EncryptionAlgorithm::AES192ECB;
+        break;
+      case 256:
+        eh.algorithm = EncryptionAlgorithm::AES256ECB;
+        break;
+      default:  // Only 128, 192 and 256 are handled; anything else is fatal.
+        LOG(FATAL) << "Illegal key size";
+    }
+    memcpy(eh.key, server_key, key_size / 8);  // 'server_key' must hold at least key_size / 8 bytes.
+    server_key_ = eh;  // Passed to Read/WriteEncryptionHeader by the file-opening methods of this class.
+  }
+
  private:
   // unique_ptr Deleter implementation for fts_close
   struct FtsCloser {
@@ -2310,12 +2340,13 @@ class PosixEnv : public Env {
     bool encrypt = opts.is_sensitive && IsEncryptionEnabled();
     EncryptionHeader eh;
     if (encrypt) {
+      DCHECK(server_key_);
       if (file_size < kEncryptionHeaderSize) {
         RETURN_NOT_OK(GenerateHeader(&eh));
-        RETURN_NOT_OK(WriteEncryptionHeader(fd, fname, eh));
+        RETURN_NOT_OK(WriteEncryptionHeader(fd, fname, *server_key_, eh));
         file_size = kEncryptionHeaderSize;
       } else {
-        RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, &eh));
+        RETURN_NOT_OK(ReadEncryptionHeader(fd, fname, *server_key_, &eh));
       }
     }
     result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close,
@@ -2366,12 +2397,13 @@ class PosixEnv : public Env {
     }
     return Status::OK();
   }
+
+  std::optional<EncryptionHeader> server_key_;  // Empty until SetEncryptionKey() is called; encrypted-file paths DCHECK that it is set.
 };
 
 }  // namespace
 
 static pthread_once_t once = PTHREAD_ONCE_INIT;
-static Env* default_env;
 static void InitDefaultEnv() { default_env = new PosixEnv; }
 
 Env* Env::Default() {
@@ -2379,6 +2411,10 @@ Env* Env::Default() {
   return default_env;
 }
 
+unique_ptr<Env> Env::NewEnv() {  // Returns a fresh PosixEnv owned by the caller; it is not the Env::Default() instance.
+  return unique_ptr<Env>(new PosixEnv());
+}
+
 std::ostream& operator<<(std::ostream& o, Env::ResourceLimitType t) {
   return o << ResourceLimitTypeToString(t);
 }
diff --git a/src/kudu/util/env_util.cc b/src/kudu/util/env_util.cc
index f481e0c86..532277f6f 100644
--- a/src/kudu/util/env_util.cc
+++ b/src/kudu/util/env_util.cc
@@ -240,18 +240,13 @@ Status CreateDirsRecursively(Env* env, const string& path) {
 Status CopyFile(Env* env, const string& source_path, const string& dest_path,
                 WritableFileOptions opts) {
   unique_ptr<SequentialFile> source;
-  // Both the source and the destination files are treated as insensitive,
-  // because if they're encrypted, it would be unnecessary to decrypt and
-  // re-encrypt it. This way, we make a byte for byte copy of the file
-  // regardless if it's encrypted.
   SequentialFileOptions source_opts;
-  source_opts.is_sensitive = false;
+  source_opts.is_sensitive = opts.is_sensitive;
   RETURN_NOT_OK(env->NewSequentialFile(source_opts, source_path, &source));
   uint64_t size;
   RETURN_NOT_OK(env->GetFileSize(source_path, &size));
 
   unique_ptr<WritableFile> dest;
-  opts.is_sensitive = false;
   RETURN_NOT_OK(env->NewWritableFile(opts, dest_path, &dest));
   RETURN_NOT_OK(dest->PreAllocate(size));
 
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index b0af0301d..ccbcf17c0 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -53,9 +53,6 @@ using std::vector;
 using strings::Substitute;
 
 namespace {
-  void SetEncryptionFlags(bool encryption_enabled) {
-    FLAGS_encrypt_data_at_rest = encryption_enabled;
-  }
 } // namespace
 
 namespace kudu {
diff --git a/src/kudu/util/pb_util-test.cc b/src/kudu/util/pb_util-test.cc
index 7b88b7042..47dd2ca02 100644
--- a/src/kudu/util/pb_util-test.cc
+++ b/src/kudu/util/pb_util-test.cc
@@ -97,10 +97,6 @@ class TestPBUtil : public KuduTest {
   // Truncate the specified file to the specified length.
   Status TruncateFile(const string& path, uint64_t size);
 
-  void EnableEncryption(bool enable) {
-    FLAGS_encrypt_data_at_rest = enable;
-  }
-
   // Output file name for most unit tests.
   string path_;
 };
@@ -250,7 +246,7 @@ TEST_F(TestPBUtil, TestWritableFileOutputStream) {
 
 // Basic read/write test.
 TEST_F(TestPBUtil, TestPBContainerSimple) {
-  EnableEncryption(true);
+  SetEncryptionFlags(true);
   // Exercise both the SYNC and NO_SYNC codepaths, along with SENSITIVE and
   // NOT_SENSITIVE, despite the fact that we aren't able to observe a difference
   // in the test.
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index d6b12c792..2e7fad16c 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -39,6 +39,7 @@
 #include <glog/logging.h>
 #include <gtest/gtest-spi.h>
 
+#include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
@@ -82,6 +83,9 @@ const char* kInvalidPath = "/dev/invalid-path-for-kudu-tests";
 static const char* const kSlowTestsEnvVar = "KUDU_ALLOW_SLOW_TESTS";
 static const char* const kLargeKeysEnvVar = "KUDU_USE_LARGE_KEYS_IN_TESTS";
 static const char* const kEncryptDataInTests = "KUDU_ENCRYPT_DATA_IN_TESTS";
+static const int kEncryptionKeySize = 16;  // In bytes; multiplied by 8 where a bit count is expected.
+static const uint8_t kEncryptionKey[kEncryptionKeySize] =  // Fixed key installed on Env::Default() when tests enable encryption.
+  {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 42};
 
 static const uint64_t kTestBeganAtMicros = Env::Default()->NowMicros();
 
@@ -140,7 +144,7 @@ KuduTest::KuduTest()
   }
 
   if (EnableEncryption()) {
-    FLAGS_encrypt_data_at_rest = true;
+    SetEncryptionFlags(true);
   }
 
   // If the TEST_TMPDIR variable has been set, then glog will automatically use that
@@ -194,6 +198,22 @@ void KuduTest::OverrideKrb5Environment() {
   setenv("KRB5CCNAME", kInvalidPath, 1);
 }
 
+void KuduTest::SetEncryptionFlags(bool enable_encryption) {
+  FLAGS_encrypt_data_at_rest = enable_encryption;
+  if (enable_encryption) {  // Disabling only flips the flag; a key installed earlier is left in place.
+    Env::Default()->SetEncryptionKey(kEncryptionKeySize * 8, kEncryptionKey);  // Size is given in bits.
+  }
+}
+
+const string KuduTest::GetEncryptionKey() {  // Test key as text when encryption is on, otherwise "".
+  if (FLAGS_encrypt_data_at_rest) {
+    string key;
+    strings::b2a_hex(kEncryptionKey, &key, kEncryptionKeySize);  // NOTE(review): presumably hex-encodes the key bytes - confirm in escaping.h.
+    return key;
+  }
+  return "";
+}
+
 ///////////////////////////////////////////////////
 // Test utility functions
 ///////////////////////////////////////////////////
diff --git a/src/kudu/util/test_util.h b/src/kudu/util/test_util.h
index da972f4bb..5009b85a6 100644
--- a/src/kudu/util/test_util.h
+++ b/src/kudu/util/test_util.h
@@ -73,6 +73,9 @@ class KuduTest : public ::testing::Test {
   // variables so that we don't pick up the user's credentials.
   static void OverrideKrb5Environment();
 
+  // Returns the test encryption key encoded with strings::b2a_hex, or an empty string if encryption is disabled.
+  static const std::string GetEncryptionKey();
+
  protected:
   // Returns absolute path based on a unit test-specific work directory, given
   // a relative path. Useful for writing test files that should be deleted after
@@ -85,6 +88,9 @@ class KuduTest : public ::testing::Test {
   // (and the flags reset) before test_dir_ is deleted.
   std::unique_ptr<google::FlagSaver> flag_saver_;
 
+  // Sets FLAGS_encrypt_data_at_rest to 'enable_encryption' and, if true, installs the test key on Env::Default().
+  void SetEncryptionFlags(bool enable_encryption);
+
   std::string test_dir_;
 };